frostfs-s3-gw/api/layer/listing.go

819 lines
21 KiB
Go
Raw Normal View History

package layer
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type (
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
ListObjectsParamsCommon struct {
BktInfo *data.BucketInfo
Delimiter string
Encode string
MaxKeys int
Prefix string
}
// ListObjectsParamsV1 contains params for ListObjectsV1.
ListObjectsParamsV1 struct {
ListObjectsParamsCommon
Marker string
}
// ListObjectsParamsV2 contains params for ListObjectsV2.
ListObjectsParamsV2 struct {
ListObjectsParamsCommon
ContinuationToken string
StartAfter string
FetchOwner bool
}
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
ListObjectsInfo struct {
Prefixes []string
Objects []*data.NodeVersion
IsTruncated bool
}
// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
ListObjectsInfoV1 struct {
ListObjectsInfo
NextMarker string
}
// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
ListObjectsInfoV2 struct {
ListObjectsInfo
NextContinuationToken string
}
// ListObjectVersionsInfo stores info and list of objects versions.
ListObjectVersionsInfo struct {
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*data.ExtendedNodeVersion
DeleteMarker []*data.ExtendedNodeVersion
VersionIDMarker string
}
allObjectParams struct {
Bucket *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
ContinuationToken string
}
)
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var result ListObjectsInfoV1
prm := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.Marker,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextMarker = objects[len(objects)-1].FilePath
}
result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter)
return &result, nil
}
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var result ListObjectsInfoV2
prm := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
ContinuationToken: p.ContinuationToken,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextContinuationToken = next.OID.EncodeToString()
}
result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter)
return &result, nil
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var err error
owner := n.BearerOwner(ctx)
versions := make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
if err != nil {
return nil, err
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
if err != nil {
return nil, err
}
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
objVersions, ok := versions[name]
if !ok {
objVersions = []*data.ExtendedNodeVersion{eoi}
} else if dirName == "" {
objVersions = append(objVersions, eoi)
}
versions[name] = objVersions
}
if err = <-errorCh; err != nil {
return nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
sortedNames := make([]string, 0, len(versions))
for k := range versions {
sortedNames = append(sortedNames, k)
}
sort.Strings(sortedNames)
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for _, name := range sortedNames {
sortedVersions := versions[name]
sort.Slice(sortedVersions, func(i, j int) bool {
return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order
})
for i, version := range sortedVersions {
version.IsLatest = i == 0
allObjects = append(allObjects, version)
}
}
//if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
// return nil, err
//}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
VersionIDMarker: p.VersionIDMarker,
}
res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects, p.Prefix, p.Delimiter)
if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys-1].NodeVersion.FilePath
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
session.Next = []*data.NodeVersion{allObjects[p.MaxKeys-1].NodeVersion, allObjects[p.MaxKeys].NodeVersion}
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, res.NextVersionIDMarker), session)
allObjects = allObjects[:p.MaxKeys]
}
res.Version, res.DeleteMarker = triageVersions(allObjects)
return res, nil
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, p.ContinuationToken) // todo redo for listv1
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.Bucket, p.Prefix)
if err != nil {
return nil, nil, err
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
generator, errorCh := nodesGeneratorStream(poolCtx, p, session)
objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator)
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
objects = make([]*data.NodeVersion, 0, p.MaxKeys+1)
objects = append(objects, session.Next...)
for obj := range objOutCh {
objects = append(objects, obj)
}
if err = <-errorCh; err != nil {
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
// probably isn't necessary
sort.Slice(objects, func(i, j int) bool {
return objects[i].FilePath < objects[j].FilePath
})
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
objects = objects[:p.MaxKeys]
}
if next != nil {
session.Next = []*data.NodeVersion{next}
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.OID.EncodeToString()), session)
}
return
}
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (versions map[string][]*data.ExtendedNodeVersion, err error) {
owner := n.BearerOwner(ctx)
versions = make(map[string][]*data.ExtendedNodeVersion, p.MaxKeys)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.VersionIDMarker)
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
if err != nil {
return nil, err
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
pp := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
Marker: p.KeyMarker,
ContinuationToken: p.VersionIDMarker,
MaxKeys: p.MaxKeys,
}
generator, errorCh := nodesGeneratorVersions(poolCtx, pp, session)
objOutCh, err := n.initWorkerPoolVersions(poolCtx, 2, pp, generator)
if err != nil {
return nil, err
}
//if session.Next != nil {
// name := session.Next.FilePath
// dirName := tryDirectoryName(session.Next, p.Prefix, p.Delimiter)
// if dirName != "" {
// name = dirName
// }
//
// versions[name] = []*data.ExtendedNodeVersion{{NodeVersion: session.Next}}
//}
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
objVersions, ok := versions[name]
if !ok {
objVersions = []*data.ExtendedNodeVersion{eoi}
} else if dirName == "" {
objVersions = append(objVersions, eoi)
}
versions[name] = objVersions
}
if err = <-errorCh; err != nil {
return nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
return versions, nil
}
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
existed := stream.NamesMap
if len(stream.Next) != 0 {
existed[continuationToken] = struct{}{}
}
limit := p.MaxKeys
if len(stream.Next) == 0 {
limit++
}
go func() {
var generated int
var err error
LOOP:
for err == nil {
node, err := stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
if shouldSkip(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func nodesGeneratorVersions(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
existed := stream.NamesMap
delete(existed, continuationToken)
//if len(stream.Next) != 0 {
// existed[continuationToken] = struct{}{}
//}
limit := p.MaxKeys + 1
//if len(stream.Next) == 0 {
// limit++
//}
go func() {
var generated int
var err error
ind := 0
LOOP:
for err == nil {
var node *data.NodeVersion
if ind < len(stream.Next) {
node = stream.Next[ind]
ind++
} else {
node, err = stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
}
if shouldSkipVersions(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.NodeVersion, size)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
if node.IsFilledExtra() || tryDirectoryName(node, p.Prefix, p.Delimiter) != "" { // todo think to not compute twice
select {
case <-ctx.Done():
case objCh <- node:
}
} else {
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
node.FillExtra(oi)
select {
case <-ctx.Done():
case objCh <- node:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ExtendedNodeVersion)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
if node.IsFilledExtra() {
select {
case <-ctx.Done():
case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}:
}
} else {
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
node.FillExtra(oi)
select {
case <-ctx.Done():
case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
var err error
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false)
nodeVersions := n.cache.GetList(owner, cacheKey)
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix)
if err != nil {
return nil, fmt.Errorf("get all versions from tree service: %w", err)
}
n.cache.PutList(owner, cacheKey, nodeVersions)
}
return nodeVersions, nil
}
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
if node.IsDeleteMarker {
return true
}
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
}
if _, ok := existed[filePath]; ok {
return true
}
if filePath <= p.Marker {
return true
}
if p.ContinuationToken != "" {
if _, ok := existed[continuationToken]; !ok {
if p.ContinuationToken != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
}
}
existed[filePath] = struct{}{}
return false
}
func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
if _, ok := existed[filePath]; ok {
return true
}
}
if filePath < p.Marker {
return true
}
if p.ContinuationToken != "" {
if _, ok := existed[continuationToken]; !ok {
if p.ContinuationToken != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
return true
}
}
existed[filePath] = struct{}{}
return false
}
func triageObjects(allObjects []*data.NodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.NodeVersion) {
objects = make([]*data.NodeVersion, 0, len(allObjects))
for _, ov := range allObjects {
if dirName := tryDirectoryName(ov, prefix, delimiter); dirName != "" {
prefixes = append(prefixes, dirName)
} else {
objects = append(objects, ov)
}
}
return
}
func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.ExtendedNodeVersion) {
for _, ov := range allObjects {
if dirName := tryDirectoryName(ov.NodeVersion, prefix, delimiter); dirName != "" {
prefixes = append(prefixes, dirName)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
owner := n.BearerOwner(ctx)
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
return extInfo.ObjectInfo
}
meta, err := n.objectHead(ctx, bktInfo, node.OID)
if err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
return nil
}
oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi
}
// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory empty string is returned.
// This function doesn't check if node has a prefix. It must do a caller.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
if len(delimiter) == 0 {
return ""
}
tail := strings.TrimPrefix(node.FilePath, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
return prefix + tail[:index+1]
}
return ""
}
func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) {
if p.KeyMarker == "" {
return objects, nil
}
for i, obj := range objects {
if obj.NodeVersion.FilePath == p.KeyMarker {
for j := i; j < len(objects); j++ {
if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath {
if p.VersionIDMarker == "" {
return objects[j:], nil
}
break
}
if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker {
return objects[j+1:], nil
}
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
} else if obj.NodeVersion.FilePath > p.KeyMarker {
if p.VersionIDMarker != "" {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
}
return objects[i:], nil
}
}
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
// that can be empty
return []*data.ExtendedNodeVersion{}, nil
}
func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) {
if len(objVersions) == 0 {
return nil, nil
}
var resVersion []*data.ExtendedNodeVersion
var resDelMarkVersions []*data.ExtendedNodeVersion
for _, version := range objVersions {
if version.NodeVersion.IsDeleteMarker {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resVersion = append(resVersion, version)
}
}
return resVersion, resDelMarkVersions
}