package layer import ( "context" "errors" "fmt" "io" "sort" "strings" "sync" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) type ( // ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2. ListObjectsParamsCommon struct { BktInfo *data.BucketInfo Delimiter string Encode string MaxKeys int Prefix string } // ListObjectsParamsV1 contains params for ListObjectsV1. ListObjectsParamsV1 struct { ListObjectsParamsCommon Marker string } // ListObjectsParamsV2 contains params for ListObjectsV2. ListObjectsParamsV2 struct { ListObjectsParamsCommon ContinuationToken string StartAfter string FetchOwner bool } // ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2. ListObjectsInfo struct { Prefixes []string Objects []*data.ExtendedNodeVersion IsTruncated bool } // ListObjectsInfoV1 holds data which ListObjectsV1 returns. ListObjectsInfoV1 struct { ListObjectsInfo NextMarker string } // ListObjectsInfoV2 holds data which ListObjectsV2 returns. ListObjectsInfoV2 struct { ListObjectsInfo NextContinuationToken string } // ListObjectVersionsInfo stores info and list of objects versions. ListObjectVersionsInfo struct { CommonPrefixes []string IsTruncated bool KeyMarker string NextKeyMarker string NextVersionIDMarker string Version []*data.ExtendedNodeVersion DeleteMarker []*data.ExtendedNodeVersion VersionIDMarker string } commonVersionsListingParams struct { BktInfo *data.BucketInfo Delimiter string Prefix string MaxKeys int Marker string Bookmark string } commonLatestVersionsListingParams struct { commonVersionsListingParams ListType ListType } ) type ListType int const ( ListObjectsV1Type ListType = iota + 1 ListObjectsV2Type ListType = iota + 1 ) // ListObjectsV1 returns objects in a bucket for requests of Version 1. func (n *Layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var result ListObjectsInfoV1 prm := commonLatestVersionsListingParams{ commonVersionsListingParams: commonVersionsListingParams{ BktInfo: p.BktInfo, Delimiter: p.Delimiter, Prefix: p.Prefix, MaxKeys: p.MaxKeys, Marker: p.Marker, Bookmark: p.Marker, }, ListType: ListObjectsV1Type, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) if err != nil { return nil, err } if next != nil { result.IsTruncated = true result.NextMarker = objects[len(objects)-1].Name() } result.Prefixes, result.Objects = triageExtendedObjects(objects) return &result, nil } // ListObjectsV2 returns objects in a bucket for requests of Version 2. func (n *Layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) { var result ListObjectsInfoV2 prm := commonLatestVersionsListingParams{ commonVersionsListingParams: commonVersionsListingParams{ BktInfo: p.BktInfo, Delimiter: p.Delimiter, Prefix: p.Prefix, MaxKeys: p.MaxKeys, Marker: p.StartAfter, Bookmark: p.ContinuationToken, }, ListType: ListObjectsV2Type, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) if err != nil { return nil, err } if next != nil { result.IsTruncated = true result.NextContinuationToken = next.NodeVersion.OID.EncodeToString() } result.Prefixes, result.Objects = triageExtendedObjects(objects) return &result, nil } func (n *Layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { prm := commonVersionsListingParams{ BktInfo: p.BktInfo, Delimiter: p.Delimiter, Prefix: p.Prefix, MaxKeys: p.MaxKeys, Marker: p.KeyMarker, Bookmark: p.VersionIDMarker, } objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm) if err != nil { return nil, err } res := &ListObjectVersionsInfo{ KeyMarker: p.KeyMarker, VersionIDMarker: p.VersionIDMarker, IsTruncated: isTruncated, } if res.IsTruncated { res.NextKeyMarker = objects[p.MaxKeys-1].NodeVersion.FilePath res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString() } res.CommonPrefixes, objects = triageExtendedObjects(objects) res.Version, res.DeleteMarker = triageVersions(objects) return res, nil } func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.ExtendedNodeVersion, next *data.ExtendedNodeVersion, err error) { if p.MaxKeys == 0 { return nil, nil, nil } session, err := n.getListLatestVersionsSession(ctx, p) if err != nil { return nil, nil, err } generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session) objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator) if err != nil { return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) } objects = make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1) objects = append(objects, session.Next...) for obj := range objOutCh { objects = append(objects, obj) } if err = <-errorCh; err != nil { return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err) } sort.Slice(objects, func(i, j int) bool { return objects[i].NodeVersion.FilePath < objects[j].NodeVersion.FilePath }) if len(objects) > p.MaxKeys { next = objects[p.MaxKeys] n.putListLatestVersionsSession(ctx, p, session, objects) objects = objects[:p.MaxKeys] } return } func (n *Layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) { if p.MaxKeys == 0 { return nil, false, nil } session, err := n.getListAllVersionsSession(ctx, p) if err != nil { return nil, false, err } generator, errorCh := nodesGeneratorVersions(ctx, p, session) objOutCh, err := n.initWorkerPool(ctx, 2, p, generator) if err != nil { return nil, false, err } allObjects := handleGeneratedVersions(objOutCh, p, session) sort.SliceStable(allObjects, func(i, j int) bool { return allObjects[i].NodeVersion.FilePath < allObjects[j].NodeVersion.FilePath }) if err = <-errorCh; err != nil { return nil, false, fmt.Errorf("failed to get next object from stream: %w", err) } var isTruncated bool if len(allObjects) > p.MaxKeys { isTruncated = true n.putListAllVersionsSession(ctx, p, session, allObjects) allObjects = allObjects[:p.MaxKeys] } return allObjects, isTruncated, nil } func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonVersionsListingParams, session *data.ListSession) []*data.ExtendedNodeVersion { var lastName string var listRowStartIndex int allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys) for eoi := range objOutCh { name := eoi.NodeVersion.FilePath if eoi.DirName != "" { name = eoi.DirName } if lastName != name { formVersionsListRow(allObjects, listRowStartIndex, session) listRowStartIndex = len(allObjects) allObjects = append(allObjects, eoi) } else if eoi.DirName == "" { allObjects = append(allObjects, eoi) } lastName = name } formVersionsListRow(allObjects, listRowStartIndex, session) return allObjects } func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, session *data.ListSession) { if len(objects) == 0 { return } prevVersions := objects[rowStartIndex:] sort.Slice(prevVersions, func(i, j int) bool { return prevVersions[j].NodeVersion.Timestamp < prevVersions[i].NodeVersion.Timestamp // sort in reverse order to have last added first }) prevVersions[0].IsLatest = len(session.Next) == 0 || session.Next[0].NodeVersion.FilePath != prevVersions[0].NodeVersion.FilePath for _, version := range prevVersions[1:] { version.IsLatest = false } } func (n *Layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) { return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true) } func (n *Layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) { return n.getListVersionsSession(ctx, p, false) } func (n *Layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) { owner := n.BearerOwner(ctx) cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) session := n.cache.GetListSession(owner, cacheKey) if session == nil { return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) } if session.Acquired.Swap(true) { return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) } // after reading next object from stream in session // the current cache value already doesn't match with next token in cache key n.cache.DeleteListSession(owner, cacheKey) return session, nil } func (n *Layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) { session = &data.ListSession{NamesMap: make(map[string]struct{})} session.Context, session.Cancel = context.WithCancel(context.Background()) if bd, err := middleware.GetBoxData(ctx); err == nil { session.Context = middleware.SetBox(session.Context, &middleware.Box{AccessBox: bd}) } session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly) if err != nil { return nil, err } return session, nil } func (n *Layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) { if len(allObjects) <= p.MaxKeys { return } var cacheKey cache.ListSessionKey switch p.ListType { case ListObjectsV1Type: cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys-1].Name()) case ListObjectsV2Type: cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys].NodeVersion.OID.EncodeToString()) default: // should never happen panic("invalid list type") } session.Acquired.Store(false) session.Next = []*data.ExtendedNodeVersion{allObjects[p.MaxKeys]} n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) } func (n *Layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) { if len(allObjects) <= p.MaxKeys { return } session.Acquired.Store(false) session.Next = make([]*data.ExtendedNodeVersion, len(allObjects)-p.MaxKeys+1) session.Next[0] = allObjects[p.MaxKeys-1] for i, node := range allObjects[p.MaxKeys:] { session.Next[i+1] = node } cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].NodeVersion.OID.EncodeToString()) n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) } func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) { nodeCh := make(chan *data.ExtendedNodeVersion, 1000) errCh := make(chan error, 1) existed := stream.NamesMap if len(stream.Next) != 0 { existed[continuationToken] = struct{}{} } limit := p.MaxKeys if len(stream.Next) == 0 { limit++ } go func() { var generated int var err error LOOP: for err == nil { node, err := stream.Stream.Next(ctx) if err != nil { if !errors.Is(err, io.EOF) { errCh <- fmt.Errorf("stream next: %w", err) } break LOOP } nodeExt := &data.ExtendedNodeVersion{ NodeVersion: node, IsLatest: true, DirName: tryDirectoryName(node, p.Prefix, p.Delimiter), } if shouldSkip(nodeExt, p, existed) { continue } select { case <-ctx.Done(): break LOOP case nodeCh <- nodeExt: generated++ if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken break LOOP } } } close(nodeCh) close(errCh) }() return nodeCh, errCh } func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) { nodeCh := make(chan *data.ExtendedNodeVersion, 1000) errCh := make(chan error, 1) existed := stream.NamesMap delete(existed, continuationToken) go func() { var ( generated int ind int err error lastName string node *data.NodeVersion nodeExt *data.ExtendedNodeVersion ) LOOP: for err == nil { if ind < len(stream.Next) { nodeExt = stream.Next[ind] ind++ } else { node, err = stream.Stream.Next(ctx) if err != nil { if !errors.Is(err, io.EOF) { errCh <- fmt.Errorf("stream next: %w", err) } break LOOP } nodeExt = &data.ExtendedNodeVersion{ NodeVersion: node, DirName: tryDirectoryName(node, p.Prefix, p.Delimiter), } } if shouldSkipVersions(nodeExt, p, existed) { continue } select { case <-ctx.Done(): break LOOP case nodeCh <- nodeExt: generated++ if generated > p.MaxKeys && nodeExt.NodeVersion.FilePath != lastName { break LOOP } lastName = nodeExt.NodeVersion.FilePath } } close(nodeCh) close(errCh) }() return nodeCh, errCh } func (n *Layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) } objCh := make(chan *data.ExtendedNodeVersion, size) go func() { var wg sync.WaitGroup LOOP: for node := range input { select { case <-ctx.Done(): break LOOP default: } if node.DirName != "" || node.NodeVersion.IsFilledExtra() { select { case <-ctx.Done(): case objCh <- node: } } else { // We have to make a copy of pointer to data.NodeVersion // to get correct value in submitted task function. func(node *data.ExtendedNodeVersion) { wg.Add(1) err = pool.Submit(func() { defer wg.Done() oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion) if oi == nil { // try to get object again if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion); oi == nil { // do not process object which are definitely missing in object service return } } realSize, err := GetObjectSize(oi) if err != nil { reqLog.Debug(logs.FailedToGetRealObjectSize, zap.Error(err)) realSize = oi.Size } node.NodeVersion.FillExtra(&oi.Owner, &oi.Created, realSize) select { case <-ctx.Done(): case objCh <- node: } }) if err != nil { wg.Done() reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) } }(node) } } wg.Wait() close(objCh) pool.Release() }() return objCh, nil } func shouldSkip(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { if node.NodeVersion.IsDeleteMarker { return true } filePath := node.NodeVersion.FilePath if node.DirName != "" { filePath = node.DirName } if _, ok := existed[filePath]; ok { return true } if filePath <= p.Marker { return true } if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { if p.Bookmark != node.NodeVersion.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{} } } existed[filePath] = struct{}{} return false } func shouldSkipVersions(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { filePath := node.NodeVersion.FilePath if node.DirName != "" { filePath = node.DirName if _, ok := existed[filePath]; ok { return true } } if filePath < p.Marker { return true } if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { if p.Bookmark != node.NodeVersion.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{} return true } } existed[filePath] = struct{}{} return false } func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion) (prefixes []string, objects []*data.ExtendedNodeVersion) { for _, ov := range allObjects { if ov.DirName != "" { prefixes = append(prefixes, ov.DirName) } else { objects = append(objects, ov) } } return } func (n *Layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) { owner := n.BearerOwner(ctx) if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil { return extInfo.ObjectInfo } meta, err := n.objectHead(ctx, bktInfo, node.OID) if err != nil { n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err)) return nil } oi = objectInfoFromMeta(bktInfo, meta) oi.MD5Sum = node.MD5 n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node}) return oi } // tryDirectoryName forms directory name by prefix and delimiter. // If node isn't a directory empty string is returned. // This function doesn't check if node has a prefix. It must do a caller. func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { if len(delimiter) == 0 { return "" } tail := strings.TrimPrefix(node.FilePath, prefix) index := strings.Index(tail, delimiter) if index >= 0 { return prefix + tail[:index+1] } return "" } func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) { if p.KeyMarker == "" { return objects, nil } for i, obj := range objects { if obj.NodeVersion.FilePath == p.KeyMarker { for j := i; j < len(objects); j++ { if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath { if p.VersionIDMarker == "" { return objects[j:], nil } break } if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker { return objects[j+1:], nil } } return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) } else if obj.NodeVersion.FilePath > p.KeyMarker { if p.VersionIDMarker != "" { return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) } return objects[i:], nil } } // don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above // that can be empty return []*data.ExtendedNodeVersion{}, nil } func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) { if len(objVersions) == 0 { return nil, nil } var resVersion []*data.ExtendedNodeVersion var resDelMarkVersions []*data.ExtendedNodeVersion for _, version := range objVersions { if version.NodeVersion.IsDeleteMarker { resDelMarkVersions = append(resDelMarkVersions, version) } else { resVersion = append(resVersion, version) } } return resVersion, resDelMarkVersions }