[#122] Add versioning cache

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2021-08-18 16:48:58 +03:00
parent f6c51cc9ee
commit 11558124cd
15 changed files with 503 additions and 203 deletions

64
api/cache/bucket.go vendored Normal file
View file

@ -0,0 +1,64 @@
package cache
import (
"time"
"github.com/bluele/gcache"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
)
type (
// BucketCache provides interface for lru cache for objects.
BucketCache interface {
Get(key string) *BucketInfo
Put(bkt *BucketInfo) error
Delete(key string) bool
}
// BucketInfo stores basic bucket data.
BucketInfo struct {
Name string
CID *cid.ID
Owner *owner.ID
Created time.Time
}
// GetBucketCache contains cache with objects and lifetime of cache entries.
GetBucketCache struct {
cache gcache.Cache
lifetime time.Duration
}
)
// NewBucketCache creates an object of BucketCache.
func NewBucketCache(cacheSize int, lifetime time.Duration) *GetBucketCache {
gc := gcache.New(cacheSize).LRU().Build()
return &GetBucketCache{cache: gc, lifetime: lifetime}
}
// Get returns cached object.
func (o *GetBucketCache) Get(key string) *BucketInfo {
entry, err := o.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*BucketInfo)
if !ok {
return nil
}
return result
}
// Put puts an object to cache.
func (o *GetBucketCache) Put(bkt *BucketInfo) error {
return o.cache.SetWithExpire(bkt.Name, bkt, o.lifetime)
}
// Delete deletes an object from cache.
func (o *GetBucketCache) Delete(key string) bool {
return o.cache.Remove(key)
}

55
api/cache/head_cache.go vendored Normal file
View file

@ -0,0 +1,55 @@
package cache
import (
"time"
"github.com/bluele/gcache"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
)
// HeadObjectsCache provides interface for lru cache for objects.
type HeadObjectsCache interface {
Get(key string) *object.Address
Put(key string, address *object.Address) error
Delete(key string) bool
}
type (
// HeadObjectCache contains cache with objects and lifetime of cache entries.
HeadObjectCache struct {
cache gcache.Cache
lifetime time.Duration
}
)
// NewHeadObject creates an object of ObjectHeadersCache.
func NewHeadObject(cacheSize int, lifetime time.Duration) *HeadObjectCache {
gc := gcache.New(cacheSize).LRU().Build()
return &HeadObjectCache{cache: gc, lifetime: lifetime}
}
// Get returns cached object.
func (o *HeadObjectCache) Get(key string) *object.Address {
entry, err := o.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*object.Address)
if !ok {
return nil
}
return result
}
// Put puts an object to cache.
func (o *HeadObjectCache) Put(key string, address *object.Address) error {
return o.cache.SetWithExpire(key, address, o.lifetime)
}
// Delete deletes an object from cache.
func (o *HeadObjectCache) Delete(key string) bool {
return o.cache.Remove(key)
}

View file

