forked from TrueCloudLab/frostfs-s3-gw
[#547] Cache ObjectInfo instead of Object
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
127b91a97f
commit
42a761c1f2
3 changed files with 37 additions and 76 deletions
38
api/cache/objects.go
vendored
38
api/cache/objects.go
vendored
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/bluele/gcache"
|
"github.com/bluele/gcache"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -39,23 +38,6 @@ func New(config *Config) *ObjectsCache {
|
||||||
return &ObjectsCache{cache: gc, logger: config.Logger}
|
return &ObjectsCache{cache: gc, logger: config.Logger}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a cached object.
|
|
||||||
func (o *ObjectsCache) Get(address oid.Address) *object.Object {
|
|
||||||
entry, err := o.cache.Get(address.EncodeToString())
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
result, ok := entry.(object.Object)
|
|
||||||
if !ok {
|
|
||||||
o.logger.Warn("invalid cache entry type", zap.String("actual", fmt.Sprintf("%T", entry)),
|
|
||||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return &result
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetObject returns a cached object info.
|
// GetObject returns a cached object info.
|
||||||
func (o *ObjectsCache) GetObject(address oid.Address) *data.ObjectInfo {
|
func (o *ObjectsCache) GetObject(address oid.Address) *data.ObjectInfo {
|
||||||
entry, err := o.cache.Get(address.EncodeToString())
|
entry, err := o.cache.Get(address.EncodeToString())
|
||||||
|
@ -65,30 +47,14 @@ func (o *ObjectsCache) GetObject(address oid.Address) *data.ObjectInfo {
|
||||||
|
|
||||||
result, ok := entry.(*data.ObjectInfo)
|
result, ok := entry.(*data.ObjectInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
o.logger.Warn("invalid cache entry type", zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||||
|
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put puts an object to cache.
|
|
||||||
func (o *ObjectsCache) Put(obj object.Object) error {
|
|
||||||
cnrID, ok := obj.ContainerID()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("empty container id")
|
|
||||||
}
|
|
||||||
objID, ok := obj.ID()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("empty object id")
|
|
||||||
}
|
|
||||||
|
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(cnrID)
|
|
||||||
addr.SetObject(objID)
|
|
||||||
|
|
||||||
return o.cache.Set(addr.EncodeToString(), obj)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutObject puts an object info to cache.
|
// PutObject puts an object info to cache.
|
||||||
func (o *ObjectsCache) PutObject(obj *data.ObjectInfo) error {
|
func (o *ObjectsCache) PutObject(obj *data.ObjectInfo) error {
|
||||||
cnrID := obj.CID.EncodeToString()
|
cnrID := obj.CID.EncodeToString()
|
||||||
|
|
|
@ -228,10 +228,6 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
HashSum: partInfo.ETag,
|
HashSum: partInfo.ETag,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = n.objCache.PutObject(objInfo); err != nil {
|
|
||||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return objInfo, nil
|
return objInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -335,6 +331,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
return nil, nil, errors.GetAPIError(errors.ErrInvalidPart)
|
return nil, nil, errors.GetAPIError(errors.ErrInvalidPart)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var multipartObjetSize int64
|
||||||
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
||||||
|
|
||||||
var completedPartsHeader strings.Builder
|
var completedPartsHeader strings.Builder
|
||||||
|
@ -348,6 +345,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
return nil, nil, errors.GetAPIError(errors.ErrEntityTooSmall)
|
return nil, nil, errors.GetAPIError(errors.ErrEntityTooSmall)
|
||||||
}
|
}
|
||||||
parts = append(parts, partInfo)
|
parts = append(parts, partInfo)
|
||||||
|
multipartObjetSize += partInfo.Size
|
||||||
|
|
||||||
partInfoStr := partInfo.ToHeaderString()
|
partInfoStr := partInfo.ToHeaderString()
|
||||||
if i != len(p.Parts)-1 {
|
if i != len(p.Parts)-1 {
|
||||||
|
@ -388,6 +386,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
Object: p.Info.Key,
|
Object: p.Info.Key,
|
||||||
Reader: r,
|
Reader: r,
|
||||||
Header: initMetadata,
|
Header: initMetadata,
|
||||||
|
Size: multipartObjetSize,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Error("could not put a completed object (multipart upload)",
|
n.log.Error("could not put a completed object (multipart upload)",
|
||||||
|
|
|
@ -238,8 +238,8 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
||||||
|
|
||||||
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ObjectInfo, error) {
|
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ObjectInfo, error) {
|
||||||
if addr := n.namesCache.Get(bkt.Name + "/" + objectName); addr != nil {
|
if addr := n.namesCache.Get(bkt.Name + "/" + objectName); addr != nil {
|
||||||
if headInfo := n.objCache.Get(*addr); headInfo != nil {
|
if objInfo := n.objCache.GetObject(*addr); objInfo != nil {
|
||||||
return objInfoFromMeta(bkt, headInfo), nil
|
return objInfo, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,14 +259,13 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = n.objCache.Put(*meta); err != nil {
|
objInfo := objInfoFromMeta(bkt, meta)
|
||||||
n.log.Warn("couldn't put meta to objects cache",
|
if err = n.objCache.PutObject(objInfo); err != nil {
|
||||||
|
n.log.Warn("couldn't put object info to cache",
|
||||||
zap.Stringer("object id", node.OID),
|
zap.Stringer("object id", node.OID),
|
||||||
zap.Stringer("bucket id", bkt.CID),
|
zap.Stringer("bucket id", bkt.CID),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo := objInfoFromMeta(bkt, meta)
|
|
||||||
if err = n.namesCache.Put(objInfo.NiceName(), objInfo.Address()); err != nil {
|
if err = n.namesCache.Put(objInfo.NiceName(), objInfo.Address()); err != nil {
|
||||||
n.log.Warn("couldn't put obj address to head cache",
|
n.log.Warn("couldn't put obj address to head cache",
|
||||||
zap.String("obj nice name", objInfo.NiceName()),
|
zap.String("obj nice name", objInfo.NiceName()),
|
||||||
|
@ -304,8 +303,8 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if headInfo := n.objCache.Get(newAddress(bkt.CID, foundVersion.OID)); headInfo != nil {
|
if objInfo := n.objCache.GetObject(newAddress(bkt.CID, foundVersion.OID)); objInfo != nil {
|
||||||
return objInfoFromMeta(bkt, headInfo), nil
|
return objInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
meta, err := n.objectHead(ctx, bkt, foundVersion.OID)
|
meta, err := n.objectHead(ctx, bkt, foundVersion.OID)
|
||||||
|
@ -317,7 +316,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo := objInfoFromMeta(bkt, meta)
|
objInfo := objInfoFromMeta(bkt, meta)
|
||||||
if err = n.objCache.Put(*meta); err != nil {
|
if err = n.objCache.PutObject(objInfo); err != nil {
|
||||||
n.log.Warn("couldn't put obj to object cache",
|
n.log.Warn("couldn't put obj to object cache",
|
||||||
zap.String("bucket name", objInfo.Bucket),
|
zap.String("bucket name", objInfo.Bucket),
|
||||||
zap.Stringer("bucket cid", objInfo.CID),
|
zap.Stringer("bucket cid", objInfo.CID),
|
||||||
|
@ -436,6 +435,10 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(nodeVersions) == 0 {
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
sort.Slice(nodeVersions, func(i, j int) bool {
|
sort.Slice(nodeVersions, func(i, j int) bool {
|
||||||
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
|
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
|
||||||
})
|
})
|
||||||
|
@ -511,14 +514,12 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams,
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
err = pool.Submit(func() {
|
err = pool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if obj := n.objectFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID); obj != nil {
|
if oi := n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID, p.Prefix, p.Delimiter); oi != nil {
|
||||||
if oi := objectInfoFromMeta(p.Bucket, obj, p.Prefix, p.Delimiter); oi != nil {
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case objCh <- oi:
|
case objCh <- oi:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
@ -562,12 +563,7 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo,
|
||||||
oi.Created = nodeVersion.DeleteMarker.Created
|
oi.Created = nodeVersion.DeleteMarker.Created
|
||||||
oi.IsDeleteMarker = true
|
oi.IsDeleteMarker = true
|
||||||
} else {
|
} else {
|
||||||
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion.OID)
|
if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion.OID, prefix, delimiter); oi == nil {
|
||||||
if obj == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
oi = objectInfoFromMeta(bkt, obj, prefix, delimiter)
|
|
||||||
if oi == nil {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -657,23 +653,23 @@ func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *data.BucketInf
|
||||||
return settings.VersioningEnabled
|
return settings.VersioningEnabled
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) objectFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID) *object.Object {
|
func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID, prefix, delimiter string) *data.ObjectInfo {
|
||||||
var (
|
if objInfo := n.objCache.GetObject(newAddress(bktInfo.CID, obj)); objInfo != nil {
|
||||||
err error
|
return objInfo
|
||||||
meta = n.objCache.Get(newAddress(bktInfo.CID, obj))
|
}
|
||||||
)
|
|
||||||
if meta == nil {
|
meta, err := n.objectHead(ctx, bktInfo, obj)
|
||||||
meta, err = n.objectHead(ctx, bktInfo, obj)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Warn("could not fetch object meta", zap.Error(err))
|
n.log.Warn("could not fetch object meta", zap.Error(err))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err = n.objCache.Put(*meta); err != nil {
|
|
||||||
n.log.Error("couldn't cache an object", zap.Error(err))
|
objInfo := objectInfoFromMeta(bktInfo, meta, prefix, delimiter)
|
||||||
}
|
if err = n.objCache.PutObject(objInfo); err != nil {
|
||||||
|
n.log.Warn("couldn't cache an object", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return meta
|
return objInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) transformNeofsError(ctx context.Context, err error) error {
|
func (n *layer) transformNeofsError(ctx context.Context, err error) error {
|
||||||
|
|
Loading…
Reference in a new issue