forked from TrueCloudLab/frostfs-s3-gw
[#577] Replace ObjInfo with ExtObjInfo in cache
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
dc77ad4863
commit
245e64900d
2 changed files with 47 additions and 38 deletions
8
api/cache/objects.go
vendored
8
api/cache/objects.go
vendored
|
@ -39,13 +39,13 @@ func New(config *Config) *ObjectsCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject returns a cached object info.
|
// GetObject returns a cached object info.
|
||||||
func (o *ObjectsCache) GetObject(address oid.Address) *data.ObjectInfo {
|
func (o *ObjectsCache) GetObject(address oid.Address) *data.ExtendedObjectInfo {
|
||||||
entry, err := o.cache.Get(address)
|
entry, err := o.cache.Get(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
result, ok := entry.(*data.ObjectInfo)
|
result, ok := entry.(*data.ExtendedObjectInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
o.logger.Warn("invalid cache entry type", zap.String("actual", fmt.Sprintf("%T", entry)),
|
o.logger.Warn("invalid cache entry type", zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||||
|
@ -56,8 +56,8 @@ func (o *ObjectsCache) GetObject(address oid.Address) *data.ObjectInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutObject puts an object info to cache.
|
// PutObject puts an object info to cache.
|
||||||
func (o *ObjectsCache) PutObject(obj *data.ObjectInfo) error {
|
func (o *ObjectsCache) PutObject(obj *data.ExtendedObjectInfo) error {
|
||||||
return o.cache.Set(obj.Address(), obj)
|
return o.cache.Set(obj.ObjectInfo.Address(), obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes an object from cache.
|
// Delete deletes an object from cache.
|
||||||
|
|
|
@ -230,7 +230,13 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
||||||
HashSum: newVersion.ETag,
|
HashSum: newVersion.ETag,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = n.objCache.PutObject(objInfo); err != nil {
|
extendedObjInfo := &data.ExtendedObjectInfo{
|
||||||
|
ObjectInfo: objInfo,
|
||||||
|
NodeVersion: newVersion,
|
||||||
|
IsLatest: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.objCache.PutObject(extendedObjInfo); err != nil {
|
||||||
n.log.Warn("couldn't add object to cache", zap.Error(err),
|
n.log.Warn("couldn't add object to cache", zap.Error(err),
|
||||||
zap.String("object_name", p.Object), zap.String("bucket_name", p.BktInfo.Name),
|
zap.String("object_name", p.Object), zap.String("bucket_name", p.BktInfo.Name),
|
||||||
zap.String("cid", objInfo.CID.EncodeToString()), zap.String("oid", objInfo.ID.EncodeToString()))
|
zap.String("cid", objInfo.CID.EncodeToString()), zap.String("oid", objInfo.ID.EncodeToString()))
|
||||||
|
@ -246,8 +252,8 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
||||||
|
|
||||||
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) {
|
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) {
|
||||||
if addr := n.namesCache.Get(bkt.Name + "/" + objectName); addr != nil {
|
if addr := n.namesCache.Get(bkt.Name + "/" + objectName); addr != nil {
|
||||||
if objInfo := n.objCache.GetObject(*addr); objInfo != nil {
|
if extObjInfo := n.objCache.GetObject(*addr); extObjInfo != nil {
|
||||||
return &data.ExtendedObjectInfo{ObjectInfo: objInfo}, nil
|
return extObjInfo, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,7 +274,14 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
objInfo := objectInfoFromMeta(bkt, meta)
|
objInfo := objectInfoFromMeta(bkt, meta)
|
||||||
if err = n.objCache.PutObject(objInfo); err != nil {
|
|
||||||
|
extObjInfo := &data.ExtendedObjectInfo{
|
||||||
|
ObjectInfo: objInfo,
|
||||||
|
NodeVersion: node,
|
||||||
|
IsLatest: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.objCache.PutObject(extObjInfo); err != nil {
|
||||||
n.log.Warn("couldn't put object info to cache",
|
n.log.Warn("couldn't put object info to cache",
|
||||||
zap.Stringer("object id", node.OID),
|
zap.Stringer("object id", node.OID),
|
||||||
zap.Stringer("bucket id", bkt.CID),
|
zap.Stringer("bucket id", bkt.CID),
|
||||||
|
@ -280,11 +293,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &data.ExtendedObjectInfo{
|
return extObjInfo, nil
|
||||||
ObjectInfo: objInfo,
|
|
||||||
NodeVersion: node,
|
|
||||||
IsLatest: true,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
|
func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||||
|
@ -315,11 +324,8 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if objInfo := n.objCache.GetObject(newAddress(bkt.CID, foundVersion.OID)); objInfo != nil {
|
if extObjInfo := n.objCache.GetObject(newAddress(bkt.CID, foundVersion.OID)); extObjInfo != nil {
|
||||||
return &data.ExtendedObjectInfo{
|
return extObjInfo, nil
|
||||||
ObjectInfo: objInfo,
|
|
||||||
NodeVersion: foundVersion,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
meta, err := n.objectHead(ctx, bkt, foundVersion.OID)
|
meta, err := n.objectHead(ctx, bkt, foundVersion.OID)
|
||||||
|
@ -329,9 +335,14 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
objInfo := objectInfoFromMeta(bkt, meta)
|
objInfo := objectInfoFromMeta(bkt, meta)
|
||||||
if err = n.objCache.PutObject(objInfo); err != nil {
|
|
||||||
|
extObjInfo := &data.ExtendedObjectInfo{
|
||||||
|
ObjectInfo: objInfo,
|
||||||
|
NodeVersion: foundVersion,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = n.objCache.PutObject(extObjInfo); err != nil {
|
||||||
n.log.Warn("couldn't put obj to object cache",
|
n.log.Warn("couldn't put obj to object cache",
|
||||||
zap.String("bucket name", objInfo.Bucket),
|
zap.String("bucket name", objInfo.Bucket),
|
||||||
zap.Stringer("bucket cid", objInfo.CID),
|
zap.Stringer("bucket cid", objInfo.CID),
|
||||||
|
@ -340,10 +351,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &data.ExtendedObjectInfo{
|
return extObjInfo, nil
|
||||||
ObjectInfo: objInfo,
|
|
||||||
NodeVersion: foundVersion,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// objectDelete puts tombstone object into neofs.
|
// objectDelete puts tombstone object into neofs.
|
||||||
|
@ -539,7 +547,7 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams,
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
err = pool.Submit(func() {
|
err = pool.Submit(func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if oi := n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID, p.Prefix, p.Delimiter); oi != nil {
|
if oi := n.objectInfoFromObjectsCacheOrNeoFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi != nil {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
case objCh <- oi:
|
case objCh <- oi:
|
||||||
|
@ -597,7 +605,7 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo,
|
||||||
oi.Created = nodeVersion.DeleteMarker.Created
|
oi.Created = nodeVersion.DeleteMarker.Created
|
||||||
oi.IsDeleteMarker = true
|
oi.IsDeleteMarker = true
|
||||||
} else {
|
} else {
|
||||||
if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion.OID, prefix, delimiter); oi == nil {
|
if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -678,21 +686,22 @@ func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []st
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID, prefix, delimiter string) (oi *data.ObjectInfo) {
|
func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) {
|
||||||
oi = n.objCache.GetObject(newAddress(bktInfo.CID, obj))
|
extObjInfo := n.objCache.GetObject(newAddress(bktInfo.CID, node.OID))
|
||||||
|
if extObjInfo != nil {
|
||||||
|
return extObjInfo.ObjectInfo
|
||||||
|
}
|
||||||
|
|
||||||
if oi == nil {
|
meta, err := n.objectHead(ctx, bktInfo, node.OID)
|
||||||
meta, err := n.objectHead(ctx, bktInfo, obj)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Warn("could not fetch object meta", zap.Error(err))
|
n.log.Warn("could not fetch object meta", zap.Error(err))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
oi = objectInfoFromMeta(bktInfo, meta)
|
oi = objectInfoFromMeta(bktInfo, meta)
|
||||||
if err = n.objCache.PutObject(oi); err != nil {
|
if err = n.objCache.PutObject(&data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node}); err != nil {
|
||||||
n.log.Warn("couldn't cache an object", zap.Error(err))
|
n.log.Warn("couldn't cache an object", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return processObjectInfoName(oi, prefix, delimiter)
|
return processObjectInfoName(oi, prefix, delimiter)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue