From 8a69c7cca08b1044dadc397718941256aed82c81 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Thu, 29 Jul 2021 11:14:14 +0300 Subject: [PATCH 1/2] [#179] api: Add cache for ListObjectsV1/V2 Refactored cache for ListObjects: made cache common for all versions, simplified: remove dependendence on token/startafter add mitable cachelifetime. Refactored listobjects Signed-off-by: Angira Kekteeva --- api/layer/layer.go | 4 +- api/layer/object.go | 174 ++++++++++++++++++++------------------ api/layer/object_cache.go | 71 +++++++++------- 3 files changed, 136 insertions(+), 113 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index c42e488..fdea8fb 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -24,7 +24,7 @@ type ( layer struct { pool pool.Pool log *zap.Logger - cache ObjectsListV2Cache + cache ObjectsListCache } // Params stores basic API parameters. @@ -131,7 +131,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool) Client { return &layer{ pool: conns, log: log, - cache: newListObjectsCache(), + cache: newListObjectsCache(defaultCacheLifetime), } } diff --git a/api/layer/object.go b/api/layer/object.go index dd88e54..2ce9704 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -13,7 +13,6 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-s3-gw/api" - "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "go.uber.org/zap" ) @@ -56,10 +55,9 @@ type ( } allObjectParams struct { - Bucket *BucketInfo - Delimiter string - Prefix string - StartAfter string + Bucket *BucketInfo + Delimiter string + Prefix string } ) @@ -209,45 +207,34 @@ func (n *layer) objectDelete(ctx context.Context, address *object.Address) error // ListObjectsV1 returns objects in a bucket for requests of Version 1. func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var ( - err error - result ListObjectsInfoV1 - bkt *BucketInfo + err error + result ListObjectsInfoV1 + allObjects []*ObjectInfo ) if p.MaxKeys == 0 { return &result, nil } - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil { return nil, err } - allObjects, err := n.listSortedAllObjects(ctx, allObjectParams{ - Bucket: bkt, - Prefix: p.Prefix, - Delimiter: p.Delimiter, - StartAfter: p.Marker, - }) - if err != nil { - return nil, err + if len(allObjects) == 0 { + return &result, nil + } + + if p.Marker != "" { + allObjects = trimAfterObjectName(p.Marker, allObjects) } if len(allObjects) > p.MaxKeys { result.IsTruncated = true - - nextObject := allObjects[p.MaxKeys-1] - result.NextMarker = nextObject.Name - allObjects = allObjects[:p.MaxKeys] + result.NextMarker = allObjects[len(allObjects)-1].Name } - for _, ov := range allObjects { - if ov.isDir { - result.Prefixes = append(result.Prefixes, ov.Name) - } else { - result.Objects = append(result.Objects, ov) - } - } + result.Prefixes, result.Objects = triageObjects(allObjects) return &result, nil } @@ -258,67 +245,40 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis err error result ListObjectsInfoV2 allObjects []*ObjectInfo - bkt *BucketInfo - cacheKey string - box *accessbox.Box ) if p.MaxKeys == 0 { return &result, nil } - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil { return nil, err } - if box, err = GetBoxData(ctx); err != nil { - return nil, err + if len(allObjects) == 0 { + return &result, nil } - cacheKey = createKey(box.Gate.AccessKey, bkt.CID) - if p.ContinuationToken != "" { - allObjects = n.cache.Get(p.ContinuationToken, cacheKey) - allObjects = trimStartAfter(p.StartAfter, allObjects) + allObjects = trimAfterObjectID(p.ContinuationToken, allObjects) } - if allObjects == nil { - allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{ - Bucket: bkt, - Prefix: p.Prefix, - Delimiter: p.Delimiter, - StartAfter: p.StartAfter, - }) - if err != nil { - return nil, err - } - - if p.ContinuationToken != "" { - allObjects = trimAfterObjectID(p.ContinuationToken, allObjects) - } + if p.StartAfter != "" { + allObjects = trimAfterObjectName(p.StartAfter, allObjects) } if len(allObjects) > p.MaxKeys { result.IsTruncated = true - - restObjects := allObjects[p.MaxKeys:] - n.cache.Put(cacheKey, restObjects) - result.NextContinuationToken = restObjects[0].id.String() - allObjects = allObjects[:p.MaxKeys] + result.NextContinuationToken = allObjects[len(allObjects)-1].id.String() } - for _, ov := range allObjects { - if ov.isDir { - result.Prefixes = append(result.Prefixes, ov.Name) - } else { - result.Objects = append(result.Objects, ov) - } - } + result.Prefixes, result.Objects = triageObjects(allObjects) + return &result, nil } -func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { +func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { var ( err error ids []*object.ID @@ -346,9 +306,6 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([] if _, ok := uniqNames[oi.Name]; ok { continue } - if len(p.StartAfter) > 0 && oi.Name <= p.StartAfter { - continue - } uniqNames[oi.Name] = oi.isDir @@ -363,22 +320,75 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([] return objects, nil } -func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo { - if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter { - for i := range objects { - if objects[i].Name > startAfter { - return objects[i:] - } - } +func trimAfterObjectName(startAfter string, objects []*ObjectInfo) []*ObjectInfo { + if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter { + return nil } - return objects -} - -func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo { - for i, obj := range objects { - if obj.ID().String() == id { + for i := range objects { + if objects[i].Name > startAfter { return objects[i:] } } - return objects + + return nil +} + +func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo { + if len(objects) != 0 && objects[len(objects)-1].id.String() == id { + return []*ObjectInfo{} + } + for i, obj := range objects { + if obj.ID().String() == id { + return objects[i+1:] + } + } + + return nil +} + +func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*ObjectInfo) { + for _, ov := range allObjects { + if ov.isDir { + prefixes = append(prefixes, ov.Name) + } else { + objects = append(objects, ov) + } + } + + return +} + +func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) { + var ( + err error + bkt *BucketInfo + cacheKey cacheOptions + allObjects []*ObjectInfo + ) + + if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + return nil, err + } + + if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil { + return nil, err + } + + allObjects = n.cache.Get(cacheKey) + + if allObjects == nil { + allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{ + Bucket: bkt, + Prefix: p.Prefix, + Delimiter: p.Delimiter, + }) + if err != nil { + return nil, err + } + + // putting to cache a copy of allObjects because allObjects can be modified further + n.cache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) + } + + return allObjects, nil } diff --git a/api/layer/object_cache.go b/api/layer/object_cache.go index 3d3fd92..ce15157 100644 --- a/api/layer/object_cache.go +++ b/api/layer/object_cache.go @@ -1,6 +1,7 @@ package layer import ( + "context" "sync" "time" @@ -8,72 +9,84 @@ import ( ) /* - This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken. + This is an implementation of a cache for ListObjectsV2/V1 which we can return to users when we receive a ListObjects + request. - The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with - creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after - defaultCacheLifetime value. + The cache is a map which has a key: cacheOptions struct and a value: list of objects. After putting a record we + start a timer (via time.AfterFunc) that removes the record after defaultCacheLifetime value. - ContinuationToken in our gateway is an objectID in NeoFS. - - We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect - to other gateways and they should be able to get a list of objects. - When we receive the token from the user we just try to find the cache and then we return the list of objects which - starts from this token (i.e. objectID). + When we get a request from the user we just try to find the suitable and non-expired cache and then we return + the list of objects. Otherwise we send the request to NeoFS. */ -// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct. +// ObjectsListCache provides interface for cache of ListObjectsV2 in a layer struct. type ( - ObjectsListV2Cache interface { - Get(token string, key string) []*ObjectInfo - Put(key string, objects []*ObjectInfo) + ObjectsListCache interface { + Get(key cacheOptions) []*ObjectInfo + Put(key cacheOptions, objects []*ObjectInfo) } ) -var ( - defaultCacheLifetime = time.Second * 60 -) +const defaultCacheLifetime = time.Second * 60 type ( listObjectsCache struct { - caches map[string]cache - mtx sync.RWMutex + cacheLifetime time.Duration + caches map[cacheOptions]cache + mtx sync.RWMutex } cache struct { list []*ObjectInfo } + cacheOptions struct { + key string + delimiter string + prefix string + } ) -func newListObjectsCache() *listObjectsCache { +func newListObjectsCache(lifetime time.Duration) *listObjectsCache { return &listObjectsCache{ - caches: make(map[string]cache), + caches: make(map[cacheOptions]cache), + cacheLifetime: lifetime, } } -func (l *listObjectsCache) Get(token, key string) []*ObjectInfo { +func (l *listObjectsCache) Get(key cacheOptions) []*ObjectInfo { l.mtx.RLock() defer l.mtx.RUnlock() if val, ok := l.caches[key]; ok { - return trimAfterObjectID(token, val.list) + return val.list } - return nil } -func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) { +func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) { + if len(objects) == 0 { + return + } var c cache - l.mtx.Lock() defer l.mtx.Unlock() c.list = objects l.caches[key] = c - time.AfterFunc(defaultCacheLifetime, func() { + time.AfterFunc(l.cacheLifetime, func() { l.mtx.Lock() delete(l.caches, key) l.mtx.Unlock() }) } -func createKey(accessKey string, cid *cid.ID) string { - return accessKey + cid.String() +func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) { + box, err := GetBoxData(ctx) + if err != nil { + return cacheOptions{}, err + } + p := cacheOptions{ + key: box.Gate.AccessKey + cid.String(), + delimiter: delimiter, + prefix: prefix, + } + + return p, nil } From 3cdcbf6e2135fc2edac680f78cdd5883110280d9 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Mon, 9 Aug 2021 13:49:08 +0300 Subject: [PATCH 2/2] [#179] Add unittests for cache Signed-off-by: Angira Kekteeva --- api/layer/object_cache_test.go | 220 +++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 api/layer/object_cache_test.go diff --git a/api/layer/object_cache_test.go b/api/layer/object_cache_test.go new file mode 100644 index 0000000..2ad9b6c --- /dev/null +++ b/api/layer/object_cache_test.go @@ -0,0 +1,220 @@ +package layer + +import ( + "crypto/rand" + "crypto/sha256" + "sort" + "testing" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/stretchr/testify/require" +) + +const testingCacheLifetime = 5 * time.Second + +func randID(t *testing.T) *object.ID { + id := object.NewID() + id.SetSHA256(randSHA256Checksum(t)) + + return id +} + +func randSHA256Checksum(t *testing.T) (cs [sha256.Size]byte) { + _, err := rand.Read(cs[:]) + require.NoError(t, err) + + return +} + +func TestTrimAfterObjectName(t *testing.T) { + var ( + objects []*ObjectInfo + names = []string{"b", "c", "d"} + ) + for _, name := range names { + objects = append(objects, &ObjectInfo{Name: name}) + } + + t.Run("startafter before all objects", func(t *testing.T) { + actual := trimAfterObjectName("a", objects) + require.Equal(t, objects, actual) + }) + + t.Run("startafter first object", func(t *testing.T) { + actual := trimAfterObjectName(names[0], objects) + require.Equal(t, objects[1:], actual) + }) + + t.Run("startafter second-to-last object", func(t *testing.T) { + actual := trimAfterObjectName(names[len(names)-2], objects) + require.Equal(t, objects[len(objects)-1:], actual) + }) + + t.Run("startafter last object", func(t *testing.T) { + actual := trimAfterObjectName(names[len(names)-1], objects) + require.Empty(t, actual) + }) + + t.Run("startafter after all objects", func(t *testing.T) { + actual := trimAfterObjectName("z", objects) + require.Nil(t, actual) + }) + + t.Run("empty objects", func(t *testing.T) { + actual := trimAfterObjectName(names[0], []*ObjectInfo{}) + require.Nil(t, actual) + }) + + t.Run("nil objects", func(t *testing.T) { + actual := trimAfterObjectName(names[0], nil) + require.Nil(t, actual) + }) + + t.Run("empty startafter", func(t *testing.T) { + actual := trimAfterObjectName("", objects) + require.Equal(t, objects, actual) + }) +} + +func TestTrimAfterObjectID(t *testing.T) { + var ( + objects []*ObjectInfo + ids []*object.ID + numberOfIDS = 3 + ) + + for i := 0; i < numberOfIDS; i++ { + id := randID(t) + objects = append(objects, &ObjectInfo{id: id}) + ids = append(ids, id) + } + + t.Run("existing id", func(t *testing.T) { + actual := trimAfterObjectID(ids[0].String(), objects) + require.Equal(t, objects[1:], actual) + }) + + t.Run("second to last id", func(t *testing.T) { + actual := trimAfterObjectID(ids[len(ids)-2].String(), objects) + require.Equal(t, objects[len(objects)-1:], actual) + }) + + t.Run("non-existing id", func(t *testing.T) { + actual := trimAfterObjectID("z", objects) + require.Nil(t, actual) + }) + + t.Run("last id", func(t *testing.T) { + actual := trimAfterObjectID(ids[len(ids)-1].String(), objects) + require.Empty(t, actual) + }) + + t.Run("empty id", func(t *testing.T) { + actual := trimAfterObjectID("", objects) + require.Nil(t, actual) + }) +} + +func TestObjectsListCache(t *testing.T) { + var ( + cacheSize = 10 + objects []*ObjectInfo + userKey = "key" + ) + + for i := 0; i < cacheSize; i++ { + id := randID(t) + objects = append(objects, &ObjectInfo{id: id, Name: id.String()}) + } + + sort.Slice(objects, func(i, j int) bool { + return objects[i].Name < objects[j].Name + }) + + t.Run("lifetime", func(t *testing.T) { + var ( + cache = newListObjectsCache(testingCacheLifetime) + cacheKey = cacheOptions{key: userKey} + ) + + cache.Put(cacheKey, objects) + + condition := func() bool { + return cache.Get(cacheKey) == nil + } + + require.Never(t, condition, cache.cacheLifetime, time.Second) + require.Eventually(t, condition, time.Second, 10*time.Millisecond) + }) + + t.Run("get cache with empty delimiter, empty prefix", func(t *testing.T) { + var ( + cache = newListObjectsCache(testingCacheLifetime) + cacheKey = cacheOptions{key: userKey} + ) + cache.Put(cacheKey, objects) + actual := cache.Get(cacheKey) + + require.Equal(t, len(objects), len(actual)) + for i := range objects { + require.Equal(t, objects[i], actual[i]) + } + }) + + t.Run("get cache with delimiter and prefix", func(t *testing.T) { + cacheKey := cacheOptions{ + key: userKey, + delimiter: "/", + prefix: "dir", + } + + cache := newListObjectsCache(testingCacheLifetime) + cache.Put(cacheKey, objects) + actual := cache.Get(cacheKey) + + require.Equal(t, len(objects), len(actual)) + for i := range objects { + require.Equal(t, objects[i], actual[i]) + } + }) + + t.Run("get cache with other delimiter and prefix", func(t *testing.T) { + var ( + cacheKey = cacheOptions{ + key: userKey, + delimiter: "/", + prefix: "dir", + } + + newKey = cacheOptions{ + key: "key", + delimiter: "*", + prefix: "obj", + } + ) + + cache := newListObjectsCache(testingCacheLifetime) + cache.Put(cacheKey, objects) + + actual := cache.Get(newKey) + require.Nil(t, actual) + }) + + t.Run("get cache with non-existing key", func(t *testing.T) { + var ( + cacheKey = cacheOptions{ + key: userKey, + } + newKey = cacheOptions{ + key: "asdf", + } + ) + + cache := newListObjectsCache(testingCacheLifetime) + cache.Put(cacheKey, objects) + + actual := cache.Get(newKey) + require.Nil(t, actual) + }) +}