package layer import ( "context" "errors" "fmt" "io" "sort" "strings" "sync" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) type ( // ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2. ListObjectsParamsCommon struct { BktInfo *data.BucketInfo Delimiter string Encode string MaxKeys int Prefix string } // ListObjectsParamsV1 contains params for ListObjectsV1. ListObjectsParamsV1 struct { ListObjectsParamsCommon Marker string } // ListObjectsParamsV2 contains params for ListObjectsV2. ListObjectsParamsV2 struct { ListObjectsParamsCommon ContinuationToken string StartAfter string FetchOwner bool } // ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2. ListObjectsInfo struct { Prefixes []string Objects []*data.NodeVersion IsTruncated bool } // ListObjectsInfoV1 holds data which ListObjectsV1 returns. ListObjectsInfoV1 struct { ListObjectsInfo NextMarker string } // ListObjectsInfoV2 holds data which ListObjectsV2 returns. ListObjectsInfoV2 struct { ListObjectsInfo NextContinuationToken string } // ListObjectVersionsInfo stores info and list of objects versions. ListObjectVersionsInfo struct { CommonPrefixes []string IsTruncated bool KeyMarker string NextKeyMarker string NextVersionIDMarker string Version []*data.ExtendedNodeVersion DeleteMarker []*data.ExtendedNodeVersion VersionIDMarker string } commonVersionsListingParams struct { BktInfo *data.BucketInfo Delimiter string Prefix string MaxKeys int Marker string Bookmark string } commonLatestVersionsListingParams struct { commonVersionsListingParams ListType ListType } ) type ListType int const ( ListObjectsV1Type ListType = iota + 1 ListObjectsV2Type ListType = iota + 1 ) // ListObjectsV1 returns objects in a bucket for requests of Version 1. func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var result ListObjectsInfoV1 prm := commonLatestVersionsListingParams{ commonVersionsListingParams: commonVersionsListingParams{ BktInfo: p.BktInfo, Delimiter: p.Delimiter, Prefix: p.Prefix, MaxKeys: p.MaxKeys, Marker: p.Marker, Bookmark: p.Marker, }, ListType: ListObjectsV1Type, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) if err != nil { return nil, err } if next != nil { result.IsTruncated = true result.NextMarker = objects[len(objects)-1].FilePath } result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter) return &result, nil } // ListObjectsV2 returns objects in a bucket for requests of Version 2. func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) { var result ListObjectsInfoV2 prm := commonLatestVersionsListingParams{ commonVersionsListingParams: commonVersionsListingParams{ BktInfo: p.BktInfo, Delimiter: p.Delimiter, Prefix: p.Prefix, MaxKeys: p.MaxKeys, Marker: p.StartAfter, Bookmark: p.ContinuationToken, }, ListType: ListObjectsV2Type, } objects, next, err := n.getLatestObjectsVersions(ctx, prm) if err != nil { return nil, err } if next != nil { result.IsTruncated = true result.NextContinuationToken = next.OID.EncodeToString() } result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter) return &result, nil } func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { prm := commonVersionsListingParams{ BktInfo: p.BktInfo, Delimiter: p.Delimiter, Prefix: p.Prefix, MaxKeys: p.MaxKeys, Marker: p.KeyMarker, Bookmark: p.VersionIDMarker, } objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm) if err != nil { return nil, err } res := &ListObjectVersionsInfo{ KeyMarker: p.KeyMarker, VersionIDMarker: p.VersionIDMarker, IsTruncated: isTruncated, } if res.IsTruncated { res.NextKeyMarker = objects[p.MaxKeys-1].NodeVersion.FilePath res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString() } res.CommonPrefixes, objects = triageExtendedObjects(objects, p.Prefix, p.Delimiter) res.Version, res.DeleteMarker = triageVersions(objects) return res, nil } func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) { if p.MaxKeys == 0 { return nil, nil, nil } session, err := n.getListLatestVersionsSession(ctx, p) if err != nil { return nil, nil, err } generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session) objOutCh, err := n.initWorkerPoolStream(ctx, 2, p.commonVersionsListingParams, generator) if err != nil { return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) } objects = make([]*data.NodeVersion, 0, p.MaxKeys+1) objects = append(objects, session.Next...) for obj := range objOutCh { objects = append(objects, obj) } if err = <-errorCh; err != nil { return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err) } if len(objects) > p.MaxKeys { next = objects[p.MaxKeys] n.putListLatestVersionsSession(ctx, p, session, objects) objects = objects[:p.MaxKeys] } return } func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) { if p.MaxKeys == 0 { return nil, false, nil } session, err := n.getListAllVersionsSession(ctx, p) if err != nil { return nil, false, err } generator, errorCh := nodesGeneratorVersions(ctx, p, session) objOutCh, err := n.initWorkerPoolVersions(ctx, 2, p, generator) if err != nil { return nil, false, err } allObjects := handleGeneratedVersions(objOutCh, p, session) if err = <-errorCh; err != nil { return nil, false, fmt.Errorf("failed to get next object from stream: %w", err) } var isTruncated bool if len(allObjects) > p.MaxKeys { isTruncated = true n.putListAllVersionsSession(ctx, p, session, allObjects) allObjects = allObjects[:p.MaxKeys] } return allObjects, isTruncated, nil } func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonVersionsListingParams, session *data.ListSession) []*data.ExtendedNodeVersion { var lastName string var listRowStartIndex int allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys) for eoi := range objOutCh { name := eoi.NodeVersion.FilePath dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter) if dirName != "" { name = dirName } if lastName != name { formVersionsListRow(allObjects, listRowStartIndex, session) listRowStartIndex = len(allObjects) allObjects = append(allObjects, eoi) } else if dirName == "" { allObjects = append(allObjects, eoi) } lastName = name } formVersionsListRow(allObjects, listRowStartIndex, session) return allObjects } func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, session *data.ListSession) { if len(objects) == 0 { return } prevVersions := objects[rowStartIndex:] sort.Slice(prevVersions, func(i, j int) bool { return prevVersions[j].NodeVersion.Timestamp < prevVersions[i].NodeVersion.Timestamp // sort in reverse order to have last added first }) objects[rowStartIndex].IsLatest = len(session.Next) == 0 || session.Next[0].FilePath != objects[rowStartIndex].NodeVersion.FilePath } func (n *layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) { return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true) } func (n *layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) { return n.getListVersionsSession(ctx, p, false) } func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) { owner := n.BearerOwner(ctx) cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) session := n.cache.GetListSession(owner, cacheKey) if session == nil { return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) } if session.Acquired.Swap(true) { return n.initNewVersionsByPrefixSession(ctx, p, latestOnly) } // after reading next object from stream in session // the current cache value already doesn't match with next token in cache key n.cache.DeleteListSession(owner, cacheKey) return session, nil } func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) { session = &data.ListSession{NamesMap: make(map[string]struct{})} session.Context, session.Cancel = context.WithCancel(context.Background()) if bd, err := middleware.GetBoxData(ctx); err == nil { session.Context = middleware.SetBoxData(session.Context, bd) } session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly) if err != nil { return nil, err } return session, nil } func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.NodeVersion) { if len(allObjects) <= p.MaxKeys { return } var cacheKey cache.ListSessionKey switch p.ListType { case ListObjectsV1Type: cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys-1].FilePath) case ListObjectsV2Type: cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys].OID.EncodeToString()) default: // should never happen panic("invalid list type") } session.Acquired.Store(false) session.Next = []*data.NodeVersion{allObjects[p.MaxKeys]} n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) } func (n *layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) { if len(allObjects) <= p.MaxKeys { return } session.Acquired.Store(false) session.Next = make([]*data.NodeVersion, len(allObjects)-p.MaxKeys+1) session.Next[0] = allObjects[p.MaxKeys-1].NodeVersion for i, node := range allObjects[p.MaxKeys:] { session.Next[i+1] = node.NodeVersion } cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].OID.EncodeToString()) n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session) } func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) //existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories existed := stream.NamesMap if len(stream.Next) != 0 { existed[continuationToken] = struct{}{} } limit := p.MaxKeys if len(stream.Next) == 0 { limit++ } go func() { var generated int var err error LOOP: for err == nil { node, err := stream.Stream.Next(ctx) if err != nil { if !errors.Is(err, io.EOF) { errCh <- fmt.Errorf("stream next: %w", err) } break LOOP } if shouldSkip(node, p, existed) { continue } select { case <-ctx.Done(): break LOOP case nodeCh <- node: generated++ if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken break LOOP } } } close(nodeCh) close(errCh) }() return nodeCh, errCh } func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) { nodeCh := make(chan *data.NodeVersion, 1000) errCh := make(chan error, 1) existed := stream.NamesMap delete(existed, continuationToken) go func() { var ( generated int ind int err error lastName string ) LOOP: for err == nil { var node *data.NodeVersion if ind < len(stream.Next) { node = stream.Next[ind] ind++ } else { node, err = stream.Stream.Next(ctx) if err != nil { if !errors.Is(err, io.EOF) { errCh <- fmt.Errorf("stream next: %w", err) } break LOOP } } if shouldSkipVersions(node, p, existed) { continue } select { case <-ctx.Done(): break LOOP case nodeCh <- node: generated++ if generated > p.MaxKeys && node.FilePath != lastName { break LOOP } lastName = node.FilePath } } close(nodeCh) close(errCh) }() return nodeCh, errCh } func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) } objCh := make(chan *data.NodeVersion, size) go func() { var wg sync.WaitGroup LOOP: for node := range input { select { case <-ctx.Done(): break LOOP default: } if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); dirName != "" || node.IsFilledExtra() { // todo think to not compute twice if dirName != "" { node.FilePath = dirName } select { case <-ctx.Done(): case objCh <- node: } } else { // We have to make a copy of pointer to data.NodeVersion // to get correct value in submitted task function. func(node *data.NodeVersion) { wg.Add(1) err = pool.Submit(func() { defer wg.Done() oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node) if oi == nil { // try to get object again if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil { // do not process object which are definitely missing in object service return } } node.FillExtra(oi) select { case <-ctx.Done(): case objCh <- node: } }) if err != nil { wg.Done() reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) } }(node) } } wg.Wait() close(objCh) pool.Release() }() return objCh, nil } func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) { reqLog := n.reqLogger(ctx) pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog})) if err != nil { return nil, fmt.Errorf("coudln't init go pool for listing: %w", err) } objCh := make(chan *data.ExtendedNodeVersion) go func() { var wg sync.WaitGroup LOOP: for node := range input { select { case <-ctx.Done(): break LOOP default: } if node.IsFilledExtra() { select { case <-ctx.Done(): case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}: } } else { // We have to make a copy of pointer to data.NodeVersion // to get correct value in submitted task function. func(node *data.NodeVersion) { wg.Add(1) err = pool.Submit(func() { defer wg.Done() oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node) if oi == nil { // try to get object again if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil { // do not process object which are definitely missing in object service return } } node.FillExtra(oi) select { case <-ctx.Done(): case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}: } }) if err != nil { wg.Done() reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) } }(node) } } wg.Wait() close(objCh) pool.Release() }() return objCh, nil } func shouldSkip(node *data.NodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { if node.IsDeleteMarker { return true } filePath := node.FilePath if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { filePath = dirName } if _, ok := existed[filePath]; ok { return true } if filePath <= p.Marker { return true } if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { if p.Bookmark != node.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{} } } existed[filePath] = struct{}{} return false } func shouldSkipVersions(node *data.NodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool { filePath := node.FilePath if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { filePath = dirName if _, ok := existed[filePath]; ok { return true } } if filePath < p.Marker { return true } if p.Bookmark != "" { if _, ok := existed[continuationToken]; !ok { if p.Bookmark != node.OID.EncodeToString() { return true } existed[continuationToken] = struct{}{} return true } } existed[filePath] = struct{}{} return false } func triageObjects(allObjects []*data.NodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.NodeVersion) { objects = make([]*data.NodeVersion, 0, len(allObjects)) for _, ov := range allObjects { if dirName := tryDirectoryName(ov, prefix, delimiter); dirName != "" { prefixes = append(prefixes, dirName) } else { objects = append(objects, ov) } } return } func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.ExtendedNodeVersion) { for _, ov := range allObjects { if dirName := tryDirectoryName(ov.NodeVersion, prefix, delimiter); dirName != "" { prefixes = append(prefixes, dirName) } else { objects = append(objects, ov) } } return } func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) { owner := n.BearerOwner(ctx) if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil { return extInfo.ObjectInfo } meta, err := n.objectHead(ctx, bktInfo, node.OID) if err != nil { n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err)) return nil } oi = objectInfoFromMeta(bktInfo, meta) oi.MD5Sum = node.MD5 n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node}) return oi } // tryDirectoryName forms directory name by prefix and delimiter. // If node isn't a directory empty string is returned. // This function doesn't check if node has a prefix. It must do a caller. func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { if len(delimiter) == 0 { return "" } tail := strings.TrimPrefix(node.FilePath, prefix) index := strings.Index(tail, delimiter) if index >= 0 { return prefix + tail[:index+1] } return "" } func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) { if p.KeyMarker == "" { return objects, nil } for i, obj := range objects { if obj.NodeVersion.FilePath == p.KeyMarker { for j := i; j < len(objects); j++ { if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath { if p.VersionIDMarker == "" { return objects[j:], nil } break } if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker { return objects[j+1:], nil } } return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) } else if obj.NodeVersion.FilePath > p.KeyMarker { if p.VersionIDMarker != "" { return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion) } return objects[i:], nil } } // don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above // that can be empty return []*data.ExtendedNodeVersion{}, nil } func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) { if len(objVersions) == 0 { return nil, nil } var resVersion []*data.ExtendedNodeVersion var resDelMarkVersions []*data.ExtendedNodeVersion for _, version := range objVersions { if version.NodeVersion.IsDeleteMarker { resDelMarkVersions = append(resDelMarkVersions, version) } else { resVersion = append(resVersion, version) } } return resVersion, resDelMarkVersions }