frostfs-s3-gw/api/layer/listing.go
Denis Kirillov 6d52f46012 [#165] Fix v1 listing bookmark
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-02 16:09:48 +03:00

746 lines
19 KiB
Go

package layer
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type (
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
ListObjectsParamsCommon struct {
BktInfo *data.BucketInfo
Delimiter string
Encode string
MaxKeys int
Prefix string
}
// ListObjectsParamsV1 contains params for ListObjectsV1.
ListObjectsParamsV1 struct {
ListObjectsParamsCommon
Marker string
}
// ListObjectsParamsV2 contains params for ListObjectsV2.
ListObjectsParamsV2 struct {
ListObjectsParamsCommon
ContinuationToken string
StartAfter string
FetchOwner bool
}
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
ListObjectsInfo struct {
Prefixes []string
Objects []*data.NodeVersion
IsTruncated bool
}
// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
ListObjectsInfoV1 struct {
ListObjectsInfo
NextMarker string
}
// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
ListObjectsInfoV2 struct {
ListObjectsInfo
NextContinuationToken string
}
// ListObjectVersionsInfo stores info and list of objects versions.
ListObjectVersionsInfo struct {
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*data.ExtendedNodeVersion
DeleteMarker []*data.ExtendedNodeVersion
VersionIDMarker string
}
allObjectListingParams struct {
BktInfo *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
Bookmark string
VersionAPi string
}
)
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var result ListObjectsInfoV1
prm := allObjectListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.Marker,
Bookmark: p.Marker,
VersionAPi: "v1",
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextMarker = objects[len(objects)-1].FilePath
}
result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter)
return &result, nil
}
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var result ListObjectsInfoV2
prm := allObjectListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
Bookmark: p.ContinuationToken,
VersionAPi: "v2",
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextContinuationToken = next.OID.EncodeToString()
}
result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter)
return &result, nil
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
prm := allObjectListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.KeyMarker,
Bookmark: p.VersionIDMarker,
VersionAPi: "vs",
}
objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
VersionIDMarker: p.VersionIDMarker,
IsTruncated: isTruncated,
}
if res.IsTruncated {
res.NextKeyMarker = objects[p.MaxKeys-1].NodeVersion.FilePath
res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
}
res.CommonPrefixes, objects = triageExtendedObjects(objects, p.Prefix, p.Delimiter)
res.Version, res.DeleteMarker = triageVersions(objects)
return res, nil
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectListingParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark) // todo redo for listv1
session := n.cache.GetListSession(owner, cacheKey)
if session != nil {
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
} else {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetLatestVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
if err != nil {
return nil, nil, err
}
}
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
generator, errorCh := nodesGeneratorStream(poolCtx, p, session)
objOutCh, err := n.initWorkerPoolStream(poolCtx, 2, p, generator)
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
objects = make([]*data.NodeVersion, 0, p.MaxKeys+1)
objects = append(objects, session.Next...)
for obj := range objOutCh {
objects = append(objects, obj)
}
if err = <-errorCh; err != nil {
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
objects = objects[:p.MaxKeys]
}
if next != nil {
session.Next = []*data.NodeVersion{next}
if p.VersionAPi == "v1" {
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, objects[len(objects)-1].FilePath), session)
} else {
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, next.OID.EncodeToString()), session)
}
}
return
}
func (n *layer) getAllObjectsVersions(ctx context.Context, p allObjectListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
if p.MaxKeys == 0 {
return nil, false, nil
}
session, err := n.getListVersionsSession(ctx, p)
if err != nil {
return nil, false, err
}
generator, errorCh := nodesGeneratorVersions(ctx, p, session)
objOutCh, err := n.initWorkerPoolVersions(ctx, 2, p, generator)
if err != nil {
return nil, false, err
}
var lastName string
groupedVersions := make([][]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter)
if dirName != "" {
name = dirName
}
if lastName != name {
groupedVersions = append(groupedVersions, []*data.ExtendedNodeVersion{eoi})
} else if dirName == "" {
groupedVersions[len(groupedVersions)-1] = append(groupedVersions[len(groupedVersions)-1], eoi)
}
lastName = name
}
if err = <-errorCh; err != nil {
return nil, false, fmt.Errorf("failed to get next object from stream: %w", err)
}
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for _, versions := range groupedVersions {
sort.Slice(versions, func(i, j int) bool {
return versions[j].NodeVersion.Timestamp < versions[i].NodeVersion.Timestamp // sort in reverse order
})
for i, version := range versions {
version.IsLatest = i == 0 && (session.Next == nil || session.Next[0].FilePath != versions[0].NodeVersion.FilePath)
allObjects = append(allObjects, version)
}
}
var isTruncated bool
if len(allObjects) > p.MaxKeys {
isTruncated = true
session.Next = make([]*data.NodeVersion, len(allObjects)-p.MaxKeys+1)
session.Next[0] = allObjects[p.MaxKeys-1].NodeVersion
for i, node := range allObjects[p.MaxKeys:] {
session.Next[i+1] = node.NodeVersion
}
session.Acquired.Store(false)
n.cache.PutListSession(n.BearerOwner(ctx), cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].OID.EncodeToString()), session)
allObjects = allObjects[:p.MaxKeys]
}
return allObjects, isTruncated, nil
}
func (n *layer) getListVersionsSession(ctx context.Context, p allObjectListingParams) (*data.ListSession, error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
session := n.cache.GetListSession(owner, cacheKey)
if session == nil {
return n.initNewListVersionsSession(ctx, p)
}
if session.Acquired.Swap(true) {
return n.initNewListVersionsSession(ctx, p)
}
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
return session, nil
}
func (n *layer) initNewListVersionsSession(ctx context.Context, p allObjectListingParams) (session *data.ListSession, err error) {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.GetAllVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix)
if err != nil {
return nil, err
}
return session, nil
}
func nodesGeneratorStream(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
existed := stream.NamesMap
if len(stream.Next) != 0 {
existed[continuationToken] = struct{}{}
}
limit := p.MaxKeys
if len(stream.Next) == 0 {
limit++
}
go func() {
var generated int
var err error
LOOP:
for err == nil {
node, err := stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
if shouldSkip(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func nodesGeneratorVersions(ctx context.Context, p allObjectListingParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
nodeCh := make(chan *data.NodeVersion, 1000)
errCh := make(chan error, 1)
existed := stream.NamesMap
delete(existed, continuationToken)
go func() {
var (
generated int
ind int
err error
lastName string
)
LOOP:
for err == nil {
var node *data.NodeVersion
if ind < len(stream.Next) {
node = stream.Next[ind]
ind++
} else {
node, err = stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
}
if shouldSkipVersions(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated > p.MaxKeys && node.FilePath != lastName {
break LOOP
}
lastName = node.FilePath
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.NodeVersion, size)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); dirName != "" || node.IsFilledExtra() { // todo think to not compute twice
if dirName != "" {
node.FilePath = dirName
}
select {
case <-ctx.Done():
case objCh <- node:
}
} else {
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
node.FillExtra(oi)
select {
case <-ctx.Done():
case objCh <- node:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectListingParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ExtendedNodeVersion)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
if node.IsFilledExtra() {
select {
case <-ctx.Done():
case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}:
}
} else {
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
node.FillExtra(oi)
select {
case <-ctx.Done():
case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func shouldSkip(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool {
if node.IsDeleteMarker {
return true
}
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
}
if _, ok := existed[filePath]; ok {
return true
}
if filePath <= p.Marker {
return true
}
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.Bookmark != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
}
}
existed[filePath] = struct{}{}
return false
}
func shouldSkipVersions(node *data.NodeVersion, p allObjectListingParams, existed map[string]struct{}) bool {
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
if _, ok := existed[filePath]; ok {
return true
}
}
if filePath < p.Marker {
return true
}
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.Bookmark != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
return true
}
}
existed[filePath] = struct{}{}
return false
}
func triageObjects(allObjects []*data.NodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.NodeVersion) {
objects = make([]*data.NodeVersion, 0, len(allObjects))
for _, ov := range allObjects {
if dirName := tryDirectoryName(ov, prefix, delimiter); dirName != "" {
prefixes = append(prefixes, dirName)
} else {
objects = append(objects, ov)
}
}
return
}
func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.ExtendedNodeVersion) {
for _, ov := range allObjects {
if dirName := tryDirectoryName(ov.NodeVersion, prefix, delimiter); dirName != "" {
prefixes = append(prefixes, dirName)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
owner := n.BearerOwner(ctx)
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
return extInfo.ObjectInfo
}
meta, err := n.objectHead(ctx, bktInfo, node.OID)
if err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
return nil
}
oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi
}
// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory empty string is returned.
// This function doesn't check if node has a prefix. It must do a caller.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
if len(delimiter) == 0 {
return ""
}
tail := strings.TrimPrefix(node.FilePath, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
return prefix + tail[:index+1]
}
return ""
}
func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) {
if p.KeyMarker == "" {
return objects, nil
}
for i, obj := range objects {
if obj.NodeVersion.FilePath == p.KeyMarker {
for j := i; j < len(objects); j++ {
if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath {
if p.VersionIDMarker == "" {
return objects[j:], nil
}
break
}
if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker {
return objects[j+1:], nil
}
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
} else if obj.NodeVersion.FilePath > p.KeyMarker {
if p.VersionIDMarker != "" {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
}
return objects[i:], nil
}
}
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
// that can be empty
return []*data.ExtendedNodeVersion{}, nil
}
func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) {
if len(objVersions) == 0 {
return nil, nil
}
var resVersion []*data.ExtendedNodeVersion
var resDelMarkVersions []*data.ExtendedNodeVersion
for _, version := range objVersions {
if version.NodeVersion.IsDeleteMarker {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resVersion = append(resVersion, version)
}
}
return resVersion, resDelMarkVersions
}