[#420] Using tree service to list object versions

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-05-20 18:02:00 +03:00 committed by Alex Vanin
parent 55c38e73e6
commit 9c74cca9af
11 changed files with 374 additions and 135 deletions

View file

@ -171,7 +171,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
own := n.Owner(ctx)
versioningEnabled := n.isVersioningEnabled(ctx, p.BktInfo)
newVersion := &NodeVersion{IsUnversioned: !versioningEnabled}
newVersion := &data.NodeVersion{IsUnversioned: !versioningEnabled}
r := p.Reader
if r != nil {
@ -283,7 +283,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
return nil, err
}
if node.IsDeleteMarker {
if node.DeleteMarker != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
@ -365,7 +365,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
return nil, fmt.Errorf("couldn't get versions: %w", err)
}
var foundVersion *NodeVersion
var foundVersion *data.NodeVersion
for _, version := range versions {
if version.OID.EncodeToString() == p.VersionID {
foundVersion = version
@ -554,41 +554,56 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, bkt *data.BucketIn
return objects, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) {
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {
var err error
cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, false)
ids := n.listsCache.Get(cacheKey)
nodeVersions := n.listsCache.GetVersions(cacheKey)
if ids == nil {
ids, err = n.objectSearch(ctx, &findParams{bkt: bkt, prefix: prefix})
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, &bkt.CID, prefix)
if err != nil {
return nil, err
}
if err = n.listsCache.Put(cacheKey, ids); err != nil {
if err = n.listsCache.PutVersions(cacheKey, nodeVersions); err != nil {
n.log.Error("couldn't cache list of objects", zap.Error(err))
}
}
versions := make(map[string]*objectVersions, len(ids)/2)
versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions))
for i := 0; i < len(ids); i++ {
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, ids[i])
if obj == nil {
continue
}
if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil {
if isSystem(oi) {
for _, nodeVersion := range nodeVersions {
oi := &data.ObjectInfo{}
if nodeVersion.DeleteMarker != nil { // delete marker does not match any object in NeoFS
oi.ID = nodeVersion.OID
oi.Name = nodeVersion.DeleteMarker.FilePath
oi.Owner = nodeVersion.DeleteMarker.Owner
oi.Created = nodeVersion.DeleteMarker.Created
oi.IsDeleteMarker = true
} else {
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion.OID)
if obj == nil {
continue
}
objVersions, ok := versions[oi.Name]
if !ok {
objVersions = newObjectVersions(oi.Name)
oi = objectInfoFromMeta(bkt, obj, prefix, delimiter)
if oi == nil {
continue
}
objVersions.appendVersion(oi)
versions[oi.Name] = objVersions
}
eoi := &data.ExtendedObjectInfo{
ObjectInfo: oi,
NodeVersion: nodeVersion,
}
objVersions, ok := versions[oi.Name]
if !ok {
objVersions = []*data.ExtendedObjectInfo{eoi}
} else if !oi.IsDir {
objVersions = append(objVersions, eoi)
}
versions[oi.Name] = objVersions
}
return versions, nil