@ -10,7 +10,7 @@ import (
// ObjectsCache provides interface for lru cache for objects.
type ObjectsCache interface {
Get(address *object.Address) *object.Object
Put(address *object.Address, obj object.Object) error
Put(obj object.Object) error
Delete(address *object.Address) bool
}
@ -52,8 +52,8 @@ func (o *ObjectHeadersCache) Get(address *object.Address) *object.Object {
}
// Put puts an object to cache.
func (o *ObjectHeadersCache) Put(address *object.Address, obj object.Object) error {
return o.cache.SetWithExpire(address.String(), obj, o.lifetime)
func (o *ObjectHeadersCache) Put(obj object.Object) error {
return o.cache.SetWithExpire(obj.ContainerID().String()+"/"+obj.ID().String(), obj, o.lifetime)
}
// Delete deletes an object from cache.

View file

@ -4,6 +4,8 @@ import (
"testing"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
objecttest "github.com/nspcc-dev/neofs-api-go/pkg/object/test"
"github.com/stretchr/testify/require"
)
@ -14,23 +16,23 @@ const (
)
func TestCache(t *testing.T) {
var (
address = objecttest.Address()
object = objecttest.Object()
)
obj := objecttest.Object()
address := object.NewAddress()
address.SetContainerID(obj.ContainerID())
address.SetObjectID(obj.ID())
t.Run("check get", func(t *testing.T) {
cache := New(cachesize, lifetime)
err := cache.Put(address, *object)
err := cache.Put(*obj)
require.NoError(t, err)
actual := cache.Get(address)
require.Equal(t, object, actual)
require.Equal(t, obj, actual)
})
t.Run("check delete", func(t *testing.T) {
cache := New(cachesize, lifetime)
err := cache.Put(address, *object)
err := cache.Put(*obj)
require.NoError(t, err)
cache.Delete(address)

56
api/cache/system.go vendored Normal file
View file

@ -0,0 +1,56 @@
package cache
import (
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/bluele/gcache"
)
type (
// SystemCache provides interface for lru cache for objects.
SystemCache interface {
Get(key string) *object.Object
Put(key string, obj *object.Object) error
Delete(key string) bool
}
// systemCache contains cache with objects and lifetime of cache entries.
systemCache struct {
cache gcache.Cache
lifetime time.Duration
}
)
// NewSystemCache creates an object of SystemCache.
func NewSystemCache(cacheSize int, lifetime time.Duration) SystemCache {
gc := gcache.New(cacheSize).LRU().Build()
return &systemCache{cache: gc, lifetime: lifetime}
}
// Get returns cached object.
func (o *systemCache) Get(key string) *object.Object {
entry, err := o.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*object.Object)
if !ok {
return nil
}
return result
}
// Put puts an object to cache.
func (o *systemCache) Put(key string, obj *object.Object) error {
return o.cache.SetWithExpire(key, obj, o.lifetime)
}
// Delete deletes an object from cache.
func (o *systemCache) Delete(key string) bool {
return o.cache.Remove(key)
}

View file

@ -46,6 +46,8 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
h.logAndSendError(w, "could not fetch object info", reqInfo, err)
return
}
if len(inf.ContentType) == 0 {
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
getParams := &layer.GetObjectParams{
ObjectInfo: inf,
@ -58,6 +60,8 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
inf.ContentType = http.DetectContentType(buffer.Bytes())
}
writeHeaders(w.Header(), inf)
w.WriteHeader(http.StatusOK)
}

View file

@ -263,7 +263,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
}
for _, prefix := range info.CommonPrefixes {
res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: *prefix})
res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: prefix})
}
for _, ver := range info.Version {

View file

@ -11,37 +11,29 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-sdk-go/pkg/pool"
"go.uber.org/zap"
)
type (
// BucketInfo stores basic bucket data.
BucketInfo struct {
Name string
CID *cid.ID
Owner *owner.ID
Created time.Time
BasicACL uint32
}
// BucketACL extends BucketInfo by eacl.Table.
BucketACL struct {
Info *BucketInfo
Info *cache.BucketInfo
EACL *eacl.Table
}
)
func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, error) {
func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*cache.BucketInfo, error) {
var (
err error
res *container.Container
rid = api.GetRequestID(ctx)
bearerOpt = n.BearerOpt(ctx)
info = &BucketInfo{
info = &cache.BucketInfo{
CID: cid,
Name: cid.String(),
}
@ -82,10 +74,17 @@ func (n *layer) containerInfo(ctx context.Context, cid *cid.ID) (*BucketInfo, er
}
}
if err := n.bucketCache.Put(info); err != nil {
n.log.Warn("could not put bucket info into cache",
zap.Stringer("cid", cid),
zap.String("bucket_name", info.Name),
zap.Error(err))
}
return info, nil
}
func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) {
func (n *layer) containerList(ctx context.Context) ([]*cache.BucketInfo, error) {
var (
err error
own = n.Owner(ctx)
@ -101,7 +100,7 @@ func (n *layer) containerList(ctx context.Context) ([]*BucketInfo, error) {
return nil, err
}
list := make([]*BucketInfo, 0, len(res))
list := make([]*cache.BucketInfo, 0, len(res))
for _, cid := range res {
info, err := n.containerInfo(ctx, cid)
if err != nil {

View file

@ -3,24 +3,56 @@ package layer
import (
"io"
"net/http"
"sync"
)
type detector struct {
type (
detector struct {
io.Reader
sync.Once
err error
data []byte
}
errReader struct {
data []byte
err error
offset int
}
)
contentType string
const contentTypeDetectSize = 512
func newReader(data []byte, err error) *errReader {
return &errReader{data: data, err: err}
}
func newDetector(r io.Reader) *detector {
return &detector{Reader: r}
func (r *errReader) Read(b []byte) (int, error) {
if r.offset >= len(r.data) {
return 0, io.EOF
}
n := copy(b, r.data[r.offset:])
r.offset += n
if r.offset >= len(r.data) {
return n, r.err
}
return n, nil
}
func (d *detector) Read(data []byte) (int, error) {
d.Do(func() {
d.contentType = http.DetectContentType(data)
})
return d.Reader.Read(data)
func newDetector(reader io.Reader) *detector {
return &detector{
data: make([]byte, contentTypeDetectSize),
Reader: reader,
}
}
func (d *detector) Detect() (string, error) {
n, err := d.Reader.Read(d.data)
if err != nil && err != io.EOF {
d.err = err
return "", err
}
d.data = d.data[:n]
return http.DetectContentType(d.data), nil
}
func (d *detector) MultiReader() io.Reader {
return io.MultiReader(newReader(d.data, d.err), d.Reader)
}

View file

@ -30,8 +30,11 @@ type (
layer struct {
pool pool.Pool
log *zap.Logger
listObjCache ObjectsListCache
listsCache ObjectsListCache
objCache cache.ObjectsCache
headCache cache.HeadObjectsCache
bucketCache cache.BucketCache
systemCache cache.SystemCache
}
// CacheConfig contains params for caches.
@ -156,8 +159,8 @@ type (
PutBucketVersioning(ctx context.Context, p *PutVersioningParams) (*ObjectInfo, error)
GetBucketVersioning(ctx context.Context, name string) (*BucketSettings, error)
ListBuckets(ctx context.Context) ([]*BucketInfo, error)
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error)
GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error)
GetBucketACL(ctx context.Context, name string) (*BucketACL, error)
PutBucketACL(ctx context.Context, p *PutBucketACLParams) error
CreateBucket(ctx context.Context, p *CreateBucketParams) (*cid.ID, error)
@ -228,26 +231,22 @@ func (v *objectVersions) getLast() *ObjectInfo {
return nil
}
func (v *objectVersions) getFiltered() []*ObjectVersionInfo {
func (v *objectVersions) getFiltered() []*ObjectInfo {
if len(v.objects) == 0 {
return nil
}
v.sort()
existedVersions := getExistedVersions(v)
res := make([]*ObjectVersionInfo, 0, len(v.objects))
res := make([]*ObjectInfo, 0, len(v.objects))
for _, version := range v.objects {
delMark := version.Headers[versionsDeleteMarkAttr]
if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") {
res = append(res, &ObjectVersionInfo{Object: version})
res = append(res, version)
}
}
if len(res) > 0 {
res[len(res)-1].IsLatest = true
}
return res
}
@ -281,8 +280,12 @@ func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client {
return &layer{
pool: conns,
log: log,
listObjCache: newListObjectsCache(config.ListObjectsLifetime),
listsCache: newListObjectsCache(config.ListObjectsLifetime),
objCache: cache.New(config.Size, config.Lifetime),
//todo reconsider cache params
headCache: cache.NewHeadObject(1000, time.Minute),
bucketCache: cache.NewBucketCache(150, time.Minute),
systemCache: cache.NewSystemCache(1000, 5*time.Minute),
}
}
@ -320,12 +323,16 @@ func (n *layer) Get(ctx context.Context, address *object.Address) (*object.Objec
}
// GetBucketInfo returns bucket info by name.
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) {
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*cache.BucketInfo, error) {
name, err := url.QueryUnescape(name)
if err != nil {
return nil, err
}
if bktInfo := n.bucketCache.Get(name); bktInfo != nil {
return bktInfo, nil
}
containerID := new(cid.ID)
if err := containerID.Parse(name); err != nil {
list, err := n.containerList(ctx)
@ -374,7 +381,7 @@ func (n *layer) PutBucketACL(ctx context.Context, param *PutBucketACLParams) err
// ListBuckets returns all user containers. Name of the bucket is a container
// id. Timestamp is omitted since it is not saved in neofs container.
func (n *layer) ListBuckets(ctx context.Context) ([]*BucketInfo, error) {
func (n *layer) ListBuckets(ctx context.Context) ([]*cache.BucketInfo, error) {
return n.containerList(ctx)
}
@ -382,19 +389,10 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*BucketInfo, error) {
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
var err error
//if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
// return fmt.Errorf("couldn't find bucket: %s : %w", p.Bucket, err)
//} else if oid, err = n.objectFindID(ctx, &findParams{cid: bkt.CID, val: p.Object}); err != nil {
// return fmt.Errorf("search of the object failed: cid: %s, val: %s : %w", bkt.CID, p.Object, err)
//}
addr := object.NewAddress()
addr.SetObjectID(p.ObjectInfo.ID())
addr.SetContainerID(p.ObjectInfo.CID())
params := &getParams{
Writer: p.Writer,
address: addr,
cid: p.ObjectInfo.CID(),
oid: p.ObjectInfo.ID(),
offset: p.Offset,
length: p.Length,
}
@ -411,7 +409,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
}
if err != nil {
n.objCache.Delete(addr)
n.objCache.Delete(p.ObjectInfo.Address())
return fmt.Errorf("couldn't get object, cid: %s : %w", p.ObjectInfo.CID(), err)
}
@ -433,32 +431,26 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*Object
return n.headVersion(ctx, bkt, p.VersionID)
}
func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*ObjectInfo, error) {
func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *cache.BucketInfo) (*ObjectInfo, error) {
if meta := n.systemCache.Get(bktVersionSettingsObject); meta != nil {
return objInfoFromMeta(bkt, meta), nil
}
oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: bktVersionSettingsObject})
if err != nil {
return nil, err
}
addr := object.NewAddress()
addr.SetObjectID(oid)
addr.SetContainerID(bkt.CID)
/* todo: now we get an address via request to NeoFS and try to find the object with the address in cache
but it will be resolved after implementation of local cache with nicenames and address of objects
for get/head requests */
meta := n.objCache.Get(addr)
if meta == nil {
meta, err = n.objectHead(ctx, bkt.CID, oid)
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
n.log.Error("could not fetch object head", zap.Error(err))
return nil, err
}
if err = n.objCache.Put(addr, *meta); err != nil {
n.log.Error("couldn't cache an object", zap.Error(err))
}
if err = n.systemCache.Put(bktVersionSettingsObject, meta); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err))
}
return objectInfoFromMeta(bkt, meta, "", ""), nil
return objInfoFromMeta(bkt, meta), nil
}
// PutObject into storage.
@ -496,7 +488,7 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf
}
// DeleteObject removes all objects with passed nice name.
func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *VersionedObject) error {
func (n *layer) deleteObject(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) error {
var (
err error
ids []*object.ID
@ -543,7 +535,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *Versione
return nil
}
func (n *layer) checkVersionsExists(ctx context.Context, bkt *BucketInfo, obj *VersionedObject) (*object.ID, error) {
func (n *layer) checkVersionsExists(ctx context.Context, bkt *cache.BucketInfo, obj *VersionedObject) (*object.ID, error) {
id := object.NewID()
if err := id.Parse(obj.VersionID); err != nil {
return nil, &errors.DeleteError{Err: errors.GetAPIError(errors.ErrInvalidVersion), Object: obj.String()}
@ -608,41 +600,24 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
res := ListObjectVersionsInfo{}
versions := make(map[string]*objectVersions)
var versions map[string]*objectVersions
res := &ListObjectVersionsInfo{}
bkt, err := n.GetBucketInfo(ctx, p.Bucket)
if err != nil {
return nil, err
}
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID})
cacheKey, err := createKey(ctx, bkt.CID, listVersionsMethod, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
allObjects := n.listsCache.Get(cacheKey)
if allObjects == nil {
versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue
}
if oi := objectInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); oi != nil {
if oi.Name <= p.KeyMarker {
continue
}
if isSystem(oi) {
continue
}
if objVersions, ok := versions[oi.Name]; ok {
objVersions.appendVersion(oi)
versions[oi.Name] = objVersions
} else {
objVersion := newObjectVersions(oi.Name)
objVersion.appendVersion(oi)
versions[oi.Name] = objVersion
}
}
return nil, err
}
sortedNames := make([]string, 0, len(versions))
@ -651,17 +626,44 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
}
sort.Strings(sortedNames)
objects := make([]*ObjectVersionInfo, 0, p.MaxKeys)
allObjects = make([]*ObjectInfo, 0, p.MaxKeys)
for _, name := range sortedNames {
objects = append(objects, versions[name].getFiltered()...)
if len(objects) > p.MaxKeys {
objects = objects[:p.MaxKeys]
allObjects = append(allObjects, versions[name].getFiltered()...)
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
}
for i, obj := range allObjects {
if obj.Name >= p.KeyMarker && obj.Version() >= p.VersionIDMarker {
allObjects = allObjects[i:]
break
}
}
res.CommonPrefixes, allObjects = triageObjects(allObjects)
if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys].Name
res.NextVersionIDMarker = allObjects[p.MaxKeys].Version()
allObjects = allObjects[:p.MaxKeys]
res.KeyMarker = allObjects[p.MaxKeys-1].Name
res.VersionIDMarker = allObjects[p.MaxKeys-1].Version()
}
objects := make([]*ObjectVersionInfo, len(allObjects))
for i, obj := range allObjects {
objects[i] = &ObjectVersionInfo{Object: obj}
if i == len(allObjects)-1 || allObjects[i+1].Name != obj.Name {
objects[i].IsLatest = true
}
}
res.Version, res.DeleteMarker = triageVersions(objects)
return &res, nil
return res, nil
}
func sortVersions(versions []*ObjectInfo) {
@ -773,7 +775,7 @@ func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*Bu
return n.getBucketSettings(ctx, bktInfo)
}
func (n *layer) getBucketSettings(ctx context.Context, bktInfo *BucketInfo) (*BucketSettings, error) {
func (n *layer) getBucketSettings(ctx context.Context, bktInfo *cache.BucketInfo) (*BucketSettings, error) {
objInfo, err := n.getSettingsObjectInfo(ctx, bktInfo)
if err != nil {
return nil, err

View file

@ -14,6 +14,8 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
apiErrors "github.com/nspcc-dev/neofs-s3-gw/api/errors"
"go.uber.org/zap"
)
@ -31,7 +33,8 @@ type (
offset int64
length int64
address *object.Address
cid *cid.ID
oid *object.ID
}
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
@ -58,7 +61,7 @@ type (
}
allObjectParams struct {
Bucket *BucketInfo
Bucket *cache.BucketInfo
Delimiter string
Prefix string
}
@ -96,12 +99,16 @@ func (n *layer) objectFindID(ctx context.Context, p *findParams) (*object.ID, er
return nil, errors.New("several objects with the same name found")
}
// objectHead returns all object's headers.
func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) {
func newAddress(cid *cid.ID, oid *object.ID) *object.Address {
address := object.NewAddress()
address.SetContainerID(cid)
address.SetObjectID(oid)
ops := new(client.ObjectHeaderParams).WithAddress(address).WithAllFields()
return address
}
// objectHead returns all object's headers.
func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*object.Object, error) {
ops := new(client.ObjectHeaderParams).WithAddress(newAddress(cid, oid)).WithAllFields()
return n.pool.GetObjectHeader(ctx, ops, n.BearerOpt(ctx))
}
@ -109,19 +116,19 @@ func (n *layer) objectHead(ctx context.Context, cid *cid.ID, oid *object.ID) (*o
func (n *layer) objectGet(ctx context.Context, p *getParams) (*object.Object, error) {
// prepare length/offset writer
w := newWriter(p.Writer, p.offset, p.length)
ops := new(client.GetObjectParams).WithAddress(p.address).WithPayloadWriter(w)
ops := new(client.GetObjectParams).WithAddress(newAddress(p.cid, p.oid)).WithPayloadWriter(w)
return n.pool.GetObject(ctx, ops, n.BearerOpt(ctx))
}
// objectRange gets object range and writes it into provided io.Writer.
func (n *layer) objectRange(ctx context.Context, p *getParams) ([]byte, error) {
w := newWriter(p.Writer, p.offset, p.length)
ops := new(client.RangeDataParams).WithAddress(p.address).WithDataWriter(w).WithRange(p.Range)
ops := new(client.RangeDataParams).WithAddress(newAddress(p.cid, p.oid)).WithDataWriter(w).WithRange(p.Range)
return n.pool.ObjectPayloadRangeData(ctx, ops, n.BearerOpt(ctx))
}
// objectPut into NeoFS, took payload from io.Reader.
func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectParams) (*ObjectInfo, error) {
func (n *layer) objectPut(ctx context.Context, bkt *cache.BucketInfo, p *PutObjectParams) (*ObjectInfo, error) {
own := n.Owner(ctx)
obj, err := url.QueryUnescape(p.Object)
if err != nil {
@ -135,8 +142,15 @@ func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectPara
}
idsToDeleteArr := updateCRDT2PSetHeaders(p, versions, versioningEnabled)
r := p.Reader
if len(p.Header[api.ContentType]) == 0 {
d := newDetector(p.Reader)
if contentType, err := d.Detect(); err == nil {
p.Header[api.ContentType] = contentType
}
r = d.MultiReader()
}
rawObject := formRawObject(p, bkt.CID, own, obj)
r := newDetector(p.Reader)
ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(r)
oid, err := n.pool.PutObject(ctx, ops, n.BearerOpt(ctx))
@ -144,11 +158,21 @@ func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectPara
return nil, err
}
if p.Header[versionsDeleteMarkAttr] == delMarkFullObject {
if last := versions.getLast(); last != nil {
n.objCache.Delete(last.Address())
}
}
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
return nil, err
}
if err = n.objCache.Put(*meta); err != nil {
n.log.Error("couldn't cache an object", zap.Error(err))
}
for _, id := range idsToDeleteArr {
if err = n.objectDelete(ctx, bkt.CID, id); err != nil {
n.log.Warn("couldn't delete object",
@ -168,7 +192,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectPara
Created: time.Now(),
CreationEpoch: meta.CreationEpoch(),
Headers: p.Header,
ContentType: r.contentType,
ContentType: p.Header[api.ContentType],
HashSum: meta.PayloadChecksum().String(),
}, nil
}
@ -229,9 +253,12 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio
versionsDeletedStr += ","
}
lastVersion := versions.getLast()
if lastVersion := versions.getLast(); lastVersion != nil {
p.Header[versionsDelAttr] = versionsDeletedStr + lastVersion.Version()
idsToDeleteArr = append(idsToDeleteArr, lastVersion.ID())
} else if len(versionsDeletedStr) != 0 {
p.Header[versionsDelAttr] = versionsDeletedStr
}
for _, version := range versions.objects {
if contains(versions.delList, version.Version()) {
@ -243,7 +270,13 @@ func updateCRDT2PSetHeaders(p *PutObjectParams, versions *objectVersions, versio
return idsToDeleteArr
}
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *BucketInfo, objectName string) (*ObjectInfo, error) {
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*ObjectInfo, error) {
if address := n.headCache.Get(bkt.Name + "/" + objectName); address != nil {
if headInfo := n.objCache.Get(address); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil
}
}
versions, err := n.headVersions(ctx, bkt, objectName)
if err != nil {
return nil, err
@ -253,10 +286,17 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *BucketInfo
if lastVersion == nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
if err = n.headCache.Put(lastVersion.NiceName(), lastVersion.Address()); err != nil {
n.log.Warn("couldn't put obj address to head cache",
zap.String("obj nice name", lastVersion.NiceName()),
zap.Error(err))
}
return lastVersion, nil
}
func (n *layer) headVersions(ctx context.Context, bkt *BucketInfo, objectName string) (*objectVersions, error) {
func (n *layer) headVersions(ctx context.Context, bkt *cache.BucketInfo, objectName string) (*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName})
if err != nil {
return nil, err
@ -276,6 +316,13 @@ func (n *layer) headVersions(ctx context.Context, bkt *BucketInfo, objectName st
zap.Error(err))
continue
}
if err = n.objCache.Put(*meta); err != nil {
n.log.Warn("couldn't put meta to objects cache",
zap.Stringer("object id", id),
zap.Stringer("bucket id", bkt.CID),
zap.Error(err))
}
if oi := objectInfoFromMeta(bkt, meta, "", ""); oi != nil {
if isSystem(oi) {
continue
@ -287,12 +334,16 @@ func (n *layer) headVersions(ctx context.Context, bkt *BucketInfo, objectName st
return versions, nil
}
func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, versionID string) (*ObjectInfo, error) {
func (n *layer) headVersion(ctx context.Context, bkt *cache.BucketInfo, versionID string) (*ObjectInfo, error) {
oid := object.NewID()
if err := oid.Parse(versionID); err != nil {
return nil, err
}
if headInfo := n.objCache.Get(newAddress(bkt.CID, oid)); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil
}
meta, err := n.objectHead(ctx, bkt.CID, oid)
if err != nil {
if strings.Contains(err.Error(), "not found") {
@ -301,14 +352,22 @@ func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, versionID stri
return nil, err
}
return objectInfoFromMeta(bkt, meta, "", ""), nil
objInfo := objectInfoFromMeta(bkt, meta, "", "")
if err = n.objCache.Put(*meta); err != nil {
n.log.Warn("couldn't put obj to object cache",
zap.String("bucket name", objInfo.Bucket),
zap.Stringer("bucket cid", objInfo.CID()),
zap.String("object name", objInfo.Name),
zap.Stringer("object id", objInfo.ID()),
zap.Error(err))
}
return objInfo, nil
}
// objectDelete puts tombstone object into neofs.
func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *object.ID) error {
address := object.NewAddress()
address.SetContainerID(cid)
address.SetObjectID(oid)
address := newAddress(cid, oid)
dop := new(client.DeleteObjectParams)
dop.WithAddress(address)
n.objCache.Delete(address)
@ -390,35 +449,11 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
}
func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: p.Bucket.CID})
versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
versions := make(map[string]*objectVersions, len(ids)/2)
for _, id := range ids {
meta, err := n.objectHead(ctx, p.Bucket.CID, id)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue
}
if oi := objectInfoFromMeta(p.Bucket, meta, p.Prefix, p.Delimiter); oi != nil {
if isSystem(oi) {
continue
}
if objVersions, ok := versions[oi.Name]; ok {
objVersions.appendVersion(oi)
versions[oi.Name] = objVersions
} else {
objVersion := newObjectVersions(oi.Name)
objVersion.appendVersion(oi)
versions[oi.Name] = objVersion
}
}
}
objects := make([]*ObjectInfo, 0, len(versions))
for _, v := range versions {
lastVersion := v.getLast()
@ -434,6 +469,38 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam
return objects, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *cache.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID})
if err != nil {
return nil, err
}
versions := make(map[string]*objectVersions, len(ids)/2)
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue
}
if oi := objectInfoFromMeta(bkt, meta, prefix, delimiter); oi != nil {
if isSystem(oi) {
continue
}
if objVersions, ok := versions[oi.Name]; ok {
objVersions.appendVersion(oi)
versions[oi.Name] = objVersions
} else {
objVersion := newObjectVersions(oi.Name)
objVersion.appendVersion(oi)
versions[oi.Name] = objVersion
}
}
}
return versions, nil
}
func getExistedVersions(versions *objectVersions) []string {
var res []string
for _, add := range versions.addList {
@ -498,7 +565,7 @@ func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*Obje
func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) {
var (
err error
bkt *BucketInfo
bkt *cache.BucketInfo
cacheKey cacheOptions
allObjects []*ObjectInfo
)
@ -507,11 +574,11 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) (
return nil, err
}
if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil {
if cacheKey, err = createKey(ctx, bkt.CID, listObjectsMethod, p.Prefix, p.Delimiter); err != nil {
return nil, err
}
allObjects = n.listObjCache.Get(cacheKey)
allObjects = n.listsCache.Get(cacheKey)
if allObjects == nil {
allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{
@ -524,13 +591,13 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) (
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.listObjCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
n.listsCache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
}
return allObjects, nil
}
func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *BucketInfo) bool {
func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *cache.BucketInfo) bool {
settings, err := n.getBucketSettings(ctx, bktInfo)
if err != nil {
n.log.Warn("couldn't get versioning settings object", zap.Error(err))

View file

@ -30,6 +30,11 @@ type (
// DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects.
const DefaultObjectsListCacheLifetime = time.Second * 60
const (
listObjectsMethod = "listObjects"
listVersionsMethod = "listVersions"
)
type (
listObjectsCache struct {
cacheLifetime time.Duration
@ -78,7 +83,7 @@ func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) {
})
}
func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) {
func createKey(ctx context.Context, cid *cid.ID, method, prefix, delimiter string) (cacheOptions, error) {
box, err := GetBoxData(ctx)
if err != nil {
return cacheOptions{}, err

View file

@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
)
@ -60,7 +61,7 @@ type (
// ListObjectVersionsInfo stores info and list of objects' versions.
ListObjectVersionsInfo struct {
CommonPrefixes []*string
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
@ -84,7 +85,11 @@ func userHeaders(attrs []*object.Attribute) map[string]string {
return result
}
func objectInfoFromMeta(bkt *BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo {
func objInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object) *ObjectInfo {
return objectInfoFromMeta(bkt, meta, "", "")
}
func objectInfoFromMeta(bkt *cache.BucketInfo, meta *object.Object, prefix, delimiter string) *ObjectInfo {
var (
isDir bool
size int64
@ -164,6 +169,12 @@ func (o *ObjectInfo) ID() *object.ID { return o.id }
// Version returns object version from ObjectInfo.
func (o *ObjectInfo) Version() string { return o.id.String() }
// NiceName returns object name for cache.
func (o *ObjectInfo) NiceName() string { return o.Bucket + "/" + o.Name }
// Address returns object address.
func (o *ObjectInfo) Address() *object.Address { return newAddress(o.bucketID, o.id) }
// CID returns bucket ID from ObjectInfo.
func (o *ObjectInfo) CID() *cid.ID { return o.bucketID }

View file

@ -9,6 +9,7 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/stretchr/testify/require"
)
@ -19,7 +20,7 @@ var (
defaultTestContentType = http.DetectContentType(defaultTestPayload)
)
func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object {
func newTestObject(oid *object.ID, bkt *cache.BucketInfo, name string) *object.Object {
filename := object.NewAttribute()
filename.SetKey(object.AttributeFileName)
filename.SetValue(name)
@ -43,7 +44,7 @@ func newTestObject(oid *object.ID, bkt *BucketInfo, name string) *object.Object
return raw.Object()
}
func newTestInfo(oid *object.ID, bkt *BucketInfo, name string, isDir bool) *ObjectInfo {
func newTestInfo(oid *object.ID, bkt *cache.BucketInfo, name string, isDir bool) *ObjectInfo {
info := &ObjectInfo{
id: oid,
Name: name,
@ -71,7 +72,7 @@ func Test_objectInfoFromMeta(t *testing.T) {
oid := object.NewID()
containerID := cid.New()
bkt := &BucketInfo{
bkt := &cache.BucketInfo{
Name: "test-container",
CID: containerID,
Owner: uid,

View file

@ -20,6 +20,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/session"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"github.com/nspcc-dev/neofs-sdk-go/pkg/logger"
"github.com/nspcc-dev/neofs-sdk-go/pkg/pool"
@ -61,10 +62,7 @@ func (t *testPool) PutObject(ctx context.Context, params *client.PutObjectParams
raw.SetPayload(all)
}
addr := object.NewAddress()
addr.SetObjectID(raw.ID())
addr.SetContainerID(raw.ContainerID())
addr := newAddress(raw.ContainerID(), raw.ID())
t.objects[addr.String()] = raw.Object()
return raw.ID(), nil
}
@ -330,7 +328,11 @@ func prepareContext(t *testing.T) *testContext {
return &testContext{
ctx: ctx,
layer: NewLayer(l, tp),
layer: NewLayer(l, tp, &CacheConfig{
Size: cache.DefaultObjectsCacheSize,
Lifetime: cache.DefaultObjectsCacheLifetime,
ListObjectsLifetime: DefaultObjectsListCacheLifetime},
),
bkt: bktName,
bktID: bktID,
obj: "obj1",