package layer

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
)

type (
	// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
	ListObjectsParamsCommon struct {
		BktInfo   *data.BucketInfo
		Delimiter string
		Encode    string
		MaxKeys   int
		Prefix    string
	}

	// ListObjectsParamsV1 contains params for ListObjectsV1.
	ListObjectsParamsV1 struct {
		ListObjectsParamsCommon
		Marker string
	}

	// ListObjectsParamsV2 contains params for ListObjectsV2.
	ListObjectsParamsV2 struct {
		ListObjectsParamsCommon
		ContinuationToken string
		StartAfter        string
		FetchOwner        bool
	}

	// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
	ListObjectsInfo struct {
		Prefixes    []string
		Objects     []*data.ObjectInfo
		IsTruncated bool
	}

	// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
	ListObjectsInfoV1 struct {
		ListObjectsInfo
		NextMarker string
	}

	// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
	ListObjectsInfoV2 struct {
		ListObjectsInfo
		NextContinuationToken string
	}

	// ListObjectVersionsInfo stores info and list of objects versions.
	ListObjectVersionsInfo struct {
		CommonPrefixes      []string
		IsTruncated         bool
		KeyMarker           string
		NextKeyMarker       string
		NextVersionIDMarker string
		Version             []*data.ExtendedObjectInfo
		DeleteMarker        []*data.ExtendedObjectInfo
		VersionIDMarker     string
	}

	allObjectParams struct {
		Bucket            *data.BucketInfo
		Delimiter         string
		Prefix            string
		MaxKeys           int
		Marker            string
		ContinuationToken string
	}
)

// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
	var result ListObjectsInfoV1

	prm := allObjectParams{
		Bucket:    p.BktInfo,
		Delimiter: p.Delimiter,
		Prefix:    p.Prefix,
		MaxKeys:   p.MaxKeys,
		Marker:    p.Marker,
	}

	objects, next, err := n.getLatestObjectsVersions(ctx, prm)
	if err != nil {
		return nil, err
	}

	if next != nil {
		result.IsTruncated = true
		result.NextMarker = objects[len(objects)-1].Name
	}

	result.Prefixes, result.Objects = triageObjects(objects)

	return &result, nil
}
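
// Listing note: getLatestObjectsVersions and getLatestObjectsVersionsV2 below
// request up to MaxKeys+1 entries from the generators, so a non-nil "next" object
// signals truncation and supplies NextMarker / NextContinuationToken respectively.
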
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
	var result ListObjectsInfoV2

	prm := allObjectParams{
		Bucket:            p.BktInfo,
		Delimiter:         p.Delimiter,
		Prefix:            p.Prefix,
		MaxKeys:           p.MaxKeys,
		Marker:            p.StartAfter,
		ContinuationToken: p.ContinuationToken,
	}

	objects, next, err := n.getLatestObjectsVersionsV2(ctx, prm)
	if err != nil {
		return nil, err
	}

	if next != nil {
		result.IsTruncated = true
		result.NextContinuationToken = next.ID.EncodeToString()
	}

	result.Prefixes, result.Objects = triageObjects(objects)

	return &result, nil
}

func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
	versions, err := n.getAllObjectsVersions(ctx, p)
	if err != nil {
		return nil, err
	}

	sortedNames := make([]string, 0, len(versions))
	for k := range versions {
		sortedNames = append(sortedNames, k)
	}
	sort.Strings(sortedNames)

	allObjects := make([]*data.ExtendedObjectInfo, 0, p.MaxKeys)

	for _, name := range sortedNames {
		sortedVersions := versions[name]
		sort.Slice(sortedVersions, func(i, j int) bool {
			return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order
		})

		for i, version := range sortedVersions {
			version.IsLatest = i == 0
			allObjects = append(allObjects, version)
		}
	}

	if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
		return nil, err
	}

	res := &ListObjectVersionsInfo{
		KeyMarker:       p.KeyMarker,
		VersionIDMarker: p.VersionIDMarker,
	}

	res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects)

	if len(allObjects) > p.MaxKeys {
		res.IsTruncated = true
		res.NextKeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
		res.NextVersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID()

		allObjects = allObjects[:p.MaxKeys]
	}

	res.Version, res.DeleteMarker = triageVersions(allObjects)

	return res, nil
}

func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
	if p.MaxKeys == 0 {
		return nil, nil, nil
	}

	owner := n.BearerOwner(ctx)
	cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true)
	nodeVersions := n.cache.GetList(owner, cacheKey)

	if nodeVersions == nil {
		nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix)
		if err != nil {
			return nil, nil, err
		}
		n.cache.PutList(owner, cacheKey, nodeVersions)
	}

	if len(nodeVersions) == 0 {
		return nil, nil, nil
	}

	sort.Slice(nodeVersions, func(i, j int) bool {
		return nodeVersions[i].FilePath < nodeVersions[j].FilePath
	})

	poolCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions))
	if err != nil {
		return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
	}

	objects = make([]*data.ObjectInfo, 0, p.MaxKeys)

	for obj := range objOutCh {
		objects = append(objects, obj)
	}

	//for node := range nodesGenerator(poolCtx, p, nodeVersions) {
	//	objects = append(objects, &data.ObjectInfo{
	//		ID:             node.OID,
	//		IsDir:          false,
	//		IsDeleteMarker: node.IsDeleteMarker(),
	//		Name:           node.FilePath,
	//		Size:           node.Size,
	//		Created:        time.Time{},
	//		HashSum:        node.ETag,
	//		Owner:          user.ID{},
	//		Headers:        nil,
	//	})
	//}

	sort.Slice(objects, func(i, j int) bool {
		return objects[i].Name < objects[j].Name
	})

	if len(objects) > p.MaxKeys {
		next = objects[p.MaxKeys]
		objects = objects[:p.MaxKeys]
	}

	return
}
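
// getLatestObjectsVersionsV2 is the streaming counterpart of getLatestObjectsVersions:
// instead of a fully materialized list it reads node versions from a tree service stream
// and keeps the partially consumed stream in a per-token list session cache, so the next
// ListObjectsV2 page can continue from where the previous one stopped.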
func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
	if p.MaxKeys == 0 {
		return nil, nil, nil
	}

	owner := n.BearerOwner(ctx)
	cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken)
	session := n.cache.GetListSession(owner, cacheKey)
	if session != nil {
		// After the next object has been read from the stream in the session,
		// the current cache value no longer matches the next token in the cache key.
		n.cache.DeleteListSession(owner, cacheKey)
	} else {
		session = &data.ListSession{NamesMap: make(map[string]struct{})}
		session.Context, session.Cancel = context.WithCancel(context.Background())

		if bd, err := middleware.GetBoxData(ctx); err == nil {
			session.Context = middleware.SetBoxData(session.Context, bd)
		}

		session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix)
		if err != nil {
			return nil, nil, err
		}
	}

	poolCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	generator, errorCh := nodesGeneratorStream(poolCtx, p, session)

	objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
	}

	objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1)
	if session.Next != nil {
		objects = append(objects, session.Next)
	}

	for obj := range objOutCh {
		objects = append(objects, obj)
	}

	//for node := range generator {
	//	objects = append(objects, &data.ObjectInfo{
	//		ID:             node.OID,
	//		IsDir:          false,
	//		IsDeleteMarker: node.IsDeleteMarker(),
	//		Name:           node.FilePath,
	//		Size:           node.Size,
	//		Created:        time.Time{},
	//		HashSum:        node.ETag,
	//		Owner:          user.ID{},
	//		Headers:        nil,
	//	})
	//}

	if err = <-errorCh; err != nil {
		return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
	}

	sort.Slice(objects, func(i, j int) bool {
		return objects[i].Name < objects[j].Name
	})

	if len(objects) > p.MaxKeys {
		next = objects[p.MaxKeys]
		objects = objects[:p.MaxKeys]
	}

	if next != nil {
		session.Next = next
		n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.VersionID()), session)
	}

	return
}

func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedObjectInfo, error) {
	nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix)
	if err != nil {
		return nil, err
	}

	versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions))

	sort.Slice(nodeVersions, func(i, j int) bool {
		return nodeVersions[i].FilePath < nodeVersions[j].FilePath
	})

	poolCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	pp := allObjectParams{
		Bucket:            p.BktInfo,
		Delimiter:         p.Delimiter,
		Prefix:            p.Prefix,
		Marker:            p.KeyMarker,
		ContinuationToken: p.VersionIDMarker,
		MaxKeys:           p.MaxKeys,
	}

	objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, nodesGeneratorVersions(poolCtx, pp, nodeVersions))
	if err != nil {
		return nil, err
	}

	for eoi := range objOutCh {
		objVersions, ok := versions[eoi.ObjectInfo.Name]
		if !ok {
			objVersions = []*data.ExtendedObjectInfo{eoi}
		} else if !eoi.ObjectInfo.IsDir {
			objVersions = append(objVersions, eoi)
		}
		versions[eoi.ObjectInfo.Name] = objVersions
	}

	return versions, nil
}
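
// nodesGenerator streams the prepared node versions into a channel, skipping
// delete markers, duplicates and entries rejected by the marker/continuation
// checks in shouldSkip. It emits at most MaxKeys+1 nodes so that callers can
// detect truncation.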
func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
	nodeCh := make(chan *data.NodeVersion)
	existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories

	go func() {
		var generated int
	LOOP:
		for _, node := range nodeVersions {
			if shouldSkip(node, p, existed) {
				continue
			}

			select {
			case <-ctx.Done():
				break LOOP
			case nodeCh <- node:
				generated++
				if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
					break LOOP
				}
			}
		}
		close(nodeCh)
	}()

	return nodeCh
}

func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
	nodeCh := make(chan *data.NodeVersion)
	existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories

	go func() {
		var generated int
	LOOP:
		for _, node := range nodeVersions {
			if shouldSkipVersions(node, p, existed) {
				continue
			}

			select {
			case <-ctx.Done():
				break LOOP
			case nodeCh <- node:
				generated++
				if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
					break LOOP
				}
			}
		}
		close(nodeCh)
	}()

	return nodeCh
}

func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
	nodeCh := make(chan *data.NodeVersion, 1000)
	errCh := make(chan error, 1)
	//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
	existed := stream.NamesMap

	if stream.Next != nil {
		existed[continuationToken] = struct{}{}
	}

	limit := p.MaxKeys
	if stream.Next == nil {
		limit++
	}

	go func() {
		var generated int
		var err error

	LOOP:
		for err == nil {
			node, err := stream.Stream.Next(ctx)
			if err != nil {
				if !errors.Is(err, io.EOF) {
					errCh <- fmt.Errorf("stream next: %w", err)
				}
				break LOOP
			}

			if shouldSkip(node, p, existed) {
				continue
			}

			select {
			case <-ctx.Done():
				break LOOP
			case nodeCh <- node:
				generated++

				if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
					break LOOP
				}
			}
		}
		close(nodeCh)
		close(errCh)
	}()

	return nodeCh, errCh
}

func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
	reqLog := n.reqLogger(ctx)
	pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
	if err != nil {
		return nil, fmt.Errorf("couldn't init go pool for listing: %w", err)
	}
	objCh := make(chan *data.ObjectInfo)

	go func() {
		var wg sync.WaitGroup

	LOOP:
		for node := range input {
			select {
			case <-ctx.Done():
				break LOOP
			default:
			}

			// We have to make a copy of the pointer to data.NodeVersion
			// to get the correct value inside the submitted task function.
			func(node *data.NodeVersion) {
				wg.Add(1)
				err = pool.Submit(func() {
					defer wg.Done()

					oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
					if oi == nil {
						// try to get object again
						if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
							// do not process objects that are definitely missing in the object service
							return
						}
					}

					select {
					case <-ctx.Done():
					case objCh <- oi:
					}
				})
				if err != nil {
					wg.Done()
					reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
				}
			}(node)
		}
		wg.Wait()
		close(objCh)
		pool.Release()
	}()

	return objCh, nil
}
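
// initWorkerPoolVersions is the variant of initWorkerPool used for version listings:
// delete markers are converted to ObjectInfo directly from the tree node, regular
// versions are resolved through the object cache or FrostFS, and each result is
// wrapped into data.ExtendedObjectInfo together with its node version.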
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedObjectInfo, error) {
	reqLog := n.reqLogger(ctx)
	pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
	if err != nil {
		return nil, fmt.Errorf("couldn't init go pool for listing: %w", err)
	}
	objCh := make(chan *data.ExtendedObjectInfo)

	go func() {
		var wg sync.WaitGroup

	LOOP:
		for node := range input {
			select {
			case <-ctx.Done():
				break LOOP
			default:
			}

			// We have to make a copy of the pointer to data.NodeVersion
			// to get the correct value inside the submitted task function.
			func(node *data.NodeVersion) {
				wg.Add(1)
				err = pool.Submit(func() {
					defer wg.Done()

					oi := &data.ObjectInfo{}
					if node.IsDeleteMarker() { // delete marker does not match any object in FrostFS
						oi.ID = node.OID
						oi.Name = node.FilePath
						oi.Owner = node.DeleteMarker.Owner
						oi.Created = node.DeleteMarker.Created
						oi.IsDeleteMarker = true
					} else {
						oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
						if oi == nil {
							// try to get object again
							if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
								// do not process objects that are definitely missing in the object service
								return
							}
						}
					}

					eoi := &data.ExtendedObjectInfo{
						ObjectInfo:  oi,
						NodeVersion: node,
					}

					select {
					case <-ctx.Done():
					case objCh <- eoi:
					}
				})
				if err != nil {
					wg.Done()
					reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
				}
			}(node)
		}
		wg.Wait()
		close(objCh)
		pool.Release()
	}()

	return objCh, nil
}

func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
	reqLog := n.reqLogger(ctx)
	pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
	if err != nil {
		return nil, fmt.Errorf("couldn't init go pool for listing: %w", err)
	}
	objCh := make(chan *data.ObjectInfo)

	go func() {
		var wg sync.WaitGroup

	LOOP:
		for node := range input {
			select {
			case <-ctx.Done():
				break LOOP
			default:
			}

			// We have to make a copy of the pointer to data.NodeVersion
			// to get the correct value inside the submitted task function.
			func(node *data.NodeVersion) {
				wg.Add(1)
				err = pool.Submit(func() {
					defer wg.Done()

					oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
					if oi == nil {
						// try to get object again
						if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
							// do not process objects that are definitely missing in the object service
							return
						}
					}

					select {
					case <-ctx.Done():
					case objCh <- oi:
					}
				})
				if err != nil {
					wg.Done()
					reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
				}
			}(node)
		}
		wg.Wait()
		close(objCh)
		pool.Release()
	}()

	return objCh, nil
}

func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
	var err error

	owner := n.BearerOwner(ctx)
	cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false)
	nodeVersions := n.cache.GetList(owner, cacheKey)

	if nodeVersions == nil {
		nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix)
		if err != nil {
			return nil, fmt.Errorf("get all versions from tree service: %w", err)
		}

		n.cache.PutList(owner, cacheKey, nodeVersions)
	}

	return nodeVersions, nil
}

func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
	if node.IsDeleteMarker() {
		return true
	}

	filePath := node.FilePath
	if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
		filePath = dirName
	}
	if _, ok := existed[filePath]; ok {
		return true
	}

	if filePath <= p.Marker {
		return true
	}

	if p.ContinuationToken != "" {
		if _, ok := existed[continuationToken]; !ok {
			if p.ContinuationToken != node.OID.EncodeToString() {
				return true
			}
			existed[continuationToken] = struct{}{}
		}
	}

	existed[filePath] = struct{}{}
	return false
}
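
// shouldSkipVersions is the shouldSkip analogue for version listings: delete markers
// are not skipped, only directory names are de-duplicated, and keys equal to the
// marker are kept here because filterVersionsByMarker narrows them down later.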
func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
	filePath := node.FilePath
	if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
		filePath = dirName
		if _, ok := existed[filePath]; ok {
			return true
		}
	}

	if filePath < p.Marker {
		return true
	}

	if p.ContinuationToken != "" {
		if _, ok := existed[continuationToken]; !ok {
			if p.ContinuationToken != node.OID.EncodeToString() {
				return true
			}
			existed[continuationToken] = struct{}{}
		}
	}

	existed[filePath] = struct{}{}
	return false
}

func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) {
	for _, ov := range allObjects {
		if ov.IsDir {
			prefixes = append(prefixes, ov.Name)
		} else {
			objects = append(objects, ov)
		}
	}

	return
}

func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []string, objects []*data.ExtendedObjectInfo) {
	for _, ov := range allObjects {
		if ov.ObjectInfo.IsDir {
			prefixes = append(prefixes, ov.ObjectInfo.Name)
		} else {
			objects = append(objects, ov)
		}
	}

	return
}

func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) {
	if oiDir := tryDirectory(bktInfo, node, prefix, delimiter); oiDir != nil {
		return oiDir
	}

	owner := n.BearerOwner(ctx)
	if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
		return extInfo.ObjectInfo
	}

	meta, err := n.objectHead(ctx, bktInfo, node.OID)
	if err != nil {
		n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
		return nil
	}

	oi = objectInfoFromMeta(bktInfo, meta)
	oi.MD5Sum = node.MD5
	n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})

	return oi
}

func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {
	dirName := tryDirectoryName(node, prefix, delimiter)
	if len(dirName) == 0 {
		return nil
	}

	return &data.ObjectInfo{
		ID:             node.OID, // to use it as continuation token
		CID:            bktInfo.CID,
		IsDir:          true,
		IsDeleteMarker: node.IsDeleteMarker(),
		Bucket:         bktInfo.Name,
		Name:           dirName,
	}
}

// tryDirectoryName forms a directory name from the prefix and delimiter.
// If the node isn't a directory, an empty string is returned.
// This function doesn't check whether the node has the prefix; the caller must do that.
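// For example, with prefix "docs/" and delimiter "/", a node with FilePath
// "docs/2023/report.txt" produces the common prefix "docs/2023/", while
// "docs/readme.txt" yields "" because no delimiter follows the prefix.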
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
	if len(delimiter) == 0 {
		return ""
	}

	tail := strings.TrimPrefix(node.FilePath, prefix)
	index := strings.Index(tail, delimiter)
	if index >= 0 {
		return prefix + tail[:index+1]
	}

	return ""
}

func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVersionsParams) ([]*data.ExtendedObjectInfo, error) {
	if p.KeyMarker == "" {
		return objects, nil
	}

	for i, obj := range objects {
		if obj.ObjectInfo.Name == p.KeyMarker {
			for j := i; j < len(objects); j++ {
				if objects[j].ObjectInfo.Name != obj.ObjectInfo.Name {
					if p.VersionIDMarker == "" {
						return objects[j:], nil
					}
					break
				}
				if objects[j].ObjectInfo.VersionID() == p.VersionIDMarker {
					return objects[j+1:], nil
				}
			}
			return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
		} else if obj.ObjectInfo.Name > p.KeyMarker {
			if p.VersionIDMarker != "" {
				return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
			}
			return objects[i:], nil
		}
	}

	// Don't use nil as the empty slice, to be consistent with `return objects[j+1:], nil` above,
	// which can also be empty.
	return []*data.ExtendedObjectInfo{}, nil
}

func triageVersions(objVersions []*data.ExtendedObjectInfo) ([]*data.ExtendedObjectInfo, []*data.ExtendedObjectInfo) {
	if len(objVersions) == 0 {
		return nil, nil
	}

	var resVersion []*data.ExtendedObjectInfo
	var resDelMarkVersions []*data.ExtendedObjectInfo

	for _, version := range objVersions {
		if version.NodeVersion.IsDeleteMarker() {
			resDelMarkVersions = append(resDelMarkVersions, version)
		} else {
			resVersion = append(resVersion, version)
		}
	}

	return resVersion, resDelMarkVersions
}