From 97a7d16f68a446d902a7bcbbc4cffe6d0897e832 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Thu, 22 Jul 2021 11:57:37 +0300 Subject: [PATCH 1/3] [#112] api: Refactor allObjectsParams struct Replaced Bucket (means name) by BucketInfo struct Signed-off-by: Angira Kekteeva --- api/layer/object.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/api/layer/object.go b/api/layer/object.go index e006057..d61a924 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -55,7 +55,7 @@ type ( } allObjectParams struct { - Bucket string + Bucket *BucketInfo Delimiter string Prefix string StartAfter string @@ -210,14 +210,19 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis var ( err error result ListObjectsInfoV1 + bkt *BucketInfo ) if p.MaxKeys == 0 { return &result, nil } + if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + return nil, err + } + allObjects, err := n.listSortedAllObjects(ctx, allObjectParams{ - Bucket: p.Bucket, + Bucket: bkt, Prefix: p.Prefix, Delimiter: p.Delimiter, StartAfter: p.Marker, @@ -252,17 +257,22 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis err error result ListObjectsInfoV2 allObjects []*ObjectInfo + bkt *BucketInfo ) if p.MaxKeys == 0 { return &result, nil } + if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + return nil, err + } + if p.ContinuationToken != "" { // find cache with continuation token } else { allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{ - Bucket: p.Bucket, + Bucket: bkt, Prefix: p.Prefix, Delimiter: p.Delimiter, StartAfter: p.StartAfter, @@ -292,14 +302,11 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { var ( err error - bkt *BucketInfo ids []*object.ID uniqNames = make(map[string]bool) ) - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { - return nil, err - } else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID}); err != nil { + if ids, err = n.objectSearch(ctx, &findParams{cid: p.Bucket.CID}); err != nil { return nil, err } @@ -308,14 +315,14 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([] for _, id := range ids { addr := object.NewAddress() addr.SetObjectID(id) - addr.SetContainerID(bkt.CID) + addr.SetContainerID(p.Bucket.CID) meta, err := n.objectHead(ctx, addr) if err != nil { n.log.Warn("could not fetch object meta", zap.Error(err)) continue } - if oi := objectInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); oi != nil { + if oi := objectInfoFromMeta(p.Bucket, meta, p.Prefix, p.Delimiter); oi != nil { // use only unique dir names if _, ok := uniqNames[oi.Name]; ok { continue From c24fe5cc2169129ec66cda72ac9f60b31a283b1f Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Wed, 28 Jul 2021 16:27:06 +0300 Subject: [PATCH 2/3] [#112] Move getBoxData from handler to layer And made it exported Signed-off-by: Angira Kekteeva --- api/handler/put.go | 18 +----------------- api/layer/util.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/api/handler/put.go b/api/handler/put.go index cd98a41..a0e7ff3 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -1,7 +1,6 @@ package handler import ( - "context" "encoding/xml" "fmt" "net/http" @@ -13,7 +12,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/policy" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/layer" - "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "go.uber.org/zap" ) @@ -107,7 +105,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) { return } - p.BoxData, err = getBoxData(r.Context()) + p.BoxData, err = layer.GetBoxData(r.Context()) if err != nil { h.registerAndSendError(w, r, err, "could not get boxData") return @@ -172,17 +170,3 @@ func parseBasicACL(basicACL string) (uint32, error) { return uint32(value), nil } } - -func getBoxData(ctx context.Context) (*accessbox.Box, error) { - var boxData *accessbox.Box - data, ok := ctx.Value(api.BoxData).(*accessbox.Box) - if !ok || data == nil { - return nil, fmt.Errorf("couldn't get box data from context") - } - - boxData = data - if boxData.Gate == nil { - boxData.Gate = &accessbox.GateData{} - } - return boxData, nil -} diff --git a/api/layer/util.go b/api/layer/util.go index 9f4247f..5bcee8b 100644 --- a/api/layer/util.go +++ b/api/layer/util.go @@ -1,6 +1,8 @@ package layer import ( + "context" + "fmt" "os" "strconv" "strings" @@ -8,6 +10,8 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" ) type ( @@ -172,3 +176,18 @@ func (o *ObjectInfo) ID() *object.ID { return o.id } // IsDir allows to check if object is a directory. func (o *ObjectInfo) IsDir() bool { return o.isDir } + +// GetBoxData extracts accessbox.Box from context. +func GetBoxData(ctx context.Context) (*accessbox.Box, error) { + var boxData *accessbox.Box + data, ok := ctx.Value(api.BoxData).(*accessbox.Box) + if !ok || data == nil { + return nil, fmt.Errorf("couldn't get box data from context") + } + + boxData = data + if boxData.Gate == nil { + boxData.Gate = &accessbox.GateData{} + } + return boxData, nil +} From 0ceea95e119dc1447525263347c690ed2b6684b2 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Wed, 28 Jul 2021 16:28:05 +0300 Subject: [PATCH 3/3] [#112] Add cache to ListObjects and layer Signed-off-by: Angira Kekteeva --- api/layer/layer.go | 10 +++-- api/layer/object.go | 45 ++++++++++++++++++++-- api/layer/object_cache.go | 79 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 7 deletions(-) create mode 100644 api/layer/object_cache.go diff --git a/api/layer/layer.go b/api/layer/layer.go index 22a6f06..c42e488 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -22,8 +22,9 @@ import ( type ( layer struct { - pool pool.Pool - log *zap.Logger + pool pool.Pool + log *zap.Logger + cache ObjectsListV2Cache } // Params stores basic API parameters. @@ -128,8 +129,9 @@ const ( // and establishes gRPC connection with node. func NewLayer(log *zap.Logger, conns pool.Pool) Client { return &layer{ - pool: conns, - log: log, + pool: conns, + log: log, + cache: newListObjectsCache(), } } diff --git a/api/layer/object.go b/api/layer/object.go index d61a924..dd88e54 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -13,6 +13,7 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "go.uber.org/zap" ) @@ -258,6 +259,8 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis result ListObjectsInfoV2 allObjects []*ObjectInfo bkt *BucketInfo + cacheKey string + box *accessbox.Box ) if p.MaxKeys == 0 { @@ -268,9 +271,18 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis return nil, err } + if box, err = GetBoxData(ctx); err != nil { + return nil, err + } + + cacheKey = createKey(box.Gate.AccessKey, bkt.CID) + if p.ContinuationToken != "" { - // find cache with continuation token - } else { + allObjects = n.cache.Get(p.ContinuationToken, cacheKey) + allObjects = trimStartAfter(p.StartAfter, allObjects) + } + + if allObjects == nil { allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{ Bucket: bkt, Prefix: p.Prefix, @@ -280,13 +292,20 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis if err != nil { return nil, err } + + if p.ContinuationToken != "" { + allObjects = trimAfterObjectID(p.ContinuationToken, allObjects) + } } if len(allObjects) > p.MaxKeys { result.IsTruncated = true + restObjects := allObjects[p.MaxKeys:] + n.cache.Put(cacheKey, restObjects) + result.NextContinuationToken = restObjects[0].id.String() + allObjects = allObjects[:p.MaxKeys] - // add creating of cache here } for _, ov := range allObjects { @@ -343,3 +362,23 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([] return objects, nil } + +func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo { + if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter { + for i := range objects { + if objects[i].Name > startAfter { + return objects[i:] + } + } + } + return objects +} + +func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo { + for i, obj := range objects { + if obj.ID().String() == id { + return objects[i:] + } + } + return objects +} diff --git a/api/layer/object_cache.go b/api/layer/object_cache.go new file mode 100644 index 0000000..3d3fd92 --- /dev/null +++ b/api/layer/object_cache.go @@ -0,0 +1,79 @@ +package layer + +import ( + "sync" + "time" + + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" +) + +/* + This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken. + + The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with + creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after + defaultCacheLifetime value. + + ContinuationToken in our gateway is an objectID in NeoFS. + + We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect + to other gateways and they should be able to get a list of objects. + When we receive the token from the user we just try to find the cache and then we return the list of objects which + starts from this token (i.e. objectID). +*/ + +// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct. +type ( + ObjectsListV2Cache interface { + Get(token string, key string) []*ObjectInfo + Put(key string, objects []*ObjectInfo) + } +) + +var ( + defaultCacheLifetime = time.Second * 60 +) + +type ( + listObjectsCache struct { + caches map[string]cache + mtx sync.RWMutex + } + cache struct { + list []*ObjectInfo + } +) + +func newListObjectsCache() *listObjectsCache { + return &listObjectsCache{ + caches: make(map[string]cache), + } +} + +func (l *listObjectsCache) Get(token, key string) []*ObjectInfo { + l.mtx.RLock() + defer l.mtx.RUnlock() + if val, ok := l.caches[key]; ok { + return trimAfterObjectID(token, val.list) + } + + return nil +} + +func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) { + var c cache + + l.mtx.Lock() + defer l.mtx.Unlock() + c.list = objects + l.caches[key] = c + time.AfterFunc(defaultCacheLifetime, func() { + l.mtx.Lock() + delete(l.caches, key) + l.mtx.Unlock() + }) +} + +func createKey(accessKey string, cid *cid.ID) string { + return accessKey + cid.String() +}