diff --git a/Dockerfile b/Dockerfile index e33e26572c6..738506e1f85 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,11 @@ -FROM webhippie/golang:1.14 as build +FROM webhippie/golang:1.15 as build COPY ./ /ocis/ ENV CGO_ENABLED=0 ENV GOOS=linux RUN apk update && \ - apk upgrade && \ + apk upgrade --ignore musl-dev && \ apk add make gcc bash && \ rm -rf /var/cache/apk/* diff --git a/accounts/pkg/service/v0/accounts.go b/accounts/pkg/service/v0/accounts.go index e3d42618f20..129db349eff 100644 --- a/accounts/pkg/service/v0/accounts.go +++ b/accounts/pkg/service/v0/accounts.go @@ -6,7 +6,7 @@ import ( "crypto/sha256" "encoding/hex" "fmt" - "github.com/owncloud/ocis/ocis-pkg/cache" + "github.com/owncloud/ocis/ocis-pkg/sync" "golang.org/x/crypto/bcrypt" "path" "regexp" @@ -33,7 +33,7 @@ import ( ) // passwordValidCache caches basic auth password validations -var passwordValidCache = cache.NewCache(1024) +var passwordValidCache = sync.NewCache(1024) // passwordValidCacheExpiration defines the entry lifetime const passwordValidCacheExpiration = 10 * time.Minute diff --git a/changelog/unreleased/account-locking.md b/changelog/unreleased/account-locking.md index e87e52f5679..592efc5a0dd 100644 --- a/changelog/unreleased/account-locking.md +++ b/changelog/unreleased/account-locking.md @@ -1,16 +1,16 @@ Enhancement: remove locking from accounts service -Tags: ocis +Tags: accounts, ocis-pkg -In the past we locked every request in the accounts service. This is problematic as soon as we start to hammer the system with many users at the same time. +In the past we locked every request in the accounts service. This is problematic as a larger number of concurrent requests arrives at the accounts service. The locking is now removed from the accounts service and is moved to the indexer. Instead of doing locking for reads and writes we now differentiate them by using a named RWLock. - remove locking from accounts service -- add a cached named rwlock pkg -- use sync.map in the cache pkg -- use named rwlock in indexer pkg -- use sync.map in indexer pkg +- add sync package with named mutex +- add named locking to indexer +- move cache into sync pkg +https://github.com/owncloud/ocis/pull/1212 https://github.com/owncloud/ocis/issues/966 diff --git a/ocis-pkg/cache/cache.go b/ocis-pkg/cache/cache.go deleted file mode 100644 index 8941510e405..00000000000 --- a/ocis-pkg/cache/cache.go +++ /dev/null @@ -1,84 +0,0 @@ -package cache - -import ( - "sync" - "time" -) - -// Entry represents an entry on the cache. You can type assert on V. -type Entry struct { - V interface{} - expiration time.Time -} - -// Cache is a barebones cache implementation. -type Cache struct { - sync.Map - sizeTotal int - sizeCurrent int -} - -// NewCache returns a new instance of Cache. -func NewCache(sizeTotal int) Cache { - return Cache{ - sizeTotal: sizeTotal, - } -} - -// Get gets an entry by given key -func (c *Cache) Get(k string) *Entry { - if sme, ok := c.Load(k); ok { - e := sme.(*Entry) - if c.expired(e) { - c.Delete(k) - return nil - } - return e - } - return nil -} - -// Set adds an entry for given key and value -func (c *Cache) Set(k string, val interface{}, expiration time.Time) { - if !c.fits() { - c.evict() - } - c.Store(k, &Entry{ - val, - expiration, - }) - c.sizeCurrent++ -} - -// Unset removes an entry by given key -func (c *Cache) Unset(k string) bool { - if _, ok := c.Load(k); !ok { - return false - } - - c.Delete(k) - c.sizeCurrent-- - return true -} - -// evict frees memory from the cache by removing entries that exceeded the cache TTL. -func (c *Cache) evict() { - c.Range(func(k, sme interface{}) bool { - e := sme.(*Entry) - if c.expired(e) { - c.Delete(k) - c.sizeCurrent-- - } - return true - }) -} - -// expired checks if an entry is expired -func (c *Cache) expired(e *Entry) bool { - return e.expiration.Before(time.Now()) -} - -// fits returns whether the cache fits more entries. -func (c *Cache) fits() bool { - return c.sizeTotal > c.sizeCurrent -} diff --git a/ocis-pkg/indexer/indexer.go b/ocis-pkg/indexer/indexer.go index d1af2a80db5..561ff1c9b3e 100644 --- a/ocis-pkg/indexer/indexer.go +++ b/ocis-pkg/indexer/indexer.go @@ -3,6 +3,10 @@ package indexer import ( "fmt" + "github.com/owncloud/ocis/ocis-pkg/sync" + "path" + "strings" + "github.com/CiscoM31/godata" "github.com/iancoleman/strcase" "github.com/owncloud/ocis/ocis-pkg/indexer/config" @@ -12,17 +16,13 @@ import ( _ "github.com/owncloud/ocis/ocis-pkg/indexer/index/disk" // to populate index "github.com/owncloud/ocis/ocis-pkg/indexer/option" "github.com/owncloud/ocis/ocis-pkg/indexer/registry" - "github.com/owncloud/ocis/ocis-pkg/sync" - "path" - "strings" ) // Indexer is a facade to configure and query over multiple indices. type Indexer struct { config *config.Config indices typeMap - - mu sync.NRWMutex + mu sync.NRWMutex } // IdxAddResult represents the result of an Add call on an index @@ -49,8 +49,8 @@ func getRegistryStrategy(cfg *config.Config) string { // Reset takes care of deleting all indices from storage and from the internal map of indices func (i *Indexer) Reset() error { - for j := range i.indices.allTypeMappings() { - for _, indices := range i.indices.getTypeMapping(j).IndicesByField { + for j := range i.indices { + for _, indices := range i.indices[j].IndicesByField { for _, idx := range indices { err := idx.Delete() if err != nil { @@ -58,7 +58,7 @@ func (i *Indexer) Reset() error { } } } - i.indices.deleteTypeMapping(j) + delete(i.indices, j) } return nil @@ -107,7 +107,7 @@ func (i *Indexer) Add(t interface{}) ([]IdxAddResult, error) { defer i.mu.Unlock(typeName) var results []IdxAddResult - if fields := i.indices.getTypeMapping(typeName); fields != nil { + if fields, ok := i.indices[typeName]; ok { for _, indices := range fields.IndicesByField { for _, idx := range indices { pkVal := valueOf(t, fields.PKFieldName) @@ -135,7 +135,7 @@ func (i *Indexer) FindBy(t interface{}, field string, val string) ([]string, err defer i.mu.RUnlock(typeName) resultPaths := make([]string, 0) - if fields := i.indices.getTypeMapping(typeName); fields != nil { + if fields, ok := i.indices[typeName]; ok { for _, idx := range fields.IndicesByField[strcase.ToCamel(field)] { idxVal := val res, err := idx.Lookup(idxVal) @@ -169,7 +169,7 @@ func (i *Indexer) Delete(t interface{}) error { i.mu.Lock(typeName) defer i.mu.Unlock(typeName) - if fields := i.indices.getTypeMapping(typeName); fields != nil { + if fields, ok := i.indices[typeName]; ok { for _, indices := range fields.IndicesByField { for _, idx := range indices { pkVal := valueOf(t, fields.PKFieldName) @@ -192,7 +192,7 @@ func (i *Indexer) FindByPartial(t interface{}, field string, pattern string) ([] defer i.mu.RUnlock(typeName) resultPaths := make([]string, 0) - if fields := i.indices.getTypeMapping(typeName); fields != nil { + if fields, ok := i.indices[typeName]; ok { for _, idx := range fields.IndicesByField[strcase.ToCamel(field)] { res, err := idx.Search(pattern) if err != nil { @@ -231,7 +231,7 @@ func (i *Indexer) Update(from, to interface{}) error { return fmt.Errorf("update types do not match: from %v to %v", typeNameFrom, typeNameTo) } - if fields := i.indices.getTypeMapping(typeNameFrom); fields != nil { + if fields, ok := i.indices[typeNameFrom]; ok { for fName, indices := range fields.IndicesByField { oldV := valueOf(from, fName) newV := valueOf(to, fName) diff --git a/ocis-pkg/indexer/map.go b/ocis-pkg/indexer/map.go index 00b23957e59..682218fdb99 100644 --- a/ocis-pkg/indexer/map.go +++ b/ocis-pkg/indexer/map.go @@ -1,55 +1,27 @@ package indexer -import ( - "github.com/owncloud/ocis/ocis-pkg/indexer/index" - "sync" -) +import "github.com/owncloud/ocis/ocis-pkg/indexer/index" // typeMap stores the indexer layout at runtime. -type fieldName = string + +type typeMap map[tName]typeMapping type tName = string -type typeMap struct { - sync.Map -} +type fieldName = string type typeMapping struct { PKFieldName string IndicesByField map[fieldName][]index.Index } -func (m *typeMap) allTypeMappings() map[tName]*typeMapping { - var rv map[tName]*typeMapping - - m.Range(func(key, value interface{}) bool { - rv[key.(string)] = value.(*typeMapping) - return true - }) - - return rv -} - -func (m *typeMap) getTypeMapping(typeName string) *typeMapping { - if value, ok := m.Load(typeName); ok { - return value.(*typeMapping) - } - - return nil -} - -func (m *typeMap) deleteTypeMapping(typeName string) { - m.Delete(typeName) -} - -func (m *typeMap) addIndex(typeName string, pkName string, idx index.Index) { - if val, ok := m.Load(typeName); ok { - rval := val.(*typeMapping) - rval.IndicesByField[idx.IndexBy()] = append(rval.IndicesByField[idx.IndexBy()], idx) +func (m typeMap) addIndex(typeName string, pkName string, idx index.Index) { + if val, ok := m[typeName]; ok { + val.IndicesByField[idx.IndexBy()] = append(val.IndicesByField[idx.IndexBy()], idx) return } - m.Store(typeName, &typeMapping{ + m[typeName] = typeMapping{ PKFieldName: pkName, IndicesByField: map[string][]index.Index{ idx.IndexBy(): {idx}, }, - }) + } } diff --git a/ocis-pkg/sync/cache.go b/ocis-pkg/sync/cache.go new file mode 100644 index 00000000000..15762eddfd7 --- /dev/null +++ b/ocis-pkg/sync/cache.go @@ -0,0 +1,97 @@ +package sync + +import ( + "sync" + "time" +) + +// Cache is a barebones cache implementation. +type Cache struct { + entries sync.Map + pool sync.Pool + capacity int + length int +} + +// CacheEntry represents an entry on the cache. You can type assert on V. +type CacheEntry struct { + V interface{} + expiration time.Time +} + +// NewCache returns a new instance of Cache. +func NewCache(capacity int) Cache { + return Cache{ + capacity: capacity, + pool: sync.Pool{New: func() interface{} { + return new(CacheEntry) + }}, + } +} + +// Get gets an entry by given key +func (c *Cache) Get(key string) *CacheEntry { + if mapEntry, ok := c.entries.Load(key); ok { + entry := mapEntry.(*CacheEntry) + if c.expired(entry) { + c.entries.Delete(key) + return nil + } + return entry + } + return nil +} + +// Set adds an entry for given key and value +func (c *Cache) Set(key string, val interface{}, expiration time.Time) { + if !c.fits() { + c.evict() + } + + poolEntry := c.pool.Get() + if mapEntry, loaded := c.entries.LoadOrStore(key, poolEntry); loaded { + entry := mapEntry.(*CacheEntry) + entry.V = val + entry.expiration = expiration + + c.pool.Put(poolEntry) + } else { + entry := poolEntry.(*CacheEntry) + entry.V = val + entry.expiration = expiration + + c.length++ + } +} + +// Unset removes an entry by given key +func (c *Cache) Unset(key string) bool { + if _, loaded := c.entries.LoadAndDelete(key); !loaded { + return false + } + + c.length-- + return true +} + +// evict frees memory from the cache by removing entries that exceeded the cache TTL. +func (c *Cache) evict() { + c.entries.Range(func(key, mapEntry interface{}) bool { + entry := mapEntry.(*CacheEntry) + if c.expired(entry) { + c.entries.Delete(key) + c.length-- + } + return true + }) +} + +// expired checks if an entry is expired +func (c *Cache) expired(e *CacheEntry) bool { + return e.expiration.Before(time.Now()) +} + +// fits returns whether the cache fits more entries. +func (c *Cache) fits() bool { + return c.capacity > c.length +} diff --git a/ocis-pkg/sync/cache_test.go b/ocis-pkg/sync/cache_test.go new file mode 100644 index 00000000000..1765000bdc2 --- /dev/null +++ b/ocis-pkg/sync/cache_test.go @@ -0,0 +1,54 @@ +package sync + +import ( + "github.com/stretchr/testify/assert" + "strconv" + "testing" + "time" +) + +func TestCache_Get(t *testing.T) { + size := 1024 + c := NewCache(size) + + for i := 0; i < size; i++ { + c.Set(strconv.Itoa(i), i, time.Now().Add(10*time.Second)) + } + + for i := 0; i < size; i++ { + assert.Equal(t, i, c.Get(strconv.Itoa(i)).V, "entry value is the same") + } + + assert.Nil(t, c.Get("unknown"), "entry is nil if unknown") + + wait := 10 * time.Millisecond + c.Set("expired", size, time.Now().Add(wait)) + time.Sleep(wait + 1) + assert.Nil(t, c.Get(strconv.Itoa(size)), "entry is nil if it's expired") +} + +func TestCache_Set(t *testing.T) { + c := NewCache(1) + + c.Set("new", "new", time.Now().Add(10*time.Millisecond)) + assert.Equal(t, "new", c.Get("new").V, "new entries can be added") + assert.Equal(t, 1, c.length, "adding new entries will increase the cache size") + + replacedExpiration := time.Now().Add(10 * time.Millisecond) + c.Set("new", "updated", replacedExpiration) + assert.Equal(t, "updated", c.Get("new").V, "entry values can be updated") + assert.Equal(t, replacedExpiration, c.Get("new").expiration, "entry expiration can be updated") + + time.Sleep(11 * time.Millisecond) + c.Set("eviction", "eviction", time.Now()) + assert.Equal(t, 1, c.length, "expired entries get removed") +} + +func TestCache_Unset(t *testing.T) { + c := NewCache(1) + + c.Set("new", "new", time.Now().Add(10*time.Millisecond)) + c.Unset("new") + assert.Nil(t, c.Get("new"), "entries can be removed") + assert.Equal(t, 0, c.length, "removing a entry decreases the cache size") +} diff --git a/ocis-pkg/sync/mutex.go b/ocis-pkg/sync/mutex.go deleted file mode 100644 index 258d998e28b..00000000000 --- a/ocis-pkg/sync/mutex.go +++ /dev/null @@ -1,74 +0,0 @@ -package sync - -import ( - "sync" -) - -// NRWMutex works the same as RWMutex, the only difference is that it stores mutexes in a map and reuses them. -// It's handy if you want to write-lock, write-unlock, read-lock and read-unlock for specific names only. -type NRWMutex struct { - m sync.Mutex - mm map[string]*nrw -} - -type nrw struct { - m sync.RWMutex - c int -} - -// NewNRWMutex returns a new instance of NRWMutex. -func NewNRWMutex() NRWMutex { - return NRWMutex{mm: make(map[string]*nrw)} -} - -// Lock locks rw for writing. -func (c *NRWMutex) Lock(k string) { - c.m.Lock() - m := c.get(k) - m.c++ - c.m.Unlock() - m.m.Lock() -} - -// Unlock unlocks rw for writing. -func (c *NRWMutex) Unlock(k string) { - c.m.Lock() - defer c.m.Unlock() - m := c.get(k) - m.m.Unlock() - m.c-- - if m.c == 0 { - delete(c.mm, k) - } -} - -// RLock locks rw for reading. -func (c *NRWMutex) RLock(k string) { - c.m.Lock() - m := c.get(k) - m.c++ - c.m.Unlock() - m.m.RLock() -} - -// RUnlock undoes a single RLock call. -func (c *NRWMutex) RUnlock(k string) { - c.m.Lock() - defer c.m.Unlock() - m := c.get(k) - m.m.RUnlock() - m.c-- - if m.c == 0 { - delete(c.mm, k) - } -} - -func (c *NRWMutex) get(k string) *nrw { - m, ok := c.mm[k] - if !ok { - m = &nrw{} - c.mm[k] = m - } - - return m -} diff --git a/ocis-pkg/sync/nrwmutex.go b/ocis-pkg/sync/nrwmutex.go new file mode 100644 index 00000000000..961dbba890c --- /dev/null +++ b/ocis-pkg/sync/nrwmutex.go @@ -0,0 +1,49 @@ +package sync + +import ( + "sync" +) + +// NRWMutex works the same as RWMutex, the only difference is that it stores mutexes in a map and reuses them. +// It's handy if you want to write-lock, write-unlock, read-lock and read-unlock for specific names only. +type NRWMutex struct { + pool sync.Pool + mus sync.Map +} + +// NewNRWMutex returns a new instance of NRWMutex. +func NewNRWMutex() NRWMutex { + return NRWMutex{pool: sync.Pool{New: func() interface{} { + return new(sync.RWMutex) + }}} +} + +// Lock locks rw for writing. +func (m *NRWMutex) Lock(name string) { + m.loadOrStore(name).Lock() +} + +// Unlock unlocks rw for writing. +func (m *NRWMutex) Unlock(name string) { + m.loadOrStore(name).Unlock() +} + +// RLock locks rw for reading. +func (m *NRWMutex) RLock(name string) { + m.loadOrStore(name).RLock() +} + +// RUnlock undoes a single RLock call. +func (m *NRWMutex) RUnlock(name string) { + m.loadOrStore(name).RUnlock() +} + +func (m *NRWMutex) loadOrStore(name string) *sync.RWMutex { + pmu := m.pool.Get() + mmu, loaded := m.mus.LoadOrStore(name, pmu) + if loaded { + m.pool.Put(pmu) + } + + return mmu.(*sync.RWMutex) +} diff --git a/ocis-pkg/sync/mutex_test.go b/ocis-pkg/sync/nrwmutex_test.go similarity index 80% rename from ocis-pkg/sync/mutex_test.go rename to ocis-pkg/sync/nrwmutex_test.go index ea867b14ccf..f8ee2480714 100644 --- a/ocis-pkg/sync/mutex_test.go +++ b/ocis-pkg/sync/nrwmutex_test.go @@ -6,16 +6,16 @@ import ( "testing" ) -func HammerMutex(m *NRWMutex, loops int, cdone chan bool) { +func HammerMutex(m *NRWMutex, loops int, c chan bool) { for i := 0; i < loops; i++ { id := fmt.Sprintf("%v", i) m.Lock(id) m.Unlock(id) } - cdone <- true + c <- true } -func TestMutex(t *testing.T) { +func TestNRWMutex(t *testing.T) { if n := runtime.SetMutexProfileFraction(1); n != 0 { t.Logf("got mutexrate %d expected 0", n) } diff --git a/proxy/pkg/middleware/oidc_auth.go b/proxy/pkg/middleware/oidc_auth.go index e6ac902fd5a..f89ef7b26cd 100644 --- a/proxy/pkg/middleware/oidc_auth.go +++ b/proxy/pkg/middleware/oidc_auth.go @@ -10,9 +10,9 @@ import ( "github.com/dgrijalva/jwt-go" gOidc "github.com/coreos/go-oidc" - "github.com/owncloud/ocis/ocis-pkg/cache" "github.com/owncloud/ocis/ocis-pkg/log" "github.com/owncloud/ocis/ocis-pkg/oidc" + "github.com/owncloud/ocis/ocis-pkg/sync" "golang.org/x/oauth2" ) @@ -24,7 +24,7 @@ type OIDCProvider interface { // OIDCAuth provides a middleware to check access secured by a static token. func OIDCAuth(optionSetters ...Option) func(next http.Handler) http.Handler { options := newOptions(optionSetters...) - tokenCache := cache.NewCache(options.UserinfoCacheSize) + tokenCache := sync.NewCache(options.UserinfoCacheSize) h := oidcAuth{ logger: options.Logger, @@ -74,7 +74,7 @@ type oidcAuth struct { providerFunc func() (OIDCProvider, error) httpClient *http.Client oidcIss string - tokenCache *cache.Cache + tokenCache *sync.Cache tokenCacheTTL time.Duration }