From 3e20f736a6196e986062f2aac29660a7b85f9a22 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 17 Jan 2024 15:14:26 +0300 Subject: [PATCH] [#165] Move listing function to one file Signed-off-by: Denis Kirillov --- api/layer/listing.go | 859 ++++++++++++++++++++++++++++++++++++++++ api/layer/object.go | 727 ---------------------------------- api/layer/util.go | 33 -- api/layer/versioning.go | 109 ----- 4 files changed, 859 insertions(+), 869 deletions(-) create mode 100644 api/layer/listing.go delete mode 100644 api/layer/versioning.go diff --git a/api/layer/listing.go b/api/layer/listing.go new file mode 100644 index 00000000..7bde1ed0 --- /dev/null +++ b/api/layer/listing.go @@ -0,0 +1,859 @@ +package layer + +import ( + "context" + "errors" + "fmt" + "io" + "sort" + "strings" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" + s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" + "github.com/panjf2000/ants/v2" + "go.uber.org/zap" +) + +type ( + // ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2. + ListObjectsParamsCommon struct { + BktInfo *data.BucketInfo + Delimiter string + Encode string + MaxKeys int + Prefix string + } + + // ListObjectsParamsV1 contains params for ListObjectsV1. + ListObjectsParamsV1 struct { + ListObjectsParamsCommon + Marker string + } + + // ListObjectsParamsV2 contains params for ListObjectsV2. + ListObjectsParamsV2 struct { + ListObjectsParamsCommon + ContinuationToken string + StartAfter string + FetchOwner bool + } + + // ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2. + ListObjectsInfo struct { + Prefixes []string + Objects []*data.ObjectInfo + IsTruncated bool + } + + // ListObjectsInfoV1 holds data which ListObjectsV1 returns. + ListObjectsInfoV1 struct { + ListObjectsInfo + NextMarker string + } + + // ListObjectsInfoV2 holds data which ListObjectsV2 returns. + ListObjectsInfoV2 struct { + ListObjectsInfo + NextContinuationToken string + } + + // ListObjectVersionsInfo stores info and list of objects versions. + ListObjectVersionsInfo struct { + CommonPrefixes []string + IsTruncated bool + KeyMarker string + NextKeyMarker string + NextVersionIDMarker string + Version []*data.ExtendedObjectInfo + DeleteMarker []*data.ExtendedObjectInfo + VersionIDMarker string + } + + allObjectParams struct { + Bucket *data.BucketInfo + Delimiter string + Prefix string + MaxKeys int + Marker string + ContinuationToken string + } +) + +// ListObjectsV1 returns objects in a bucket for requests of Version 1. +func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { + var result ListObjectsInfoV1 + + prm := allObjectParams{ + Bucket: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.Marker, + } + + objects, next, err := n.getLatestObjectsVersions(ctx, prm) + if err != nil { + return nil, err + } + + if next != nil { + result.IsTruncated = true + result.NextMarker = objects[len(objects)-1].Name + } + + result.Prefixes, result.Objects = triageObjects(objects) + + return &result, nil +} + +// ListObjectsV2 returns objects in a bucket for requests of Version 2. +func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) { + var result ListObjectsInfoV2 + + prm := allObjectParams{ + Bucket: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + MaxKeys: p.MaxKeys, + Marker: p.StartAfter, + ContinuationToken: p.ContinuationToken, + } + + objects, next, err := n.getLatestObjectsVersionsV2(ctx, prm) + if err != nil { + return nil, err + } + + if next != nil { + result.IsTruncated = true + result.NextContinuationToken = next.ID.EncodeToString() + } + + result.Prefixes, result.Objects = triageObjects(objects) + + return &result, nil +} + +func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { + versions, err := n.getAllObjectsVersions(ctx, p) + if err != nil { + return nil, err + } + + sortedNames := make([]string, 0, len(versions)) + for k := range versions { + sortedNames = append(sortedNames, k) + } + sort.Strings(sortedNames) + + allObjects := make([]*data.ExtendedObjectInfo, 0, p.MaxKeys) + + for _, name := range sortedNames { + sortedVersions := versions[name] + sort.Slice(sortedVersions, func(i, j int) bool { + return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order + }) + + for i, version := range sortedVersions { + version.IsLatest = i == 0 + allObjects = append(allObjects, version) + } + } + + if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil { + return nil, err + } + + res := &ListObjectVersionsInfo{ + KeyMarker: p.KeyMarker, + VersionIDMarker: p.VersionIDMarker, + } + + res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects) + + if len(allObjects) > p.MaxKeys { + res.IsTruncated = true + res.NextKeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name + res.NextVersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID() + + allObjects = allObjects[:p.MaxKeys] + } + + res.Version, res.DeleteMarker = triageVersions(allObjects) + return res, nil +} + +func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) { + if p.MaxKeys == 0 { + return nil, nil, nil + } + + owner := n.BearerOwner(ctx) + cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true) + nodeVersions := n.cache.GetList(owner, cacheKey) + + if nodeVersions == nil { + nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix) + if err != nil { + return nil, nil, err + } + n.cache.PutList(owner, cacheKey, nodeVersions) + } + + if len(nodeVersions) == 0 { + return nil, nil, nil + } + + sort.Slice(nodeVersions, func(i, j int) bool { + return nodeVersions[i].FilePath < nodeVersions[j].FilePath + }) + + poolCtx, cancel := context.WithCancel(ctx) + defer cancel() + objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions)) + if err != nil { + return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) + } + + objects = make([]*data.ObjectInfo, 0, p.MaxKeys) + + for obj := range objOutCh { + objects = append(objects, obj) + } + + //for node := range nodesGenerator(poolCtx, p, nodeVersions) { + // objects = append(objects, &data.ObjectInfo{ + // ID: node.OID, + // IsDir: false, + // IsDeleteMarker: node.IsDeleteMarker(), + // Name: node.FilePath, + // Size: node.Size, + // Created: time.Time{}, + // HashSum: node.ETag, + // Owner: user.ID{}, + // Headers: nil, + // }) + //} + + sort.Slice(objects, func(i, j int) bool { + return objects[i].Name < objects[j].Name + }) + + if len(objects) > p.MaxKeys { + next = objects[p.MaxKeys] + objects = objects[:p.MaxKeys] + } + + return +} + +func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) { + if p.MaxKeys == 0 { + return nil, nil, nil + } + + owner := n.BearerOwner(ctx) + cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) + session := n.cache.GetListSession(owner, cacheKey) + if session != nil { + // after reading next object from stream in session + // the current cache value already doesn't match with next token in cache key + n.cache.DeleteListSession(owner, cacheKey) + } else { + session = &data.ListSession{NamesMap: make(map[string]struct{})} + session.Context, session.Cancel = context.WithCancel(context.Background()) + + if bd, err := middleware.GetBoxData(ctx); err == nil { + session.Context = middleware.SetBoxData(session.Context, bd) + } + + session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix) + if err != nil { + return nil, nil, err + } + } + + poolCtx, cancel := context.WithCancel(ctx) + defer cancel() + + generator, errorCh := nodesGeneratorStream(poolCtx, p, session) + objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator) + if err != nil { + return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) + } + + objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1) + if session.Next != nil { + objects = append(objects, session.Next) + } + + for obj := range objOutCh { + objects = append(objects, obj) + } + + //for node := range generator { + // objects = append(objects, &data.ObjectInfo{ + // ID: node.OID, + // IsDir: false, + // IsDeleteMarker: node.IsDeleteMarker(), + // Name: node.FilePath, + // Size: node.Size, + // Created: time.Time{}, + // HashSum: node.ETag, + // Owner: user.ID{}, + // Headers: nil, + // }) + //} + + if err = <-errorCh; err != nil { + return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err) + } + + sort.Slice(objects, func(i, j int) bool { + return objects[i].Name < objects[j].Name + }) + + if len(objects) > p.MaxKeys { + next = objects[p.MaxKeys] + objects = objects[:p.MaxKeys] + } + + if next != nil { + session.Next = next + n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.VersionID()), session) + } + + return +} + +func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedObjectInfo, error) { + nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix) + if err != nil { + return nil, err + } + + versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions)) + + sort.Slice(nodeVersions, func(i, j int) bool { + return nodeVersions[i].FilePath < nodeVersions[j].FilePath + }) + + poolCtx, cancel := context.WithCancel(ctx) + defer cancel() + + pp := allObjectParams{ + Bucket: p.BktInfo, + Delimiter: p.Delimiter, + Prefix: p.Prefix, + Marker: p.KeyMarker, + ContinuationToken: p.VersionIDMarker, + MaxKeys: p.MaxKeys, + } + + objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, nodesGeneratorVersions(poolCtx, pp, nodeVersions)) + if err != nil { + return nil, err + } + + for eoi := range objOutCh { + objVersions, ok := versions[eoi.ObjectInfo.Name] + if !ok { + objVersions = []*data.ExtendedObjectInfo{eoi} + } else if !eoi.ObjectInfo.IsDir { + objVersions = append(objVersions, eoi) + } + versions[eoi.ObjectInfo.Name] = objVersions + } + + return versions, nil +} + +func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { + nodeCh := make(chan *data.NodeVersion) + existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories + + go func() { + var generated int + LOOP: + for _, node := range nodeVersions { + if shouldSkip(node, p, existed) { + continue + } + + select { + case <-ctx.Done(): + break LOOP + case nodeCh <- node: + generated++ + if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken + break LOOP + } + } + } + close(nodeCh) + }() + + return nodeCh +} + +func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { + nodeCh := make(chan *data.NodeVersion) + existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories + + go func() { + var generated int + LOOP: + for _, node := range nodeVersions { + if shouldSkipVersions(node, p, existed) { + continue + } + + select { + case <-ctx.Done(): + break LOOP + case nodeCh <- node: + generated++ + if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken + break LOOP + } + } + } + close(nodeCh) + }() + + return nodeCh +} + +func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { + nodeCh := make(chan *data.NodeVersion, 1000) + errCh := make(chan error, 1) + //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories + existed := stream.NamesMap + + if stream.Next != nil { + existed[continuationToken] = struct{}{} + } + + limit := p.MaxKeys + if stream.Next == nil { + limit++ + } + + go func() { + var generated int + var err error + + LOOP: + for err == nil { + node, err := stream.Stream.Next(ctx) + if err != nil { + if !errors.Is(err, io.EOF) { + fmt.Println(ctx.Err()) + errCh <- fmt.Errorf("stream next: %w", err) + } + break LOOP + } + + if shouldSkip(node, p, existed) { + continue + } + + select { + case <-ctx.Done(): + break LOOP + case nodeCh <- node: + generated++ + + if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken + break LOOP + } + } + } + close(nodeCh) + close(errCh) + }() + + return nodeCh, errCh +} + +func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { + reqLog := n.reqLogger(ctx) + pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) + if err != nil { + return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) + } + objCh := make(chan *data.ObjectInfo) + + go func() { + var wg sync.WaitGroup + + LOOP: + for node := range input { + select { + case <-ctx.Done(): + break LOOP + default: + } + + // We have to make a copy of pointer to data.NodeVersion + // to get correct value in submitted task function. + func(node *data.NodeVersion) { + wg.Add(1) + err = pool.Submit(func() { + defer wg.Done() + oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) + if oi == nil { + // try to get object again + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { + // do not process object which are definitely missing in object service + return + } + } + select { + case <-ctx.Done(): + case objCh <- oi: + } + }) + if err != nil { + wg.Done() + reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + } + }(node) + } + wg.Wait() + close(objCh) + pool.Release() + }() + + return objCh, nil +} + +func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedObjectInfo, error) { + reqLog := n.reqLogger(ctx) + pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) + if err != nil { + return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) + } + objCh := make(chan *data.ExtendedObjectInfo) + + go func() { + var wg sync.WaitGroup + + LOOP: + for node := range input { + select { + case <-ctx.Done(): + break LOOP + default: + } + + // We have to make a copy of pointer to data.NodeVersion + // to get correct value in submitted task function. + func(node *data.NodeVersion) { + wg.Add(1) + err = pool.Submit(func() { + defer wg.Done() + + oi := &data.ObjectInfo{} + if node.IsDeleteMarker() { // delete marker does not match any object in FrostFS + oi.ID = node.OID + oi.Name = node.FilePath + oi.Owner = node.DeleteMarker.Owner + oi.Created = node.DeleteMarker.Created + oi.IsDeleteMarker = true + } else { + oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) + if oi == nil { + // try to get object again + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { + // do not process object which are definitely missing in object service + return + } + } + } + + eoi := &data.ExtendedObjectInfo{ + ObjectInfo: oi, + NodeVersion: node, + } + + select { + case <-ctx.Done(): + case objCh <- eoi: + } + }) + if err != nil { + wg.Done() + reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + } + }(node) + } + wg.Wait() + close(objCh) + pool.Release() + }() + + return objCh, nil +} + +func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { + reqLog := n.reqLogger(ctx) + pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) + if err != nil { + return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) + } + objCh := make(chan *data.ObjectInfo) + + go func() { + var wg sync.WaitGroup + + LOOP: + for node := range input { + select { + case <-ctx.Done(): + break LOOP + default: + } + + // We have to make a copy of pointer to data.NodeVersion + // to get correct value in submitted task function. + func(node *data.NodeVersion) { + wg.Add(1) + err = pool.Submit(func() { + defer wg.Done() + oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) + if oi == nil { + // try to get object again + if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { + // do not process object which are definitely missing in object service + return + } + } + select { + case <-ctx.Done(): + case objCh <- oi: + } + }) + if err != nil { + wg.Done() + reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + } + }(node) + } + wg.Wait() + close(objCh) + pool.Release() + }() + + return objCh, nil +} + +func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { + var err error + + owner := n.BearerOwner(ctx) + cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false) + nodeVersions := n.cache.GetList(owner, cacheKey) + + if nodeVersions == nil { + nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix) + if err != nil { + return nil, fmt.Errorf("get all versions from tree service: %w", err) + } + + n.cache.PutList(owner, cacheKey, nodeVersions) + } + + return nodeVersions, nil +} + +func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { + if node.IsDeleteMarker() { + return true + } + + filePath := node.FilePath + if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { + filePath = dirName + } + if _, ok := existed[filePath]; ok { + return true + } + + if filePath <= p.Marker { + return true + } + + if p.ContinuationToken != "" { + if _, ok := existed[continuationToken]; !ok { + if p.ContinuationToken != node.OID.EncodeToString() { + return true + } + existed[continuationToken] = struct{}{} + } + } + + existed[filePath] = struct{}{} + return false +} + +func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { + filePath := node.FilePath + if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { + filePath = dirName + if _, ok := existed[filePath]; ok { + return true + } + } + + if filePath < p.Marker { + return true + } + + if p.ContinuationToken != "" { + if _, ok := existed[continuationToken]; !ok { + if p.ContinuationToken != node.OID.EncodeToString() { + return true + } + existed[continuationToken] = struct{}{} + } + } + + existed[filePath] = struct{}{} + return false +} + +func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) { + for _, ov := range allObjects { + if ov.IsDir { + prefixes = append(prefixes, ov.Name) + } else { + objects = append(objects, ov) + } + } + + return +} + +func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []string, objects []*data.ExtendedObjectInfo) { + for _, ov := range allObjects { + if ov.ObjectInfo.IsDir { + prefixes = append(prefixes, ov.ObjectInfo.Name) + } else { + objects = append(objects, ov) + } + } + + return +} + +func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) { + if oiDir := tryDirectory(bktInfo, node, prefix, delimiter); oiDir != nil { + return oiDir + } + + owner := n.BearerOwner(ctx) + if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil { + return extInfo.ObjectInfo + } + + meta, err := n.objectHead(ctx, bktInfo, node.OID) + if err != nil { + n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err)) + return nil + } + + oi = objectInfoFromMeta(bktInfo, meta) + oi.MD5Sum = node.MD5 + n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node}) + + return oi +} + +func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo { + dirName := tryDirectoryName(node, prefix, delimiter) + if len(dirName) == 0 { + return nil + } + + return &data.ObjectInfo{ + ID: node.OID, // to use it as continuation token + CID: bktInfo.CID, + IsDir: true, + IsDeleteMarker: node.IsDeleteMarker(), + Bucket: bktInfo.Name, + Name: dirName, + } +} + +// tryDirectoryName forms directory name by prefix and delimiter. +// If node isn't a directory empty string is returned. +// This function doesn't check if node has a prefix. It must do a caller. +func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { + if len(delimiter) == 0 { + return "" + } + + tail := strings.TrimPrefix(node.FilePath, prefix) + index := strings.Index(tail, delimiter) + if index >= 0 { + return prefix + tail[:index+1] + } + + return "" +} + +func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVersionsParams) ([]*data.ExtendedObjectInfo, error) { + if p.KeyMarker == "" { + return objects, nil + } + + for i, obj := range objects { + if obj.ObjectInfo.Name == p.KeyMarker { + for j := i; j < len(objects); j++ { + if objects[j].ObjectInfo.Name != obj.ObjectInfo.Name { + if p.VersionIDMarker == "" { + return objects[j:], nil + } + break + } + if objects[j].ObjectInfo.VersionID() == p.VersionIDMarker { + return objects[j+1:], nil + } + } + return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) + } else if obj.ObjectInfo.Name > p.KeyMarker { + if p.VersionIDMarker != "" { + return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) + } + return objects[i:], nil + } + } + + // don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above + // that can be empty + return []*data.ExtendedObjectInfo{}, nil +} + +func triageVersions(objVersions []*data.ExtendedObjectInfo) ([]*data.ExtendedObjectInfo, []*data.ExtendedObjectInfo) { + if len(objVersions) == 0 { + return nil, nil + } + + var resVersion []*data.ExtendedObjectInfo + var resDelMarkVersions []*data.ExtendedObjectInfo + + for _, version := range objVersions { + if version.NodeVersion.IsDeleteMarker() { + resDelMarkVersions = append(resDelMarkVersions, version) + } else { + resVersion = append(resVersion, version) + } + } + + return resVersion, resDelMarkVersions +} diff --git a/api/layer/object.go b/api/layer/object.go index 670a696b..48e647ee 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -13,25 +13,19 @@ import ( "io" "mime" "path/filepath" - "runtime" - "sort" "strconv" "strings" - "sync" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/minio/sio" - "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -52,38 +46,6 @@ type ( bktInfo *data.BucketInfo } - // ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2. - ListObjectsParamsCommon struct { - BktInfo *data.BucketInfo - Delimiter string - Encode string - MaxKeys int - Prefix string - } - - // ListObjectsParamsV1 contains params for ListObjectsV1. - ListObjectsParamsV1 struct { - ListObjectsParamsCommon - Marker string - } - - // ListObjectsParamsV2 contains params for ListObjectsV2. - ListObjectsParamsV2 struct { - ListObjectsParamsCommon - ContinuationToken string - StartAfter string - FetchOwner bool - } - - allObjectParams struct { - Bucket *data.BucketInfo - Delimiter string - Prefix string - MaxKeys int - Marker string - ContinuationToken string - } - DeleteMarkerError struct { ErrorCode apiErrors.ErrorCode } @@ -534,61 +496,6 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil } -// ListObjectsV1 returns objects in a bucket for requests of Version 1. -func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { - var result ListObjectsInfoV1 - - prm := allObjectParams{ - Bucket: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - MaxKeys: p.MaxKeys, - Marker: p.Marker, - } - - objects, next, err := n.getLatestObjectsVersions(ctx, prm) - if err != nil { - return nil, err - } - - if next != nil { - result.IsTruncated = true - result.NextMarker = objects[len(objects)-1].Name - } - - result.Prefixes, result.Objects = triageObjects(objects) - - return &result, nil -} - -// ListObjectsV2 returns objects in a bucket for requests of Version 2. -func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) { - var result ListObjectsInfoV2 - - prm := allObjectParams{ - Bucket: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - MaxKeys: p.MaxKeys, - Marker: p.StartAfter, - ContinuationToken: p.ContinuationToken, - } - - objects, next, err := n.getLatestObjectsVersionsV2(ctx, prm) - if err != nil { - return nil, err - } - - if next != nil { - result.IsTruncated = true - result.NextContinuationToken = next.ID.EncodeToString() - } - - result.Prefixes, result.Objects = triageObjects(objects) - - return &result, nil -} - type logWrapper struct { log *zap.Logger } @@ -597,645 +504,11 @@ func (l *logWrapper) Printf(format string, args ...interface{}) { l.log.Info(fmt.Sprintf(format, args...)) } -func PrintMemUsage() { - var m runtime.MemStats - runtime.ReadMemStats(&m) - // For info on each, see: https://golang.org/pkg/runtime/#MemStats - fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) - fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) - fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) - fmt.Printf("\tNumGC = %v\n", m.NumGC) -} - -func bToMb(b uint64) uint64 { - return b / 1024 / 1024 -} - -func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) { - if p.MaxKeys == 0 { - return nil, nil, nil - } - - owner := n.BearerOwner(ctx) - cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true) - nodeVersions := n.cache.GetList(owner, cacheKey) - - if nodeVersions == nil { - nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix) - if err != nil { - return nil, nil, err - } - n.cache.PutList(owner, cacheKey, nodeVersions) - } - - if len(nodeVersions) == 0 { - return nil, nil, nil - } - - sort.Slice(nodeVersions, func(i, j int) bool { - return nodeVersions[i].FilePath < nodeVersions[j].FilePath - }) - - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() - objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions)) - if err != nil { - return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) - } - - objects = make([]*data.ObjectInfo, 0, p.MaxKeys) - - for obj := range objOutCh { - objects = append(objects, obj) - } - - //for node := range nodesGenerator(poolCtx, p, nodeVersions) { - // objects = append(objects, &data.ObjectInfo{ - // ID: node.OID, - // IsDir: false, - // IsDeleteMarker: node.IsDeleteMarker(), - // Name: node.FilePath, - // Size: node.Size, - // Created: time.Time{}, - // HashSum: node.ETag, - // Owner: user.ID{}, - // Headers: nil, - // }) - //} - - sort.Slice(objects, func(i, j int) bool { - return objects[i].Name < objects[j].Name - }) - - if len(objects) > p.MaxKeys { - next = objects[p.MaxKeys] - objects = objects[:p.MaxKeys] - } - - return -} - -func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) { - if p.MaxKeys == 0 { - return nil, nil, nil - } - - owner := n.BearerOwner(ctx) - cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) - session := n.cache.GetListSession(owner, cacheKey) - if session != nil { - // after reading next object from stream in session - // the current cache value already doesn't match with next token in cache key - n.cache.DeleteListSession(owner, cacheKey) - } else { - session = &data.ListSession{NamesMap: make(map[string]struct{})} - session.Context, session.Cancel = context.WithCancel(context.Background()) - - if bd, err := middleware.GetBoxData(ctx); err == nil { - session.Context = middleware.SetBoxData(session.Context, bd) - } - - session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix) - if err != nil { - return nil, nil, err - } - } - - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() - - generator, errorCh := nodesGeneratorStream(poolCtx, p, session) - objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator) - if err != nil { - return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) - } - - objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1) - if session.Next != nil { - objects = append(objects, session.Next) - } - - for obj := range objOutCh { - objects = append(objects, obj) - } - - //for node := range generator { - // objects = append(objects, &data.ObjectInfo{ - // ID: node.OID, - // IsDir: false, - // IsDeleteMarker: node.IsDeleteMarker(), - // Name: node.FilePath, - // Size: node.Size, - // Created: time.Time{}, - // HashSum: node.ETag, - // Owner: user.ID{}, - // Headers: nil, - // }) - //} - - if err = <-errorCh; err != nil { - return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err) - } - - sort.Slice(objects, func(i, j int) bool { - return objects[i].Name < objects[j].Name - }) - - if len(objects) > p.MaxKeys { - next = objects[p.MaxKeys] - objects = objects[:p.MaxKeys] - } - - if next != nil { - session.Next = next - n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.VersionID()), session) - } - - return -} - -func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { - nodeCh := make(chan *data.NodeVersion) - existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories - - go func() { - var generated int - LOOP: - for _, node := range nodeVersions { - if shouldSkip(node, p, existed) { - continue - } - - select { - case <-ctx.Done(): - break LOOP - case nodeCh <- node: - generated++ - if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken - break LOOP - } - } - } - close(nodeCh) - }() - - return nodeCh -} - -func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { - nodeCh := make(chan *data.NodeVersion) - existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories - - go func() { - var generated int - LOOP: - for _, node := range nodeVersions { - if shouldSkipVersions(node, p, existed) { - continue - } - - select { - case <-ctx.Done(): - break LOOP - case nodeCh <- node: - generated++ - if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken - break LOOP - } - } - } - close(nodeCh) - }() - - return nodeCh -} - -func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { - nodeCh := make(chan *data.NodeVersion, 1000) - errCh := make(chan error, 1) - //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories - existed := stream.NamesMap - - if stream.Next != nil { - existed[continuationToken] = struct{}{} - } - - limit := p.MaxKeys - if stream.Next == nil { - limit++ - } - - go func() { - var generated int - var err error - - LOOP: - for err == nil { - node, err := stream.Stream.Next(ctx) - if err != nil { - if !errors.Is(err, io.EOF) { - fmt.Println(ctx.Err()) - errCh <- fmt.Errorf("stream next: %w", err) - } - break LOOP - } - - if shouldSkip(node, p, existed) { - continue - } - - select { - case <-ctx.Done(): - break LOOP - case nodeCh <- node: - generated++ - - if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken - break LOOP - } - } - } - close(nodeCh) - close(errCh) - }() - - return nodeCh, errCh -} - -func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { - reqLog := n.reqLogger(ctx) - pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) - if err != nil { - return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) - } - objCh := make(chan *data.ObjectInfo) - - go func() { - var wg sync.WaitGroup - - LOOP: - for node := range input { - select { - case <-ctx.Done(): - break LOOP - default: - } - - // We have to make a copy of pointer to data.NodeVersion - // to get correct value in submitted task function. - func(node *data.NodeVersion) { - wg.Add(1) - err = pool.Submit(func() { - defer wg.Done() - oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) - if oi == nil { - // try to get object again - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { - // do not process object which are definitely missing in object service - return - } - } - select { - case <-ctx.Done(): - case objCh <- oi: - } - }) - if err != nil { - wg.Done() - reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) - } - }(node) - } - wg.Wait() - close(objCh) - pool.Release() - }() - - return objCh, nil -} - -func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedObjectInfo, error) { - reqLog := n.reqLogger(ctx) - pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) - if err != nil { - return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) - } - objCh := make(chan *data.ExtendedObjectInfo) - - go func() { - var wg sync.WaitGroup - - LOOP: - for node := range input { - select { - case <-ctx.Done(): - break LOOP - default: - } - - // We have to make a copy of pointer to data.NodeVersion - // to get correct value in submitted task function. - func(node *data.NodeVersion) { - wg.Add(1) - err = pool.Submit(func() { - defer wg.Done() - - oi := &data.ObjectInfo{} - if node.IsDeleteMarker() { // delete marker does not match any object in FrostFS - oi.ID = node.OID - oi.Name = node.FilePath - oi.Owner = node.DeleteMarker.Owner - oi.Created = node.DeleteMarker.Created - oi.IsDeleteMarker = true - } else { - oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) - if oi == nil { - // try to get object again - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { - // do not process object which are definitely missing in object service - return - } - } - } - - eoi := &data.ExtendedObjectInfo{ - ObjectInfo: oi, - NodeVersion: node, - } - - select { - case <-ctx.Done(): - case objCh <- eoi: - } - }) - if err != nil { - wg.Done() - reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) - } - }(node) - } - wg.Wait() - close(objCh) - pool.Release() - }() - - return objCh, nil -} - -func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { - reqLog := n.reqLogger(ctx) - pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) - if err != nil { - return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) - } - objCh := make(chan *data.ObjectInfo) - - go func() { - var wg sync.WaitGroup - - LOOP: - for node := range input { - select { - case <-ctx.Done(): - break LOOP - default: - } - - // We have to make a copy of pointer to data.NodeVersion - // to get correct value in submitted task function. - func(node *data.NodeVersion) { - wg.Add(1) - err = pool.Submit(func() { - defer wg.Done() - oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter) - if oi == nil { - // try to get object again - if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil { - // do not process object which are definitely missing in object service - return - } - } - select { - case <-ctx.Done(): - case objCh <- oi: - } - }) - if err != nil { - wg.Done() - reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) - } - }(node) - } - wg.Wait() - close(objCh) - pool.Release() - }() - - return objCh, nil -} - -func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) { - var err error - - owner := n.BearerOwner(ctx) - cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false) - nodeVersions := n.cache.GetList(owner, cacheKey) - - if nodeVersions == nil { - nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix) - if err != nil { - return nil, fmt.Errorf("get all versions from tree service: %w", err) - } - - n.cache.PutList(owner, cacheKey, nodeVersions) - } - - return nodeVersions, nil -} - -func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedObjectInfo, error) { - nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix) - if err != nil { - return nil, err - } - - versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions)) - - sort.Slice(nodeVersions, func(i, j int) bool { - return nodeVersions[i].FilePath < nodeVersions[j].FilePath - }) - - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() - - pp := allObjectParams{ - Bucket: p.BktInfo, - Delimiter: p.Delimiter, - Prefix: p.Prefix, - Marker: p.KeyMarker, - ContinuationToken: p.VersionIDMarker, - MaxKeys: p.MaxKeys, - } - - objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, nodesGeneratorVersions(poolCtx, pp, nodeVersions)) - if err != nil { - return nil, err - } - - for eoi := range objOutCh { - objVersions, ok := versions[eoi.ObjectInfo.Name] - if !ok { - objVersions = []*data.ExtendedObjectInfo{eoi} - } else if !eoi.ObjectInfo.IsDir { - objVersions = append(objVersions, eoi) - } - versions[eoi.ObjectInfo.Name] = objVersions - } - - return versions, nil -} - func IsSystemHeader(key string) bool { _, ok := api.SystemMetadata[key] return ok || strings.HasPrefix(key, api.FrostFSSystemMetadataPrefix) } -func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { - if node.IsDeleteMarker() { - return true - } - - filePath := node.FilePath - if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { - filePath = dirName - } - if _, ok := existed[filePath]; ok { - return true - } - - if filePath <= p.Marker { - return true - } - - if p.ContinuationToken != "" { - if _, ok := existed[continuationToken]; !ok { - if p.ContinuationToken != node.OID.EncodeToString() { - return true - } - existed[continuationToken] = struct{}{} - } - } - - existed[filePath] = struct{}{} - return false -} - -func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { - filePath := node.FilePath - if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { - filePath = dirName - if _, ok := existed[filePath]; ok { - return true - } - } - - if filePath < p.Marker { - return true - } - - if p.ContinuationToken != "" { - if _, ok := existed[continuationToken]; !ok { - if p.ContinuationToken != node.OID.EncodeToString() { - return true - } - existed[continuationToken] = struct{}{} - } - } - - existed[filePath] = struct{}{} - return false -} - -func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) { - for _, ov := range allObjects { - if ov.IsDir { - prefixes = append(prefixes, ov.Name) - } else { - objects = append(objects, ov) - } - } - - return -} - -func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []string, objects []*data.ExtendedObjectInfo) { - for _, ov := range allObjects { - if ov.ObjectInfo.IsDir { - prefixes = append(prefixes, ov.ObjectInfo.Name) - } else { - objects = append(objects, ov) - } - } - - return -} - -func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) { - if oiDir := tryDirectory(bktInfo, node, prefix, delimiter); oiDir != nil { - return oiDir - } - - owner := n.BearerOwner(ctx) - if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil { - return extInfo.ObjectInfo - } - - meta, err := n.objectHead(ctx, bktInfo, node.OID) - if err != nil { - n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err)) - return nil - } - - oi = objectInfoFromMeta(bktInfo, meta) - oi.MD5Sum = node.MD5 - n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node}) - - return oi -} - -func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo { - dirName := tryDirectoryName(node, prefix, delimiter) - if len(dirName) == 0 { - return nil - } - - return &data.ObjectInfo{ - ID: node.OID, // to use it as continuation token - CID: bktInfo.CID, - IsDir: true, - IsDeleteMarker: node.IsDeleteMarker(), - Bucket: bktInfo.Name, - Name: dirName, - } -} - -// tryDirectoryName forms directory name by prefix and delimiter. -// If node isn't a directory empty string is returned. -// This function doesn't check if node has a prefix. It must do a caller. -func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { - if len(delimiter) == 0 { - return "" - } - - tail := strings.TrimPrefix(node.FilePath, prefix) - index := strings.Index(tail, delimiter) - if index >= 0 { - return prefix + tail[:index+1] - } - - return "" -} - func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader { if input == nil { return nil diff --git a/api/layer/util.go b/api/layer/util.go index 658ecca8..e6438490 100644 --- a/api/layer/util.go +++ b/api/layer/util.go @@ -13,39 +13,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" ) -type ( - // ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2. - ListObjectsInfo struct { - Prefixes []string - Objects []*data.ObjectInfo - IsTruncated bool - } - - // ListObjectsInfoV1 holds data which ListObjectsV1 returns. - ListObjectsInfoV1 struct { - ListObjectsInfo - NextMarker string - } - - // ListObjectsInfoV2 holds data which ListObjectsV2 returns. - ListObjectsInfoV2 struct { - ListObjectsInfo - NextContinuationToken string - } - - // ListObjectVersionsInfo stores info and list of objects versions. - ListObjectVersionsInfo struct { - CommonPrefixes []string - IsTruncated bool - KeyMarker string - NextKeyMarker string - NextVersionIDMarker string - Version []*data.ExtendedObjectInfo - DeleteMarker []*data.ExtendedObjectInfo - VersionIDMarker string - } -) - // PathSeparator is a path components separator string. const PathSeparator = string(os.PathSeparator) diff --git a/api/layer/versioning.go b/api/layer/versioning.go deleted file mode 100644 index 288072be..00000000 --- a/api/layer/versioning.go +++ /dev/null @@ -1,109 +0,0 @@ -package layer - -import ( - "context" - "sort" - - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" - s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" -) - -func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { - versions, err := n.getAllObjectsVersions(ctx, p) - if err != nil { - return nil, err - } - - sortedNames := make([]string, 0, len(versions)) - for k := range versions { - sortedNames = append(sortedNames, k) - } - sort.Strings(sortedNames) - - allObjects := make([]*data.ExtendedObjectInfo, 0, p.MaxKeys) - - for _, name := range sortedNames { - sortedVersions := versions[name] - sort.Slice(sortedVersions, func(i, j int) bool { - return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order - }) - - for i, version := range sortedVersions { - version.IsLatest = i == 0 - allObjects = append(allObjects, version) - } - } - - if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil { - return nil, err - } - - res := &ListObjectVersionsInfo{ - KeyMarker: p.KeyMarker, - VersionIDMarker: p.VersionIDMarker, - } - - res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects) - - if len(allObjects) > p.MaxKeys { - res.IsTruncated = true - res.NextKeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name - res.NextVersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID() - - allObjects = allObjects[:p.MaxKeys] - } - - res.Version, res.DeleteMarker = triageVersions(allObjects) - return res, nil -} - -func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVersionsParams) ([]*data.ExtendedObjectInfo, error) { - if p.KeyMarker == "" { - return objects, nil - } - - for i, obj := range objects { - if obj.ObjectInfo.Name == p.KeyMarker { - for j := i; j < len(objects); j++ { - if objects[j].ObjectInfo.Name != obj.ObjectInfo.Name { - if p.VersionIDMarker == "" { - return objects[j:], nil - } - break - } - if objects[j].ObjectInfo.VersionID() == p.VersionIDMarker { - return objects[j+1:], nil - } - } - return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) - } else if obj.ObjectInfo.Name > p.KeyMarker { - if p.VersionIDMarker != "" { - return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) - } - return objects[i:], nil - } - } - - // don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above - // that can be empty - return []*data.ExtendedObjectInfo{}, nil -} - -func triageVersions(objVersions []*data.ExtendedObjectInfo) ([]*data.ExtendedObjectInfo, []*data.ExtendedObjectInfo) { - if len(objVersions) == 0 { - return nil, nil - } - - var resVersion []*data.ExtendedObjectInfo - var resDelMarkVersions []*data.ExtendedObjectInfo - - for _, version := range objVersions { - if version.NodeVersion.IsDeleteMarker() { - resDelMarkVersions = append(resDelMarkVersions, version) - } else { - resVersion = append(resVersion, version) - } - } - - return resVersion, resDelMarkVersions -}