From 11558124cd75239a7e30133f0dfd5ecac066e316 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 18 Aug 2021 16:48:58 +0300 Subject: [PATCH] [#122] Add versioning cache Signed-off-by: Denis Kirillov --- api/cache/bucket.go | 64 ++++++++++++ api/cache/head_cache.go | 55 ++++++++++ api/cache/object_cache.go | 6 +- api/cache/object_cache_test.go | 16 +-- api/cache/system.go | 56 ++++++++++ api/handler/head.go | 26 +++-- api/handler/object_list.go | 2 +- api/layer/container.go | 27 +++-- api/layer/detector.go | 58 ++++++++--- api/layer/layer.go | 182 +++++++++++++++++---------------- api/layer/object.go | 171 +++++++++++++++++++++---------- api/layer/object_list_cache.go | 7 +- api/layer/util.go | 15 ++- api/layer/util_test.go | 7 +- api/layer/versioning_test.go | 14 +-- 15 files changed, 503 insertions(+), 203 deletions(-) create mode 100644 api/cache/bucket.go create mode 100644 api/cache/head_cache.go create mode 100644 api/cache/system.go diff --git a/api/cache/bucket.go b/api/cache/bucket.go new file mode 100644 index 0000000..c6896bd --- /dev/null +++ b/api/cache/bucket.go @@ -0,0 +1,64 @@ +package cache + +import ( + "time" + + "github.com/bluele/gcache" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" +) + +type ( + // BucketCache provides interface for lru cache for objects. + BucketCache interface { + Get(key string) *BucketInfo + Put(bkt *BucketInfo) error + Delete(key string) bool + } + + // BucketInfo stores basic bucket data. + BucketInfo struct { + Name string + CID *cid.ID + Owner *owner.ID + Created time.Time + } + + // GetBucketCache contains cache with objects and lifetime of cache entries. + GetBucketCache struct { + cache gcache.Cache + lifetime time.Duration + } +) + +// NewBucketCache creates an object of BucketCache. +func NewBucketCache(cacheSize int, lifetime time.Duration) *GetBucketCache { + gc := gcache.New(cacheSize).LRU().Build() + + return &GetBucketCache{cache: gc, lifetime: lifetime} +} + +// Get returns cached object. +func (o *GetBucketCache) Get(key string) *BucketInfo { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*BucketInfo) + if !ok { + return nil + } + + return result +} + +// Put puts an object to cache. +func (o *GetBucketCache) Put(bkt *BucketInfo) error { + return o.cache.SetWithExpire(bkt.Name, bkt, o.lifetime) +} + +// Delete deletes an object from cache. +func (o *GetBucketCache) Delete(key string) bool { + return o.cache.Remove(key) +} diff --git a/api/cache/head_cache.go b/api/cache/head_cache.go new file mode 100644 index 0000000..08ac3d1 --- /dev/null +++ b/api/cache/head_cache.go @@ -0,0 +1,55 @@ +package cache + +import ( + "time" + + "github.com/bluele/gcache" + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +// HeadObjectsCache provides interface for lru cache for objects. +type HeadObjectsCache interface { + Get(key string) *object.Address + Put(key string, address *object.Address) error + Delete(key string) bool +} + +type ( + // HeadObjectCache contains cache with objects and lifetime of cache entries. + HeadObjectCache struct { + cache gcache.Cache + lifetime time.Duration + } +) + +// NewHeadObject creates an object of ObjectHeadersCache. +func NewHeadObject(cacheSize int, lifetime time.Duration) *HeadObjectCache { + gc := gcache.New(cacheSize).LRU().Build() + + return &HeadObjectCache{cache: gc, lifetime: lifetime} +} + +// Get returns cached object. +func (o *HeadObjectCache) Get(key string) *object.Address { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*object.Address) + if !ok { + return nil + } + + return result +} + +// Put puts an object to cache. +func (o *HeadObjectCache) Put(key string, address *object.Address) error { + return o.cache.SetWithExpire(key, address, o.lifetime) +} + +// Delete deletes an object from cache. +func (o *HeadObjectCache) Delete(key string) bool { + return o.cache.Remove(key) +} diff --git a/api/cache/object_cache.go b/api/cache/object_cache.go index f7bcc24..708fb69 100644 --- a/api/cache/object_cache.go +++ b/api/cache/object_cache.go @@ -10,7 +10,7 @@ import ( // ObjectsCache provides interface for lru cache for objects. type ObjectsCache interface { Get(address *object.Address) *object.Object - Put(address *object.Address, obj object.Object) error + Put(obj object.Object) error Delete(address *object.Address) bool } @@ -52,8 +52,8 @@ func (o *ObjectHeadersCache) Get(address *object.Address) *object.Object { } // Put puts an object to cache. -func (o *ObjectHeadersCache) Put(address *object.Address, obj object.Object) error { - return o.cache.SetWithExpire(address.String(), obj, o.lifetime) +func (o *ObjectHeadersCache) Put(obj object.Object) error { + return o.cache.SetWithExpire(obj.ContainerID().String()+"/"+obj.ID().String(), obj, o.lifetime) } // Delete deletes an object from cache. diff --git a/api/cache/object_cache_test.go b/api/cache/object_cache_test.go index b9ee929..e307004 100644 --- a/api/cache/object_cache_test.go +++ b/api/cache/object_cache_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test" "github.com/stretchr/testify/require" ) @@ -14,23 +16,23 @@ const ( ) func TestCache(t *testing.T) { - var ( - address = objecttest.Address() - object = objecttest.Object() - ) + obj := objecttest.Object() + address := object.NewAddress() + address.SetContainerID(obj.ContainerID()) + address.SetObjectID(obj.ID()) t.Run("check get", func(t *testing.T) { cache := New(cachesize, lifetime) - err := cache.Put(address, *object) + err := cache.Put(*obj) require.NoError(t, err) actual := cache.Get(address) - require.Equal(t, object, actual) + require.Equal(t, obj, actual) }) t.Run("check delete", func(t *testing.T) { cache := New(cachesize, lifetime) - err := cache.Put(address, *object) + err := cache.Put(*obj) require.NoError(t, err) cache.Delete(address) diff --git a/api/cache/system.go b/api/cache/system.go new file mode 100644 index 0000000..d8f18b0 --- /dev/null +++ b/api/cache/system.go @@ -0,0 +1,56 @@ +package cache + +import ( + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + + "github.com/bluele/gcache" +) + +type ( + // SystemCache provides interface for lru cache for objects. + SystemCache interface { + Get(key string) *object.Object + Put(key string, obj *object.Object) error + Delete(key string) bool + } + + // systemCache contains cache with objects and lifetime of cache entries. + systemCache struct { + cache gcache.Cache + lifetime time.Duration + } +) + +// NewSystemCache creates an object of SystemCache. +func NewSystemCache(cacheSize int, lifetime time.Duration) SystemCache { + gc := gcache.New(cacheSize).LRU().Build() + + return &systemCache{cache: gc, lifetime: lifetime} +} + +// Get returns cached object. +func (o *systemCache) Get(key string) *object.Object { + entry, err := o.cache.Get(key) + if err != nil { + return nil + } + + result, ok := entry.(*object.Object) + if !ok { + return nil + } + + return result +} + +// Put puts an object to cache. +func (o *systemCache) Put(key string, obj *object.Object) error { + return o.cache.SetWithExpire(key, obj, o.lifetime) +} + +// Delete deletes an object from cache. +func (o *systemCache) Delete(key string) bool { + return o.cache.Remove(key) +} diff --git a/api/handler/head.go b/api/handler/head.go index f6f1a27..1f0a303 100644 --- a/api/handler/head.go +++ b/api/handler/head.go @@ -46,18 +46,22 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { h.logAndSendError(w, "could not fetch object info", reqInfo, err) return } - buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType)) - getParams := &layer.GetObjectParams{ - ObjectInfo: inf, - Writer: buffer, - Range: getRangeToDetectContentType(inf.Size), - VersionID: reqInfo.URL.Query().Get("versionId"), + + if len(inf.ContentType) == 0 { + buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType)) + getParams := &layer.GetObjectParams{ + ObjectInfo: inf, + Writer: buffer, + Range: getRangeToDetectContentType(inf.Size), + VersionID: reqInfo.URL.Query().Get("versionId"), + } + if err = h.obj.GetObject(r.Context(), getParams); err != nil { + h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", inf.ID())) + return + } + inf.ContentType = http.DetectContentType(buffer.Bytes()) } - if err = h.obj.GetObject(r.Context(), getParams); err != nil { - h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", inf.ID())) - return - } - inf.ContentType = http.DetectContentType(buffer.Bytes()) + writeHeaders(w.Header(), inf) w.WriteHeader(http.StatusOK) } diff --git a/api/handler/object_list.go b/api/handler/object_list.go index d0233e9..293e6bb 100644 --- a/api/handler/object_list.go +++ b/api/handler/object_list.go @@ -263,7 +263,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck } for _, prefix := range info.CommonPrefixes { - res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: *prefix}) + res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: prefix}) } for _, ver := range info.Version { diff --git a/api/layer/container.go b/api/layer/container.go index 3bf7dc0..1e7b3b5 100644 --- a/api/layer/container.go +++ b/api/layer/container.go @@ -11,37 +11,29 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" "github.com/nspcc-dev/neofs-api-go/pkg/container" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" - "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-sdk-go/pkg/pool" "go.uber.org/zap" ) type ( - // BucketInfo stores basic bucket data. - BucketInfo struct { - Name string - CID *cid.ID - Owner *owner.ID - Created time.Time - BasicACL uint32 - } // BucketACL extends BucketInfo by eacl.Table. BucketACL struct { - Info *BucketInfo + Info *cache.BucketInfo EACL *eacl.Table } ) -func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, error) { +func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*cache.BucketInfo, error) { var ( err error res *container.Container rid = api.GetRequestID(ctx) bearerOpt = n.BearerOpt(ctx) - info = &BucketInfo{ + info = &cache.BucketInfo{ CID: cid, Name: cid.String(), } @@ -82,10 +74,17 @@ func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, er } } + if err := n.bucketCache.Put(info); err != nil { + n.log.Warn("could not put bucket info into cache", + zap.Stringer("cid", cid), + zap.String("bucket_name", info.Name), + zap.Error(err)) + } + return info, nil } -func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) { +func (n *layer) containerList(ctx context.Context) ([]*cache.BucketInfo, error) { var ( err error own = n.Owner(ctx) @@ -101,7 +100,7 @@ func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) { return nil, err } - list := make([]*BucketInfo, 0, len(res)) + list := make([]*cache.BucketInfo, 0, len(res)) for _, cid := range res { info, err := n.containerInfo(ctx, cid) if err != nil { diff --git a/api/layer/detector.go b/api/layer/detector.go index 3a9b7d8..81ec75b 100644 --- a/api/layer/detector.go +++ b/api/layer/detector.go @@ -3,24 +3,56 @@ package layer import ( "io" "net/http" - "sync" ) -type detector struct { - io.Reader - sync.Once +type ( + detector struct { + io.Reader + err error + data []byte + } + errReader struct { + data []byte + err error + offset int + } +) - contentType string +const contentTypeDetectSize = 512 + +func newReader(data []byte, err error) *errReader { + return &errReader{data: data, err: err} } -func newDetector(r io.Reader) *detector { - return &detector{Reader: r} +func (r *errReader) Read(b []byte) (int, error) { + if r.offset >= len(r.data) { + return 0, io.EOF + } + n := copy(b, r.data[r.offset:]) + r.offset += n + if r.offset >= len(r.data) { + return n, r.err + } + return n, nil } -func (d *detector) Read(data []byte) (int, error) { - d.Do(func() { - d.contentType = http.DetectContentType(data) - }) - - return d.Reader.Read(data) +func newDetector(reader io.Reader) *detector { + return &detector{ + data: make([]byte, contentTypeDetectSize), + Reader: reader, + } +} + +func (d *detector) Detect() (string, error) { + n, err := d.Reader.Read(d.data) + if err != nil && err != io.EOF { + d.err = err + return "", err + } + d.data = d.data[:n] + return http.DetectContentType(d.data), nil +} + +func (d *detector) MultiReader() io.Reader { + return io.MultiReader(newReader(d.data, d.err), d.Reader) } diff --git a/api/layer/layer.go b/api/layer/layer.go index c454ac2..051629e 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -28,10 +28,13 @@ import ( type ( layer struct { - pool pool.Pool - log *zap.Logger - listObjCache ObjectsListCache - objCache cache.ObjectsCache + pool pool.Pool + log *zap.Logger + listsCache ObjectsListCache + objCache cache.ObjectsCache + headCache cache.HeadObjectsCache + bucketCache cache.BucketCache + systemCache cache.SystemCache } // CacheConfig contains params for caches. @@ -156,8 +159,8 @@ type ( PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error) GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error) - ListBuckets(ctx context.Context) ([]*BucketInfo, error) - GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) + ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error) + GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error) GetBucketACL(ctx context.Context, name string) (*BucketACL, error) PutBucketACL(ctx context.Context, p *PutBucketACLParams) error CreateBucket(ctx context.Context, p *CreateBucketParams) (*cid.ID, error) @@ -228,26 +231,22 @@ func (v *objectVersions) getLast() *ObjectInfo { return nil } -func (v *objectVersions) getFiltered() []*ObjectVersionInfo { +func (v *objectVersions) getFiltered() []*ObjectInfo { if len(v.objects) == 0 { return nil } v.sort() existedVersions := getExistedVersions(v) - res := make([]*ObjectVersionInfo, 0, len(v.objects)) + res := make([]*ObjectInfo, 0, len(v.objects)) for _, version := range v.objects { delMark := version.Headers[versionsDeleteMarkAttr] if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") { - res = append(res, &ObjectVersionInfo{Object: version}) + res = append(res, version) } } - if len(res) > 0 { - res[len(res)-1].IsLatest = true - } - return res } @@ -279,10 +278,14 @@ func (t *VersionedObject) String() string { // and establishes gRPC connection with node. func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client { return &layer{ - pool: conns, - log: log, - listObjCache: newListObjectsCache(config.ListObjectsLifetime), - objCache: cache.New(config.Size, config.Lifetime), + pool: conns, + log: log, + listsCache: newListObjectsCache(config.ListObjectsLifetime), + objCache: cache.New(config.Size, config.Lifetime), + //todo reconsider cache params + headCache: cache.NewHeadObject(1000, time.Minute), + bucketCache: cache.NewBucketCache(150, time.Minute), + systemCache: cache.NewSystemCache(1000, 5*time.Minute), } } @@ -320,12 +323,16 @@ func (n *layer) Get(ctx context.Context, address *object.Address) (*object.Objec } // GetBucketInfo returns bucket info by name. -func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) { +func (n *layer) GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error) { name, err := url.QueryUnescape(name) if err != nil { return nil, err } + if bktInfo := n.bucketCache.Get(name); bktInfo != nil { + return bktInfo, nil + } + containerID := new(cid.ID) if err := containerID.Parse(name); err != nil { list, err := n.containerList(ctx) @@ -374,7 +381,7 @@ func (n *layer) PutBucketACL(ctx context.Context, param *PutBucketACLParams) err // ListBuckets returns all user containers. Name of the bucket is a container // id. Timestamp is omitted since it is not saved in neofs container. -func (n *layer) ListBuckets(ctx context.Context) ([]*BucketInfo, error) { +func (n *layer) ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error) { return n.containerList(ctx) } @@ -382,21 +389,12 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*BucketInfo, error) { func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { var err error - //if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { - // return fmt.Errorf("couldn't find bucket: %s : %w", p.Bucket, err) - //} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err != nil { - // return fmt.Errorf("search of the object failed: cid: %s, val: %s : %w", bkt.CID, p.Object, err) - //} - - addr := object.NewAddress() - addr.SetObjectID(p.ObjectInfo.ID()) - addr.SetContainerID(p.ObjectInfo.CID()) - params := &getParams{ - Writer: p.Writer, - address: addr, - offset: p.Offset, - length: p.Length, + Writer: p.Writer, + cid: p.ObjectInfo.CID(), + oid: p.ObjectInfo.ID(), + offset: p.Offset, + length: p.Length, } if p.Range != nil { @@ -411,7 +409,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { } if err != nil { - n.objCache.Delete(addr) + n.objCache.Delete(p.ObjectInfo.Address()) return fmt.Errorf("couldn't get object, cid: %s : %w", p.ObjectInfo.CID(), err) } @@ -433,32 +431,26 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*Object return n.headVersion(ctx, bkt, p.VersionID) } -func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*ObjectInfo, error) { +func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *cache.BucketInfo) (*ObjectInfo, error) { + if meta := n.systemCache.Get(bktVersionSettingsObject); meta != nil { + return objInfoFromMeta(bkt, meta), nil + } + oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: bktVersionSettingsObject}) if err != nil { return nil, err } - addr := object.NewAddress() - addr.SetObjectID(oid) - addr.SetContainerID(bkt.CID) - - /* todo: now we get an address via request to NeoFS and try to find the object with the address in cache - but it will be resolved after implementation of local cache with nicenames and address of objects - for get/head requests */ - meta := n.objCache.Get(addr) - if meta == nil { - meta, err = n.objectHead(ctx, bkt.CID, oid) - if err != nil { - n.log.Error("could not fetch object head", zap.Error(err)) - return nil, err - } - if err = n.objCache.Put(addr, *meta); err != nil { - n.log.Error("couldn't cache an object", zap.Error(err)) - } + meta, err := n.objectHead(ctx, bkt.CID, oid) + if err != nil { + n.log.Error("could not fetch object head", zap.Error(err)) + return nil, err + } + if err = n.systemCache.Put(bktVersionSettingsObject, meta); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) } - return objectInfoFromMeta(bkt, meta, "", ""), nil + return objInfoFromMeta(bkt, meta), nil } // PutObject into storage. @@ -496,7 +488,7 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf } // DeleteObject removes all objects with passed nice name. -func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *VersionedObject) error { +func (n *layer) deleteObject(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) error { var ( err error ids []*object.ID @@ -543,7 +535,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *Versione return nil } -func (n *layer) checkVersionsExists(ctx context.Context, bkt *BucketInfo, obj *VersionedObject) (*object.ID, error) { +func (n *layer) checkVersionsExists(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) (*object.ID, error) { id := object.NewID() if err := id.Parse(obj.VersionID); err != nil { return nil, &errors.DeleteError{Err: errors.GetAPIError(errors.ErrInvalidVersion), Object: obj.String()} @@ -608,60 +600,70 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { } func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - res := ListObjectVersionsInfo{} - versions := make(map[string]*objectVersions) + var versions map[string]*objectVersions + res := &ListObjectVersionsInfo{} bkt, err := n.GetBucketInfo(ctx, p.Bucket) if err != nil { return nil, err } - ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID}) + + cacheKey, err := createKey(ctx, bkt.CID, listVersionsMethod, p.Prefix, p.Delimiter) if err != nil { return nil, err } - for _, id := range ids { - meta, err := n.objectHead(ctx, bkt.CID, id) + allObjects := n.listsCache.Get(cacheKey) + if allObjects == nil { + versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter) if err != nil { - n.log.Warn("could not fetch object meta", zap.Error(err)) - continue + return nil, err } - if oi := objectInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); oi != nil { - if oi.Name <= p.KeyMarker { - continue - } - if isSystem(oi) { - continue - } - if objVersions, ok := versions[oi.Name]; ok { - objVersions.appendVersion(oi) - versions[oi.Name] = objVersions - } else { - objVersion := newObjectVersions(oi.Name) - objVersion.appendVersion(oi) - versions[oi.Name] = objVersion - } + sortedNames := make([]string, 0, len(versions)) + for k := range versions { + sortedNames = append(sortedNames, k) } + sort.Strings(sortedNames) + + allObjects = make([]*ObjectInfo, 0, p.MaxKeys) + for _, name := range sortedNames { + allObjects = append(allObjects, versions[name].getFiltered()...) + } + + // putting to cache a copy of allObjects because allObjects can be modified further + n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) } - sortedNames := make([]string, 0, len(versions)) - for k := range versions { - sortedNames = append(sortedNames, k) - } - sort.Strings(sortedNames) - - objects := make([]*ObjectVersionInfo, 0, p.MaxKeys) - for _, name := range sortedNames { - objects = append(objects, versions[name].getFiltered()...) - if len(objects) > p.MaxKeys { - objects = objects[:p.MaxKeys] + for i, obj := range allObjects { + if obj.Name >= p.KeyMarker && obj.Version() >= p.VersionIDMarker { + allObjects = allObjects[i:] break } } + res.CommonPrefixes, allObjects = triageObjects(allObjects) + + if len(allObjects) > p.MaxKeys { + res.IsTruncated = true + res.NextKeyMarker = allObjects[p.MaxKeys].Name + res.NextVersionIDMarker = allObjects[p.MaxKeys].Version() + + allObjects = allObjects[:p.MaxKeys] + res.KeyMarker = allObjects[p.MaxKeys-1].Name + res.VersionIDMarker = allObjects[p.MaxKeys-1].Version() + } + + objects := make([]*ObjectVersionInfo, len(allObjects)) + for i, obj := range allObjects { + objects[i] = &ObjectVersionInfo{Object: obj} + if i == len(allObjects)-1 || allObjects[i+1].Name != obj.Name { + objects[i].IsLatest = true + } + } + res.Version, res.DeleteMarker = triageVersions(objects) - return &res, nil + return res, nil } func sortVersions(versions []*ObjectInfo) { @@ -773,7 +775,7 @@ func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*Bu return n.getBucketSettings(ctx, bktInfo) } -func (n *layer) getBucketSettings(ctx context.Context, bktInfo *BucketInfo) (*BucketSettings, error) { +func (n *layer) getBucketSettings(ctx context.Context, bktInfo *cache.BucketInfo) (*BucketSettings, error) { objInfo, err := n.getSettingsObjectInfo(ctx, bktInfo) if err != nil { return nil, err diff --git a/api/layer/object.go b/api/layer/object.go index b418af0..226dec2 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -14,6 +14,8 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors" "go.uber.org/zap" ) @@ -29,9 +31,10 @@ type ( io.Writer *object.Range - offset int64 - length int64 - address *object.Address + offset int64 + length int64 + cid *cid.ID + oid *object.ID } // ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2. @@ -58,7 +61,7 @@ type ( } allObjectParams struct { - Bucket *BucketInfo + Bucket *cache.BucketInfo Delimiter string Prefix string } @@ -96,12 +99,16 @@ func (n *layer) objectFindID(ctx context.Context, p *findParams) (*object.ID, er return nil, errors.New("several objects with the same name found") } -// objectHead returns all object's headers. -func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) { +func newAddress(cid *cid.ID, oid *object.ID) *object.Address { address := object.NewAddress() address.SetContainerID(cid) address.SetObjectID(oid) - ops := new(client.ObjectHeaderParams).WithAddress(address).WithAllFields() + return address +} + +// objectHead returns all object's headers. +func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) { + ops := new(client.ObjectHeaderParams).WithAddress(newAddress(cid, oid)).WithAllFields() return n.pool.GetObjectHeader(ctx, ops, n.BearerOpt(ctx)) } @@ -109,19 +116,19 @@ func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*o func (n *layer) objectGet(ctx context.Context, p *getParams) (*object.Object, error) { // prepare length/offset writer w := newWriter(p.Writer, p.offset, p.length) - ops := new(client.GetObjectParams).WithAddress(p.address).WithPayloadWriter(w) + ops := new(client.GetObjectParams).WithAddress(newAddress(p.cid, p.oid)).WithPayloadWriter(w) return n.pool.GetObject(ctx, ops, n.BearerOpt(ctx)) } // objectRange gets object range and writes it into provided io.Writer. func (n *layer) objectRange(ctx context.Context, p *getParams) ([]byte, error) { w := newWriter(p.Writer, p.offset, p.length) - ops := new(client.RangeDataParams).WithAddress(p.address).WithDataWriter(w).WithRange(p.Range) + ops := new(client.RangeDataParams).WithAddress(newAddress(p.cid, p.oid)).WithDataWriter(w).WithRange(p.Range) return n.pool.ObjectPayloadRangeData(ctx, ops, n.BearerOpt(ctx)) } // objectPut into NeoFS, took payload from io.Reader. -func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectParams) (*ObjectInfo, error) { +func (n *layer) objectPut(ctx context.Context, bkt *cache.BucketInfo, p *PutObjectParams) (*ObjectInfo, error) { own := n.Owner(ctx) obj, err := url.QueryUnescape(p.Object) if err != nil { @@ -135,8 +142,15 @@ func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectPara } idsToDeleteArr := updateCRDT2PSetHeaders(p, versions, versioningEnabled) + r := p.Reader + if len(p.Header[api.ContentType]) == 0 { + d := newDetector(p.Reader) + if contentType, err := d.Detect(); err == nil { + p.Header[api.ContentType] = contentType + } + r = d.MultiReader() + } rawObject := formRawObject(p, bkt.CID, own, obj) - r := newDetector(p.Reader) ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(r) oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx)) @@ -144,11 +158,21 @@ func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectPara return nil, err } + if p.Header[versionsDeleteMarkAttr] == delMarkFullObject { + if last := versions.getLast(); last != nil { + n.objCache.Delete(last.Address()) + } + } + meta, err := n.objectHead(ctx, bkt.CID, oid) if err != nil { return nil, err } + if err = n.objCache.Put(*meta); err != nil { + n.log.Error("couldn't cache an object", zap.Error(err)) + } + for _, id := range idsToDeleteArr { if err = n.objectDelete(ctx, bkt.CID, id); err != nil { n.log.Warn("couldn't delete object", @@ -168,7 +192,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectPara Created: time.Now(), CreationEpoch: meta.CreationEpoch(), Headers: p.Header, - ContentType: r.contentType, + ContentType: p.Header[api.ContentType], HashSum: meta.PayloadChecksum().String(), }, nil } @@ -229,9 +253,12 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio versionsDeletedStr += "," } - lastVersion := versions.getLast() - p.Header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version() - idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID()) + if lastVersion := versions.getLast(); lastVersion != nil { + p.Header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version() + idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID()) + } else if len(versionsDeletedStr) != 0 { + p.Header[versionsDelAttr] = versionsDeletedStr + } for _, version := range versions.objects { if contains(versions.delList, version.Version()) { @@ -243,7 +270,13 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio return idsToDeleteArr } -func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *BucketInfo, objectName string) (*ObjectInfo, error) { +func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*ObjectInfo, error) { + if address := n.headCache.Get(bkt.Name + "/" + objectName); address != nil { + if headInfo := n.objCache.Get(address); headInfo != nil { + return objInfoFromMeta(bkt, headInfo), nil + } + } + versions, err := n.headVersions(ctx, bkt, objectName) if err != nil { return nil, err @@ -253,10 +286,17 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *BucketInfo if lastVersion == nil { return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) } + + if err = n.headCache.Put(lastVersion.NiceName(), lastVersion.Address()); err != nil { + n.log.Warn("couldn't put obj address to head cache", + zap.String("obj nice name", lastVersion.NiceName()), + zap.Error(err)) + } + return lastVersion, nil } -func (n *layer) headVersions(ctx context.Context, bkt *BucketInfo, objectName string) (*objectVersions, error) { +func (n *layer) headVersions(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*objectVersions, error) { ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName}) if err != nil { return nil, err @@ -276,6 +316,13 @@ func (n *layer) headVersions(ctx context.Context, bkt *BucketInfo, objectName st zap.Error(err)) continue } + if err = n.objCache.Put(*meta); err != nil { + n.log.Warn("couldn't put meta to objects cache", + zap.Stringer("object id", id), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + } + if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil { if isSystem(oi) { continue @@ -287,12 +334,16 @@ func (n *layer) headVersions(ctx context.Context, bkt *BucketInfo, objectName st return versions, nil } -func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, versionID string) (*ObjectInfo, error) { +func (n *layer) headVersion(ctx context.Context, bkt *cache.BucketInfo, versionID string) (*ObjectInfo, error) { oid := object.NewID() if err := oid.Parse(versionID); err != nil { return nil, err } + if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil { + return objInfoFromMeta(bkt, headInfo), nil + } + meta, err := n.objectHead(ctx, bkt.CID, oid) if err != nil { if strings.Contains(err.Error(), "not found") { @@ -301,14 +352,22 @@ func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, versionID stri return nil, err } - return objectInfoFromMeta(bkt, meta, "", ""), nil + objInfo := objectInfoFromMeta(bkt, meta, "", "") + if err = n.objCache.Put(*meta); err != nil { + n.log.Warn("couldn't put obj to object cache", + zap.String("bucket name", objInfo.Bucket), + zap.Stringer("bucket cid", objInfo.CID()), + zap.String("object name", objInfo.Name), + zap.Stringer("object id", objInfo.ID()), + zap.Error(err)) + } + + return objInfo, nil } // objectDelete puts tombstone object into neofs. func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *object.ID) error { - address := object.NewAddress() - address.SetContainerID(cid) - address.SetObjectID(oid) + address := newAddress(cid, oid) dop := new(client.DeleteObjectParams) dop.WithAddress(address) n.objCache.Delete(address) @@ -390,35 +449,11 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis } func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) { - ids, err := n.objectSearch(ctx, &findParams{cid: p.Bucket.CID}) + versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter) if err != nil { return nil, err } - versions := make(map[string]*objectVersions, len(ids)/2) - - for _, id := range ids { - meta, err := n.objectHead(ctx, p.Bucket.CID, id) - if err != nil { - n.log.Warn("could not fetch object meta", zap.Error(err)) - continue - } - if oi := objectInfoFromMeta(p.Bucket, meta, p.Prefix, p.Delimiter); oi != nil { - if isSystem(oi) { - continue - } - - if objVersions, ok := versions[oi.Name]; ok { - objVersions.appendVersion(oi) - versions[oi.Name] = objVersions - } else { - objVersion := newObjectVersions(oi.Name) - objVersion.appendVersion(oi) - versions[oi.Name] = objVersion - } - } - } - objects := make([]*ObjectInfo, 0, len(versions)) for _, v := range versions { lastVersion := v.getLast() @@ -434,6 +469,38 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam return objects, nil } +func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *cache.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) { + ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID}) + if err != nil { + return nil, err + } + + versions := make(map[string]*objectVersions, len(ids)/2) + for _, id := range ids { + meta, err := n.objectHead(ctx, bkt.CID, id) + if err != nil { + n.log.Warn("could not fetch object meta", zap.Error(err)) + continue + } + if oi := objectInfoFromMeta(bkt, meta, prefix, delimiter); oi != nil { + if isSystem(oi) { + continue + } + + if objVersions, ok := versions[oi.Name]; ok { + objVersions.appendVersion(oi) + versions[oi.Name] = objVersions + } else { + objVersion := newObjectVersions(oi.Name) + objVersion.appendVersion(oi) + versions[oi.Name] = objVersion + } + } + } + + return versions, nil +} + func getExistedVersions(versions *objectVersions) []string { var res []string for _, add := range versions.addList { @@ -498,7 +565,7 @@ func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*Obje func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) { var ( err error - bkt *BucketInfo + bkt *cache.BucketInfo cacheKey cacheOptions allObjects []*ObjectInfo ) @@ -507,11 +574,11 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ( return nil, err } - if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil { + if cacheKey, err = createKey(ctx, bkt.CID, listObjectsMethod, p.Prefix, p.Delimiter); err != nil { return nil, err } - allObjects = n.listObjCache.Get(cacheKey) + allObjects = n.listsCache.Get(cacheKey) if allObjects == nil { allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{ @@ -524,13 +591,13 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ( } // putting to cache a copy of allObjects because allObjects can be modified further - n.listObjCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) + n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...)) } return allObjects, nil } -func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *BucketInfo) bool { +func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *cache.BucketInfo) bool { settings, err := n.getBucketSettings(ctx, bktInfo) if err != nil { n.log.Warn("couldn't get versioning settings object", zap.Error(err)) diff --git a/api/layer/object_list_cache.go b/api/layer/object_list_cache.go index 6ed174a..e67ac1a 100644 --- a/api/layer/object_list_cache.go +++ b/api/layer/object_list_cache.go @@ -30,6 +30,11 @@ type ( // DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects. const DefaultObjectsListCacheLifetime = time.Second * 60 +const ( + listObjectsMethod = "listObjects" + listVersionsMethod = "listVersions" +) + type ( listObjectsCache struct { cacheLifetime time.Duration @@ -78,7 +83,7 @@ func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) { }) } -func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) { +func createKey(ctx context.Context, cid *cid.ID, method, prefix, delimiter string) (cacheOptions, error) { box, err := GetBoxData(ctx) if err != nil { return cacheOptions{}, err diff --git a/api/layer/util.go b/api/layer/util.go index a915c5d..557141f 100644 --- a/api/layer/util.go +++ b/api/layer/util.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" ) @@ -60,7 +61,7 @@ type ( // ListObjectVersionsInfo stores info and list of objects' versions. ListObjectVersionsInfo struct { - CommonPrefixes []*string + CommonPrefixes []string IsTruncated bool KeyMarker string NextKeyMarker string @@ -84,7 +85,11 @@ func userHeaders(attrs []*object.Attribute) map[string]string { return result } -func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo { +func objInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object) *ObjectInfo { + return objectInfoFromMeta(bkt, meta, "", "") +} + +func objectInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo { var ( isDir bool size int64 @@ -164,6 +169,12 @@ func (o *ObjectInfo) ID() *object.ID { return o.id } // Version returns object version from ObjectInfo. func (o *ObjectInfo) Version() string { return o.id.String() } +// NiceName returns object name for cache. +func (o *ObjectInfo) NiceName() string { return o.Bucket + "/" + o.Name } + +// Address returns object address. +func (o *ObjectInfo) Address() *object.Address { return newAddress(o.bucketID, o.id) } + // CID returns bucket ID from ObjectInfo. func (o *ObjectInfo) CID() *cid.ID { return o.bucketID } diff --git a/api/layer/util_test.go b/api/layer/util_test.go index d393b14..950b783 100644 --- a/api/layer/util_test.go +++ b/api/layer/util_test.go @@ -9,6 +9,7 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/stretchr/testify/require" ) @@ -19,7 +20,7 @@ var ( defaultTestContentType = http.DetectContentType(defaultTestPayload) ) -func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object { +func newTestObject(oid *object.ID, bkt *cache.BucketInfo, name string) *object.Object { filename := object.NewAttribute() filename.SetKey(object.AttributeFileName) filename.SetValue(name) @@ -43,7 +44,7 @@ func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object return raw.Object() } -func newTestInfo(oid *object.ID, bkt *BucketInfo, name string, isDir bool) *ObjectInfo { +func newTestInfo(oid *object.ID, bkt *cache.BucketInfo, name string, isDir bool) *ObjectInfo { info := &ObjectInfo{ id: oid, Name: name, @@ -71,7 +72,7 @@ func Test_objectInfoFromMeta(t *testing.T) { oid := object.NewID() containerID := cid.New() - bkt := &BucketInfo{ + bkt := &cache.BucketInfo{ Name: "test-container", CID: containerID, Owner: uid, diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index 31ec3e0..e607ef5 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -20,6 +20,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/session" "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-s3-gw/api" + "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "github.com/nspcc-dev/neofs-sdk-go/pkg/logger" "github.com/nspcc-dev/neofs-sdk-go/pkg/pool" @@ -61,10 +62,7 @@ func (t *testPool) PutObject(ctx context.Context, params *client.PutObjectParams raw.SetPayload(all) } - addr := object.NewAddress() - addr.SetObjectID(raw.ID()) - addr.SetContainerID(raw.ContainerID()) - + addr := newAddress(raw.ContainerID(), raw.ID()) t.objects[addr.String()] = raw.Object() return raw.ID(), nil } @@ -329,8 +327,12 @@ func prepareContext(t *testing.T) *testContext { require.NoError(t, err) return &testContext{ - ctx: ctx, - layer: NewLayer(l, tp), + ctx: ctx, + layer: NewLayer(l, tp, &CacheConfig{ + Size: cache.DefaultObjectsCacheSize, + Lifetime: cache.DefaultObjectsCacheLifetime, + ListObjectsLifetime: DefaultObjectsListCacheLifetime}, + ), bkt: bktName, bktID: bktID, obj: "obj1",