From 1ece42b23f2abae25f22c038baba7edd64b4ae44 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Wed, 1 Sep 2021 19:10:31 +0300 Subject: [PATCH] [#236] cache: Refactor ListObjectsCache Replaced map in ListObjectsCache by gcache. Now ListObjectsCache keeps only objectIDs and requests ObjectInfo from cache or NeoFS. Refactored ListObjectsCache keys: removed delimiter and method fields. Now ListObjectsCache keeps cache with all objects versions. Signed-off-by: Angira Kekteeva --- api/cache/objectslist.go | 92 ++++++++++++++++------------------- api/cache/objectslist_test.go | 81 +++++++++++++++--------------- api/layer/layer.go | 5 +- api/layer/object.go | 88 +++++++++++++++++++++------------ api/layer/versioning.go | 37 +++++--------- api/layer/versioning_test.go | 3 +- cmd/s3-gw/app.go | 13 +++++ cmd/s3-gw/app_settings.go | 1 + 8 files changed, 171 insertions(+), 149 deletions(-) diff --git a/api/cache/objectslist.go b/api/cache/objectslist.go index 324e441..72f2a99 100644 --- a/api/cache/objectslist.go +++ b/api/cache/objectslist.go @@ -1,11 +1,12 @@ package cache import ( - "sync" + "fmt" "time" + "github.com/bluele/gcache" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" - "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-api-go/pkg/object" ) /* @@ -19,86 +20,75 @@ import ( the list of objects. Otherwise we send the request to NeoFS. */ -// ObjectsListCache provides interface for cache of ListObjectsV2 in a layer struct. type ( + // ObjectsListCache provides interface for cache of ListObjectsV2 in a layer struct. ObjectsListCache interface { - Get(key ObjectsListKey) []*api.ObjectInfo - Put(key ObjectsListKey, objects []*api.ObjectInfo) + Get(key ObjectsListKey) []*object.ID + Put(key ObjectsListKey, oids []*object.ID) error } ) -// DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects. -const DefaultObjectsListCacheLifetime = time.Second * 60 - const ( - // ListObjectsMethod is used to mark a cache entry for ListObjectsV1/V2. - ListObjectsMethod = "listObjects" - // ListVersionsMethod is used to mark a cache entry for ListObjectVersions. - ListVersionsMethod = "listVersions" + // DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects. + DefaultObjectsListCacheLifetime = time.Second * 60 + // DefaultObjectsListCacheSize is a default size of cache of ListObjects. + DefaultObjectsListCacheSize = 1e5 ) type ( // ListObjectsCache contains cache for ListObjects and ListObjectVersions. ListObjectsCache struct { - cacheLifetime time.Duration - caches map[ObjectsListKey]objectsListEntry - mtx sync.RWMutex - } - objectsListEntry struct { - list []*api.ObjectInfo + lifetime time.Duration + cache gcache.Cache } + // ObjectsListKey is a key to find a ObjectsListCache's entry. ObjectsListKey struct { - Method string - Key string - Delimiter string - Prefix string + cid string + prefix string } ) // NewObjectsListCache is a constructor which creates an object of ListObjectsCache with given lifetime of entries. -func NewObjectsListCache(lifetime time.Duration) *ListObjectsCache { +func NewObjectsListCache(cacheSize int, lifetime time.Duration) *ListObjectsCache { + gc := gcache.New(cacheSize).LRU().Build() + return &ListObjectsCache{ - caches: make(map[ObjectsListKey]objectsListEntry), - cacheLifetime: lifetime, + cache: gc, + lifetime: lifetime, } } // Get return list of ObjectInfo. -func (l *ListObjectsCache) Get(key ObjectsListKey) []*api.ObjectInfo { - l.mtx.RLock() - defer l.mtx.RUnlock() - if val, ok := l.caches[key]; ok { - return val.list +func (l *ListObjectsCache) Get(key ObjectsListKey) []*object.ID { + entry, err := l.cache.Get(key) + if err != nil { + return nil } - return nil + + result, ok := entry.([]*object.ID) + if !ok { + return nil + } + + return result } -// Put put a list of objects to cache. -func (l *ListObjectsCache) Put(key ObjectsListKey, objects []*api.ObjectInfo) { - if len(objects) == 0 { - return +// Put puts a list of objects to cache. +func (l *ListObjectsCache) Put(key ObjectsListKey, oids []*object.ID) error { + if len(oids) == 0 { + return fmt.Errorf("list is empty, cid: %s, prefix: %s", key.cid, key.prefix) } - var c objectsListEntry - l.mtx.Lock() - defer l.mtx.Unlock() - c.list = objects - l.caches[key] = c - time.AfterFunc(l.cacheLifetime, func() { - l.mtx.Lock() - delete(l.caches, key) - l.mtx.Unlock() - }) + + return l.cache.SetWithExpire(key, oids, l.lifetime) } // CreateObjectsListCacheKey returns ObjectsListKey with given CID, method, prefix, and delimiter. -func CreateObjectsListCacheKey(cid *cid.ID, method, prefix, delimiter string) (ObjectsListKey, error) { +func CreateObjectsListCacheKey(cid *cid.ID, prefix string) ObjectsListKey { p := ObjectsListKey{ - Method: method, - Key: cid.String(), - Delimiter: delimiter, - Prefix: prefix, + cid: cid.String(), + prefix: prefix, } - return p, nil + return p } diff --git a/api/cache/objectslist_test.go b/api/cache/objectslist_test.go index f2ef3b5..43183cd 100644 --- a/api/cache/objectslist_test.go +++ b/api/cache/objectslist_test.go @@ -3,16 +3,15 @@ package cache import ( "crypto/rand" "crypto/sha256" - "sort" "testing" "time" "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/stretchr/testify/require" ) const testingCacheLifetime = 5 * time.Second +const testingCacheSize = 10 func randID(t *testing.T) *object.ID { id := object.NewID() @@ -31,83 +30,82 @@ func randSHA256Checksum(t *testing.T) (cs [sha256.Size]byte) { func TestObjectsListCache(t *testing.T) { var ( cacheSize = 10 - objects []*api.ObjectInfo + ids []*object.ID userKey = "key" ) for i := 0; i < cacheSize; i++ { id := randID(t) - objects = append(objects, &api.ObjectInfo{ID: id, Name: id.String()}) + ids = append(ids, id) } - sort.Slice(objects, func(i, j int) bool { - return objects[i].Name < objects[j].Name - }) - t.Run("lifetime", func(t *testing.T) { var ( - cache = NewObjectsListCache(testingCacheLifetime) - cacheKey = ObjectsListKey{Key: userKey} + cache = NewObjectsListCache(testingCacheSize, testingCacheLifetime) + cacheKey = ObjectsListKey{cid: userKey} ) - cache.Put(cacheKey, objects) + err := cache.Put(cacheKey, ids) + require.NoError(t, err) condition := func() bool { return cache.Get(cacheKey) == nil } - require.Never(t, condition, cache.cacheLifetime, time.Second) + require.Never(t, condition, cache.lifetime, time.Second) require.Eventually(t, condition, time.Second, 10*time.Millisecond) }) - t.Run("get cache with empty delimiter, empty prefix", func(t *testing.T) { + t.Run("get cache with empty prefix", func(t *testing.T) { var ( - cache = NewObjectsListCache(testingCacheLifetime) - cacheKey = ObjectsListKey{Key: userKey} + cache = NewObjectsListCache(testingCacheSize, testingCacheLifetime) + cacheKey = ObjectsListKey{cid: userKey} ) - cache.Put(cacheKey, objects) + err := cache.Put(cacheKey, ids) + require.NoError(t, err) + actual := cache.Get(cacheKey) - require.Equal(t, len(objects), len(actual)) - for i := range objects { - require.Equal(t, objects[i], actual[i]) + require.Equal(t, len(ids), len(actual)) + for i := range ids { + require.Equal(t, ids[i], actual[i]) } }) - t.Run("get cache with delimiter and prefix", func(t *testing.T) { + t.Run("get cache with prefix", func(t *testing.T) { cacheKey := ObjectsListKey{ - Key: userKey, - Delimiter: "/", - Prefix: "dir", + cid: userKey, + prefix: "dir", } - cache := NewObjectsListCache(testingCacheLifetime) - cache.Put(cacheKey, objects) + cache := NewObjectsListCache(testingCacheSize, testingCacheLifetime) + err := cache.Put(cacheKey, ids) + require.NoError(t, err) + actual := cache.Get(cacheKey) - require.Equal(t, len(objects), len(actual)) - for i := range objects { - require.Equal(t, objects[i], actual[i]) + require.Equal(t, len(ids), len(actual)) + for i := range ids { + require.Equal(t, ids[i], actual[i]) } }) - t.Run("get cache with other delimiter and prefix", func(t *testing.T) { + t.Run("get cache with other prefix", func(t *testing.T) { var ( cacheKey = ObjectsListKey{ - Key: userKey, - Delimiter: "/", - Prefix: "dir", + cid: userKey, + prefix: "dir", } newKey = ObjectsListKey{ - Key: "key", - Delimiter: "*", - Prefix: "obj", + cid: "key", + prefix: "obj", } ) - cache := NewObjectsListCache(testingCacheLifetime) - cache.Put(cacheKey, objects) + cache := NewObjectsListCache(testingCacheSize, testingCacheLifetime) + err := cache.Put(cacheKey, ids) + require.NoError(t, err) actual := cache.Get(newKey) require.Nil(t, actual) @@ -116,15 +114,16 @@ func TestObjectsListCache(t *testing.T) { t.Run("get cache with non-existing key", func(t *testing.T) { var ( cacheKey = ObjectsListKey{ - Key: userKey, + cid: userKey, } newKey = ObjectsListKey{ - Key: "asdf", + cid: "asdf", } ) - cache := NewObjectsListCache(testingCacheLifetime) - cache.Put(cacheKey, objects) + cache := NewObjectsListCache(testingCacheSize, testingCacheLifetime) + err := cache.Put(cacheKey, ids) + require.NoError(t, err) actual := cache.Get(newKey) require.Nil(t, actual) diff --git a/api/layer/layer.go b/api/layer/layer.go index 7114e9f..e11b39b 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -41,6 +41,7 @@ type ( Lifetime time.Duration Size int ListObjectsLifetime time.Duration + ListObjectsSize int } // Params stores basic API parameters. @@ -197,7 +198,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client { return &layer{ pool: conns, log: log, - listsCache: cache.NewObjectsListCache(config.ListObjectsLifetime), + listsCache: cache.NewObjectsListCache(config.ListObjectsSize, config.ListObjectsLifetime), objCache: cache.New(config.Size, config.Lifetime), //todo reconsider cache params namesCache: cache.NewObjectsNameCache(1000, time.Minute), @@ -671,7 +672,7 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { return err } - objects, err := n.listSortedObjectsFromNeoFS(ctx, allObjectParams{Bucket: bucketInfo}) + objects, err := n.listSortedObjects(ctx, allObjectParams{Bucket: bucketInfo}) if err != nil { return err } diff --git a/api/layer/object.go b/api/layer/object.go index 975d3d3..990b344 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -22,9 +22,10 @@ import ( type ( findParams struct { - attr string - val string - cid *cid.ID + attr string + val string + cid *cid.ID + prefix string } getParams struct { @@ -82,6 +83,11 @@ func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, opts.AddFilter(p.attr, filename, object.MatchStringEqual) } } + if prefix, err := url.QueryUnescape(p.prefix); err != nil { + return nil, err + } else if prefix != "" { + opts.AddFilter(object.AttributeFileName, prefix, object.MatchCommonPrefix) + } return n.pool.SearchObject(ctx, new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts), n.BearerOpt(ctx)) } @@ -457,7 +463,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis return &result, nil } -func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*api.ObjectInfo, error) { +func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*api.ObjectInfo, error) { versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter) if err != nil { return nil, err @@ -479,19 +485,34 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam } func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *api.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) { - ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID}) - if err != nil { - return nil, err + var err error + + cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix) + ids := n.listsCache.Get(cacheKey) + + if ids == nil { + ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, prefix: prefix}) + if err != nil { + return nil, err + } + if err := n.listsCache.Put(cacheKey, ids); err != nil { + n.log.Error("couldn't cache list of objects", zap.Error(err)) + } } versions := make(map[string]*objectVersions, len(ids)/2) - for _, id := range ids { - meta, err := n.objectHead(ctx, bkt.CID, id) - if err != nil { - n.log.Warn("could not fetch object meta", zap.Error(err)) + + for i := 0; i < len(ids); i++ { + if ids[i] == nil { continue } - if oi := objectInfoFromMeta(bkt, meta, prefix, delimiter); oi != nil { + obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt.CID, ids[i]) + if obj == nil { + // mark ids[i] as an address of invalid object + ids[i] = nil + continue + } + if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil { if isSystem(oi) { continue } @@ -573,7 +594,6 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ( var ( err error bkt *api.BucketInfo - cacheKey cache.ObjectsListKey allObjects []*api.ObjectInfo ) @@ -581,26 +601,15 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ( return nil, err } - if cacheKey, err = cache.CreateObjectsListCacheKey(bkt.CID, cache.ListObjectsMethod, p.Prefix, p.Delimiter); err != nil { + allObjects, err = n.listSortedObjects(ctx, allObjectParams{ + Bucket: bkt, + Prefix: p.Prefix, + Delimiter: p.Delimiter, + }) + if err != nil { return nil, err } - allObjects = n.listsCache.Get(cacheKey) - - if allObjects == nil { - allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{ - Bucket: bkt, - Prefix: p.Prefix, - Delimiter: p.Delimiter, - }) - if err != nil { - return nil, err - } - - // putting to cache a copy of allObjects because allObjects can be modified further - n.listsCache.Put(cacheKey, append([]*api.ObjectInfo(nil), allObjects...)) - } - return allObjects, nil } @@ -613,3 +622,22 @@ func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *api.BucketInfo return settings.VersioningEnabled } + +func (n *layer) objectFromObjectsCacheOrNeoFS(ctx context.Context, cid *cid.ID, oid *object.ID) *object.Object { + var ( + err error + meta = n.objCache.Get(newAddress(cid, oid)) + ) + if meta == nil { + meta, err = n.objectHead(ctx, cid, oid) + if err != nil { + n.log.Warn("could not fetch object meta", zap.Error(err)) + return nil + } + if err = n.objCache.Put(*meta); err != nil { + n.log.Error("couldn't cache an object", zap.Error(err)) + } + } + + return meta +} diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 1eae34f..12712ba 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -8,7 +8,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-s3-gw/api" - "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/api/errors" ) @@ -146,39 +145,29 @@ func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*Bu } func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - var versions map[string]*objectVersions - res := &ListObjectVersionsInfo{} + var ( + versions map[string]*objectVersions + allObjects = make([]*api.ObjectInfo, 0, p.MaxKeys) + res = &ListObjectVersionsInfo{} + ) bkt, err := n.GetBucketInfo(ctx, p.Bucket) if err != nil { return nil, err } - cacheKey, err := cache.CreateObjectsListCacheKey(bkt.CID, cache.ListVersionsMethod, p.Prefix, p.Delimiter) - if err != nil { + if versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter); err != nil { return nil, err } - allObjects := n.listsCache.Get(cacheKey) - if allObjects == nil { - versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter) - if err != nil { - return nil, err - } + sortedNames := make([]string, 0, len(versions)) + for k := range versions { + sortedNames = append(sortedNames, k) + } + sort.Strings(sortedNames) - sortedNames := make([]string, 0, len(versions)) - for k := range versions { - sortedNames = append(sortedNames, k) - } - sort.Strings(sortedNames) - - allObjects = make([]*api.ObjectInfo, 0, p.MaxKeys) - for _, name := range sortedNames { - allObjects = append(allObjects, versions[name].getFiltered()...) - } - - // putting to cache a copy of allObjects because allObjects can be modified further - n.listsCache.Put(cacheKey, append([]*api.ObjectInfo(nil), allObjects...)) + for _, name := range sortedNames { + allObjects = append(allObjects, versions[name].getFiltered()...) } for i, obj := range allObjects { diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index 76ebdc7..93828dd 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -331,7 +331,8 @@ func prepareContext(t *testing.T) *testContext { layer: NewLayer(l, tp, &CacheConfig{ Size: cache.DefaultObjectsCacheSize, Lifetime: cache.DefaultObjectsCacheLifetime, - ListObjectsLifetime: cache.DefaultObjectsListCacheLifetime}, + ListObjectsLifetime: cache.DefaultObjectsListCacheLifetime, + ListObjectsSize: cache.DefaultObjectsListCacheSize}, ), bkt: bktName, bktID: bktID, diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 0d902e5..2a432f5 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -217,6 +217,7 @@ func (a *App) Server(ctx context.Context) { func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CacheConfig { cacheCfg := layer.CacheConfig{ ListObjectsLifetime: cache.DefaultObjectsListCacheLifetime, + ListObjectsSize: cache.DefaultObjectsListCacheSize, Size: cache.DefaultObjectsCacheSize, Lifetime: cache.DefaultObjectsCacheLifetime, } @@ -253,6 +254,18 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CacheConfig { cacheCfg.ListObjectsLifetime = lifetime } } + + if v.IsSet(cfgListObjectsCacheSize) { + size := v.GetInt(cfgListObjectsCacheSize) + if size <= 0 { + l.Error("invalid cache size, using default value", + zap.Int("value in config", size), + zap.Int("default", cacheCfg.ListObjectsSize)) + } else { + cacheCfg.Size = size + } + } + return &cacheCfg } diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 9384dc6..c6e5ea9 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -58,6 +58,7 @@ const ( // Settings. cfgObjectsCacheLifetime = "cache.lifetime" cfgCacheSize = "cache.size" cfgListObjectsCacheLifetime = "cache.list_objects_lifetime" + cfgListObjectsCacheSize = "cache.list_objects_size" // Policy. cfgDefaultPolicy = "default_policy"