feature/165-speed_up_listing #294

Merged
alexvanin merged 22 commits from dkirillov/frostfs-s3-gw:feature/165-speed_up_listing into master 2024-09-04 19:51:13 +00:00
32 changed files with 1977 additions and 892 deletions

107
api/cache/listsession.go vendored Normal file
View file

@ -0,0 +1,107 @@
package cache
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/bluele/gcache"
"go.uber.org/zap"
)
type (
// ListSessionCache contains cache for list session (during pagination).
ListSessionCache struct {
cache gcache.Cache
logger *zap.Logger
}
// ListSessionKey is a key to find a ListSessionCache's entry.
ListSessionKey struct {
cid cid.ID
prefix string
token string
}
)
const (
// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
DefaultListSessionCacheLifetime = time.Second * 60
// DefaultListSessionCacheSize is a default size of cache of ListObjects.
DefaultListSessionCacheSize = 100
)
// DefaultListSessionConfig returns new default cache expiration values.
func DefaultListSessionConfig(logger *zap.Logger) *Config {
return &Config{
Size: DefaultListSessionCacheSize,
Lifetime: DefaultListSessionCacheLifetime,
Logger: logger,
}
}
func (k *ListSessionKey) String() string {
return k.cid.EncodeToString() + k.prefix + k.token
}
// NewListSessionCache is a constructor which creates an object of ListObjectsCache with the given lifetime of entries.
func NewListSessionCache(config *Config) *ListSessionCache {
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(key interface{}, val interface{}) {
session, ok := val.(*data.ListSession)
if !ok {
config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
zap.String("expected", fmt.Sprintf("%T", session)))
}
if !session.Acquired.Load() {
session.Cancel()
}
}).Build()
return &ListSessionCache{cache: gc, logger: config.Logger}
}
// GetListSession returns a list of ObjectInfo.
func (l *ListSessionCache) GetListSession(key ListSessionKey) *data.ListSession {
entry, err := l.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*data.ListSession)
if !ok {
l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
zap.String("expected", fmt.Sprintf("%T", result)))
return nil
}
return result
}
// PutListSession puts a list of object versions to cache.
func (l *ListSessionCache) PutListSession(key ListSessionKey, session *data.ListSession) error {
s := l.GetListSession(key)
if s != nil && s != session {
if !s.Acquired.Load() {
s.Cancel()
}
}
return l.cache.Set(key, session)
}
// DeleteListSession removes key from cache.
func (l *ListSessionCache) DeleteListSession(key ListSessionKey) {
l.cache.Remove(key)
}
// CreateListSessionCacheKey returns ListSessionKey with the given CID, prefix and token.
func CreateListSessionCacheKey(cnr cid.ID, prefix, token string) ListSessionKey {
p := ListSessionKey{
cid: cnr,
prefix: prefix,
token: token,
}
return p
}

View file

@ -37,8 +37,6 @@ type (
ObjectInfo struct {
ID oid.ID
CID cid.ID
IsDir bool
IsDeleteMarker bool
Bucket string
Name string

19
api/data/listsession.go Normal file
View file

@ -0,0 +1,19 @@
package data
import (
"context"
"sync/atomic"
)
type VersionsStream interface {
Next(ctx context.Context) (*NodeVersion, error)
}
type ListSession struct {
Next []*ExtendedNodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
Context context.Context
Cancel context.CancelFunc
Acquired atomic.Bool
}

View file

@ -16,20 +16,31 @@ const (
// NodeVersion represent node from tree service.
type NodeVersion struct {
BaseNodeVersion
DeleteMarker *DeleteMarkerInfo
IsUnversioned bool
IsCombined bool
}
func (v NodeVersion) IsDeleteMarker() bool {
return v.DeleteMarker != nil
// ExtendedNodeVersion contains additional node info to be able to sort versions by timestamp.
type ExtendedNodeVersion struct {
NodeVersion *NodeVersion
IsLatest bool
DirName string
}
// DeleteMarkerInfo is used to save object info if node in the tree service is delete marker.
// We need this information because the "delete marker" object is no longer stored in FrostFS.
type DeleteMarkerInfo struct {
Created time.Time
Owner user.ID
func (e ExtendedNodeVersion) Version() string {
if e.NodeVersion.IsUnversioned {
return UnversionedObjectVersionID
}
return e.NodeVersion.OID.EncodeToString()
}
func (e ExtendedNodeVersion) Name() string {
if e.DirName != "" {
return e.DirName
}
return e.NodeVersion.FilePath
}
// ExtendedObjectInfo contains additional node info to be able to sort versions by timestamp.
@ -58,6 +69,27 @@ type BaseNodeVersion struct {
ETag string
MD5 string
FilePath string
Created *time.Time
Owner *user.ID
IsDeleteMarker bool
}
func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
if md5Enabled && len(v.MD5) > 0 {
return v.MD5
}
return v.ETag
}
// IsFilledExtra returns true is node was created by version of gate v0.29.x and later.
func (v BaseNodeVersion) IsFilledExtra() bool {
return v.Created != nil && v.Owner != nil
}
func (v *BaseNodeVersion) FillExtra(owner *user.ID, created *time.Time, realSize uint64) {
v.Owner = owner
v.Created = created
v.Size = realSize
}
type ObjectTaggingInfo struct {

View file

@ -137,7 +137,7 @@ func writeAttributesHeaders(h http.Header, info *data.ExtendedObjectInfo, isBuck
h.Set(api.AmzVersionID, info.Version())
}
if info.NodeVersion.IsDeleteMarker() {
if info.NodeVersion.IsDeleteMarker {
h.Set(api.AmzDeleteMarker, strconv.FormatBool(true))
}

View file

@ -243,6 +243,7 @@ func TestDeleteMarkerVersioned(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
@ -433,17 +434,20 @@ func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.B
return bktInfo, objInfo
}
func createVersionedBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
createTestBucket(tc, bktName)
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
require.NoError(t, err)
putBucketVersioning(t, tc, bktName, true)
func createVersionedBucketAndObject(_ *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
bktInfo := createVersionedBucket(tc, bktName)
objInfo := createTestObject(tc, bktInfo, objName, encryption.Params{})
return bktInfo, objInfo
}
func createVersionedBucket(hc *handlerContext, bktName string) *data.BucketInfo {
bktInfo := createTestBucket(hc, bktName)
putBucketVersioning(hc.t, hc, bktName, true)
return bktInfo
}
func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabled bool) {
cfg := &VersioningConfiguration{Status: "Suspended"}
if enabled {

View file

@ -210,11 +210,12 @@ func TestGetObjectEnabledMD5(t *testing.T) {
require.Equal(t, data.Quote(objInfo.MD5Sum), headers.Get(api.ETag))
}
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
func putObjectContent(hc *handlerContext, bktName, objName, content string) http.Header {
body := bytes.NewReader([]byte(content))
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
hc.Handler().PutObjectHandler(w, r)
assertStatus(hc.t, w, http.StatusOK)
return w.Result().Header
}
func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, start, end int) []byte {

View file

@ -45,6 +45,8 @@ type handlerContext struct {
config *configMock
layerFeatures *layer.FeatureSettingsMock
treeMock *tree.ServiceClientMemory
cache *layer.Cache
}
func (hc *handlerContext) Handler() *handler {
@ -125,14 +127,14 @@ func (c *configMock) ResolveNamespaceAlias(ns string) string {
}
func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, false)
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
}
func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, true)
return prepareHandlerContextBase(t, getMinCacheConfig(zap.NewExample()))
}
func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
func prepareHandlerContextBase(t *testing.T, cacheCfg *layer.CachesConfig) *handlerContext {
key, err := keys.NewPrivateKey()
require.NoError(t, err)
@ -147,21 +149,20 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeMock := NewTreeServiceMock(t)
memCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
cacheCfg := layer.DefaultCachesConfigs(l)
if minCache {
cacheCfg = getMinCacheConfig(l)
}
treeMock := tree.NewTree(memCli, zap.NewExample())
features := &layer.FeatureSettingsMock{}
layerCfg := &layer.Config{
Caches: cacheCfg,
Cache: layer.NewCache(cacheCfg),
AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver,
TreeService: treeMock,
Features: features,
GateOwner: owner,
}
var pp netmap.PlacementPolicy
@ -188,6 +189,8 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
config: cfg,
layerFeatures: features,
treeMock: memCli,
cache: layerCfg.Cache,
}
}
@ -201,6 +204,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
Logger: logger,
Objects: minCacheCfg,
ObjectsList: minCacheCfg,
SessionList: minCacheCfg,
Names: minCacheCfg,
Buckets: minCacheCfg,
System: minCacheCfg,
@ -262,12 +266,6 @@ func (a *apeMock) DeletePolicy(namespace string, cnrID cid.ID) error {
return nil
}
func NewTreeServiceMock(t *testing.T) *tree.Tree {
memCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
return tree.NewTree(memCli, zap.NewExample())
}
func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {
_, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{
Creator: hc.owner,

View file

@ -185,29 +185,26 @@ func fillPrefixes(src []string, encode string) []CommonPrefix {
return dst
}
func fillContentsWithOwner(src []*data.ObjectInfo, encode string, md5Enabled bool) []Object {
func fillContentsWithOwner(src []*data.ExtendedNodeVersion, encode string, md5Enabled bool) []Object {
return fillContents(src, encode, true, md5Enabled)
}
func fillContents(src []*data.ObjectInfo, encode string, fetchOwner, md5Enabled bool) []Object {
func fillContents(src []*data.ExtendedNodeVersion, encode string, fetchOwner, md5Enabled bool) []Object {
var dst []Object
for _, obj := range src {
res := Object{
Key: s3PathEncode(obj.Name, encode),
Size: obj.Size,
LastModified: obj.Created.UTC().Format(time.RFC3339),
ETag: data.Quote(obj.ETag(md5Enabled)),
Key: s3PathEncode(obj.NodeVersion.FilePath, encode),
Size: obj.NodeVersion.Size,
LastModified: obj.NodeVersion.Created.UTC().Format(time.RFC3339),
ETag: data.Quote(obj.NodeVersion.GetETag(md5Enabled)),
StorageClass: api.DefaultStorageClass,
}
if size, err := layer.GetObjectSize(obj); err == nil {
res.Size = size
}
if fetchOwner {
owner := obj.NodeVersion.Owner.String()
res.Owner = &Owner{
ID: obj.Owner.String(),
DisplayName: obj.Owner.String(),
ID: owner,
DisplayName: owner,
}
}
@ -284,15 +281,15 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
for _, ver := range info.Version {
res.Version = append(res.Version, ObjectVersionResponse{
IsLatest: ver.IsLatest,
Key: ver.ObjectInfo.Name,
LastModified: ver.ObjectInfo.Created.UTC().Format(time.RFC3339),
Key: ver.NodeVersion.FilePath,
LastModified: ver.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: ver.ObjectInfo.Owner.String(),
DisplayName: ver.ObjectInfo.Owner.String(),
ID: ver.NodeVersion.Owner.String(),
DisplayName: ver.NodeVersion.Owner.String(),
},
Size: ver.ObjectInfo.Size,
Size: ver.NodeVersion.Size,
VersionID: ver.Version(),
ETag: data.Quote(ver.ObjectInfo.ETag(md5Enabled)),
ETag: data.Quote(ver.NodeVersion.GetETag(md5Enabled)),
StorageClass: api.DefaultStorageClass,
})
}
@ -300,11 +297,11 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
for _, del := range info.DeleteMarker {
res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{
IsLatest: del.IsLatest,
Key: del.ObjectInfo.Name,
LastModified: del.ObjectInfo.Created.UTC().Format(time.RFC3339),
Key: del.NodeVersion.FilePath,
LastModified: del.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: del.ObjectInfo.Owner.String(),
DisplayName: del.ObjectInfo.Owner.String(),
ID: del.NodeVersion.Owner.String(),
DisplayName: del.NodeVersion.Owner.String(),
},
VersionID: del.Version(),
})

View file

@ -1,15 +1,22 @@
package handler
import (
"context"
"fmt"
"net/http"
"net/url"
"sort"
"strconv"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestParseContinuationToken(t *testing.T) {
@ -58,13 +65,164 @@ func TestListObjectNullVersions(t *testing.T) {
require.Equal(t, data.UnversionedObjectVersionID, result.Version[1].VersionID)
}
func TestListObjectsPaging(t *testing.T) {
func TestListObjectsWithOldTreeNodes(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket-versioning-enabled", "object"
bktInfo := createTestBucket(hc, bktName)
srcEnc, err := encryption.NewParams([]byte("1234567890qwertyuiopasdfghjklzxc"))
require.NoError(t, err)
n := 10
objInfos := make([]*data.ObjectInfo, n)
for i := 0; i < n; i++ {
objInfos[i] = createTestObject(hc, bktInfo, objName+strconv.Itoa(i), *srcEnc)
}
sort.Slice(objInfos, func(i, j int) bool { return objInfos[i].Name < objInfos[j].Name })
makeAllTreeObjectsOld(hc, bktInfo)
listV1 := listObjectsV1(hc, bktName, "", "", "", -1)
checkListOldNodes(hc, listV1.Contents, objInfos)
listV2 := listObjectsV2(hc, bktName, "", "", "", "", -1)
checkListOldNodes(hc, listV2.Contents, objInfos)
listVers := listObjectsVersions(hc, bktName, "", "", "", "", -1)
checkListVersionsOldNodes(hc, listVers.Version, objInfos)
}
func makeAllTreeObjectsOld(hc *handlerContext, bktInfo *data.BucketInfo) {
nodes, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, "version", 0, 0)
require.NoError(hc.t, err)
for _, node := range nodes {
if node.GetNodeID() == 0 {
continue
}
meta := make(map[string]string, len(node.GetMeta()))
for _, m := range node.GetMeta() {
if m.GetKey() != "Created" && m.GetKey() != "Owner" {
meta[m.GetKey()] = string(m.GetValue())
}
}
err = hc.treeMock.MoveNode(hc.Context(), bktInfo, "version", node.GetNodeID(), node.GetParentID(), meta)
require.NoError(hc.t, err)
}
}
func checkListOldNodes(hc *handlerContext, list []Object, objInfos []*data.ObjectInfo) {
require.Len(hc.t, list, len(objInfos))
for i := range list {
require.Equal(hc.t, objInfos[i].Name, list[i].Key)
realSize, err := layer.GetObjectSize(objInfos[i])
require.NoError(hc.t, err)
require.Equal(hc.t, objInfos[i].Owner.EncodeToString(), list[i].Owner.ID)
require.Equal(hc.t, objInfos[i].Created.UTC().Format(time.RFC3339), list[i].LastModified)
require.Equal(hc.t, realSize, list[i].Size)
}
}
func checkListVersionsOldNodes(hc *handlerContext, list []ObjectVersionResponse, objInfos []*data.ObjectInfo) {
require.Len(hc.t, list, len(objInfos))
for i := range list {
require.Equal(hc.t, objInfos[i].Name, list[i].Key)
realSize, err := layer.GetObjectSize(objInfos[i])
require.NoError(hc.t, err)
require.Equal(hc.t, objInfos[i].Owner.EncodeToString(), list[i].Owner.ID)
require.Equal(hc.t, objInfos[i].Created.UTC().Format(time.RFC3339), list[i].LastModified)
require.Equal(hc.t, realSize, list[i].Size)
}
}
func TestListObjectsContextCanceled(t *testing.T) {
layerCfg := layer.DefaultCachesConfigs(zaptest.NewLogger(t))
layerCfg.SessionList.Lifetime = time.Hour
layerCfg.SessionList.Size = 1
hc := prepareHandlerContextBase(t, layerCfg)
bktName := "bucket-versioning-enabled"
bktInfo := createTestBucket(hc, bktName)
for i := 0; i < 4; i++ {
putObject(hc, bktName, "object"+strconv.Itoa(i))
}
result := listObjectsV1(hc, bktName, "", "", "", 2)
session := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result.NextMarker))
// invoke list again to trigger cache eviction
// (use empty prefix to check that context canceled on replace)
listObjectsV1(hc, bktName, "", "", "", 2)
checkContextCanceled(session.Context, t)
result2 := listObjectsV2(hc, bktName, "", "", "", "", 2)
session2 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result2.NextContinuationToken))
// invoke list again to trigger cache eviction
// (use non-empty prefix to check that context canceled on cache eviction)
listObjectsV2(hc, bktName, "o", "", "", "", 2)
checkContextCanceled(session2.Context, t)
result3 := listObjectsVersions(hc, bktName, "", "", "", "", 2)
session3 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result3.NextVersionIDMarker))
// invoke list again to trigger cache eviction
listObjectsVersions(hc, bktName, "o", "", "", "", 2)
checkContextCanceled(session3.Context, t)
}
func checkContextCanceled(ctx context.Context, t *testing.T) {
select {
case <-ctx.Done():
case <-time.After(10 * time.Second):
}
require.ErrorIs(t, ctx.Err(), context.Canceled)
}
func TestListObjectsLatestVersions(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createTestBucket(hc, bktName)
putBucketVersioning(t, hc, bktName, true)
objName1, objName2 := "object1", "object2"
objContent1, objContent2 := "content1", "content2"
putObjectContent(hc, bktName, objName1, objContent1)
hdr1 := putObjectContent(hc, bktName, objName1, objContent2)
putObjectContent(hc, bktName, objName2, objContent1)
hdr2 := putObjectContent(hc, bktName, objName2, objContent2)
t.Run("listv1", func(t *testing.T) {
result := listObjectsV1(hc, bktName, "", "", "", -1)
require.Len(t, result.Contents, 2)
require.Equal(t, objName1, result.Contents[0].Key)
require.Equal(t, hdr1.Get(api.ETag), result.Contents[0].ETag)
require.Equal(t, objName2, result.Contents[1].Key)
require.Equal(t, hdr2.Get(api.ETag), result.Contents[1].ETag)
})
t.Run("listv2", func(t *testing.T) {
result := listObjectsV2(hc, bktName, "", "", "", "", -1)
require.Len(t, result.Contents, 2)
require.Equal(t, objName1, result.Contents[0].Key)
require.Equal(t, hdr1.Get(api.ETag), result.Contents[0].ETag)
require.Equal(t, objName2, result.Contents[1].Key)
require.Equal(t, hdr2.Get(api.ETag), result.Contents[1].ETag)
})
}
func TestListObjectsVersionsPaging(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createTestBucket(hc, bktName)
n := 10
n := 12
var objects []string
for i := 0; i < n; i++ {
@ -88,6 +246,65 @@ func TestListObjectsPaging(t *testing.T) {
require.Empty(t, objects)
}
func TestListObjectsVersionsCorrectIsLatestFlag(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createVersionedBucket(hc, bktName)
objName1, objName2 := "obj1", "obj2"
n := 9
listSize := 3
headers := make([]http.Header, n)
// objects uploaded: ["obj1"-v1, "obj1"-v2, "obj1"-v3, "obj2"-v1, "obj2"-v2, "obj2"-v3, "obj2"-v4, "obj2"-v5, "obj2"-v6]
for i := 0; i < n; i++ {
objName := objName1
if i >= listSize {
objName = objName2
}
headers[i] = putObjectContent(hc, bktName, objName, fmt.Sprintf("content/%d", i))
}
versions := listObjectsVersions(hc, bktName, "", "", "", "", listSize)
// expected objects: ["obj1"-v3, "obj1"-v2, "obj1"-v1]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName1, headers[:listSize], true))
versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize)
// expected objects: ["obj2"-v6, "obj2"-v5, "obj2"-v4]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[2*listSize:], true))
versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize)
// expected objects: ["obj2"-v3, "obj2"-v2, "obj2"-v1]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[listSize:2*listSize], false))
}
func formReverseVersionResponse(objName string, headers []http.Header, isLatest bool) []ObjectVersionResponse {
res := make([]ObjectVersionResponse, len(headers))
for i, h := range headers {
ind := len(headers) - 1 - i
res[ind] = ObjectVersionResponse{
ETag: h.Get(api.ETag),
IsLatest: isLatest && ind == 0,
Key: objName,
VersionID: h.Get(api.AmzVersionID),
}
}
return res
}
func checkListVersionsParts(t *testing.T, versions *ListObjectsVersionsResponse, expected []ObjectVersionResponse) {
require.Len(t, versions.Version, len(expected))
for i, res := range versions.Version {
require.Equal(t, expected[i].Key, res.Key)
require.Equal(t, expected[i].ETag, res.ETag)
require.Equal(t, expected[i].VersionID, res.VersionID)
require.Equal(t, expected[i].IsLatest, res.IsLatest)
}
}
func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T) {
tc := prepareHandlerContext(t)
@ -162,6 +379,132 @@ func TestS3BucketListDelimiterBasic(t *testing.T) {
require.Equal(t, "quux/", listV1Response.CommonPrefixes[1].Prefix)
}
func TestS3BucketListEmpty(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
versions := listObjectsVersions(hc, bktName, "", "", "", "", -1)
require.Empty(t, versions.Version)
require.Empty(t, versions.DeleteMarker)
require.Empty(t, versions.CommonPrefixes)
}
func TestS3BucketListV2PrefixAlt(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"bar", "baz", "foo"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "ba", "", "", "", -1)
require.Equal(t, "ba", response.Prefix)
require.Len(t, response.Contents, 2)
require.Equal(t, "bar", response.Contents[0].Key)
require.Equal(t, "baz", response.Contents[1].Key)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixNotExist(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"foo/bar", "foo/baz", "quux"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "d", "", "", "", -1)
require.Equal(t, "d", response.Prefix)
require.Empty(t, response.Contents)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixUnreadable(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"foo/bar", "foo/baz", "quux"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "\x0a", "", "", "", -1)
require.Equal(t, "\x0a", response.Prefix)
require.Empty(t, response.Contents)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixDelimiterAlt(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"bar", "bazar", "cab", "foo"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "ba", "a", "", "", -1)
require.Equal(t, "ba", response.Prefix)
require.Equal(t, "a", response.Delimiter)
require.Len(t, response.Contents, 1)
require.Equal(t, "bar", response.Contents[0].Key)
require.Len(t, response.CommonPrefixes, 1)
require.Equal(t, "baza", response.CommonPrefixes[0].Prefix)
}
func TestS3BucketListV2PrefixDelimiterDelimiterNotExist(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"b/a/c", "b/a/g", "b/a/r", "g"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "b", "z", "", "", -1)
require.Len(t, response.Contents, 3)
require.Equal(t, "b/a/c", response.Contents[0].Key)
require.Equal(t, "b/a/g", response.Contents[1].Key)
require.Equal(t, "b/a/r", response.Contents[2].Key)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixDelimiterPrefixDelimiterNotExist(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"b/a/c", "b/a/g", "b/a/r", "g"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "y", "z", "", "", -1)
require.Empty(t, response.Contents)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2DelimiterPercentage(t *testing.T) {
tc := prepareHandlerContext(t)
@ -182,6 +525,35 @@ func TestS3BucketListV2DelimiterPercentage(t *testing.T) {
require.Equal(t, "c%", listV2Response.CommonPrefixes[1].Prefix)
}
func TestS3BucketListDelimiterPrefix(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
bktInfo := createTestBucket(hc, bktName)
objects := []string{"asdf", "boo/bar", "boo/baz/xyzzy", "cquux/thud", "cquux/bla"}
for _, objName := range objects {
createTestObject(hc, bktInfo, objName, encryption.Params{})
}
var empty []string
delim := "/"
prefix := ""
marker := validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"asdf"}, empty, "asdf")
marker = validateListV1(t, hc, bktName, prefix, delim, marker, 1, true, empty, []string{"boo/"}, "boo/")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"cquux/"}, "")
marker = validateListV1(t, hc, bktName, prefix, delim, "", 2, true, []string{"asdf"}, []string{"boo/"}, "boo/")
validateListV1(t, hc, bktName, prefix, delim, marker, 2, false, empty, []string{"cquux/"}, "")
prefix = "boo/"
marker = validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"boo/bar"}, empty, "boo/bar")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"boo/baz/"}, "")
validateListV1(t, hc, bktName, prefix, delim, "", 2, false, []string{"boo/bar"}, []string{"boo/baz/"}, "")
}
func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
tc := prepareHandlerContext(t)
@ -211,6 +583,65 @@ func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
validateListV2(t, tc, bktName, prefix, delim, "", 2, false, true, []string{"boo/bar"}, []string{"boo/baz/"})
}
func TestS3BucketListDelimiterPrefixUnderscore(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
bktInfo := createTestBucket(hc, bktName)
objects := []string{"_obj1_", "_under1/bar", "_under1/baz/xyzzy", "_under2/thud", "_under2/bla"}
for _, objName := range objects {
createTestObject(hc, bktInfo, objName, encryption.Params{})
}
var empty []string
delim := "/"
prefix := ""
marker := validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"_obj1_"}, empty, "_obj1_")
marker = validateListV1(t, hc, bktName, prefix, delim, marker, 1, true, empty, []string{"_under1/"}, "_under1/")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"_under2/"}, "")
marker = validateListV1(t, hc, bktName, prefix, delim, "", 2, true, []string{"_obj1_"}, []string{"_under1/"}, "_under1/")
validateListV1(t, hc, bktName, prefix, delim, marker, 2, false, empty, []string{"_under2/"}, "")
prefix = "_under1/"
marker = validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"_under1/bar"}, empty, "_under1/bar")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"_under1/baz/"}, "")
validateListV1(t, hc, bktName, prefix, delim, "", 2, false, []string{"_under1/bar"}, []string{"_under1/baz/"}, "")
}
func TestS3BucketListDelimiterNotSkipSpecial(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
bktInfo := createTestBucket(hc, bktName)
objects := []string{"0/"}
for i := 1000; i < 1999; i++ {
objects = append(objects, fmt.Sprintf("0/%d", i))
}
objects2 := []string{"1999", "1999#", "1999+", "2000"}
objects = append(objects, objects2...)
for _, objName := range objects {
createTestObject(hc, bktInfo, objName, encryption.Params{})
}
delimiter := "/"
list := listObjectsV1(hc, bktName, "", delimiter, "", -1)
require.Equal(t, delimiter, list.Delimiter)
require.Equal(t, []CommonPrefix{{Prefix: "0/"}}, list.CommonPrefixes)
require.Len(t, list.Contents, len(objects2))
for i := 0; i < len(list.Contents); i++ {
require.Equal(t, objects2[i], list.Contents[i].Key)
}
}
func TestMintVersioningListObjectVersionsVersionIDContinuation(t *testing.T) {
hc := prepareHandlerContext(t)
@ -251,13 +682,21 @@ func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, nam
}
func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
return listObjectsV2Ext(hc, bktName, prefix, delimiter, startAfter, continuationToken, "", maxKeys)
}
func listObjectsV2Ext(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken, encodingType string, maxKeys int) *ListObjectsV2Response {
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
query.Add("fetch-owner", "true")
if len(startAfter) != 0 {
query.Add("start-after", startAfter)
}
if len(continuationToken) != 0 {
query.Add("continuation-token", continuationToken)
}
if len(encodingType) != 0 {
query.Add("encoding-type", encodingType)
}
w, r := prepareTestFullRequest(hc, bktName, "", query, nil)
hc.Handler().ListObjectsV2Handler(w, r)
@ -267,6 +706,26 @@ func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, c
return res
}
func validateListV1(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int,
isTruncated bool, checkObjects, checkPrefixes []string, nextMarker string) string {
response := listObjectsV1(tc, bktName, prefix, delimiter, marker, maxKeys)
require.Equal(t, isTruncated, response.IsTruncated)
require.Equal(t, nextMarker, response.NextMarker)
require.Len(t, response.Contents, len(checkObjects))
for i := 0; i < len(checkObjects); i++ {
require.Equal(t, checkObjects[i], response.Contents[i].Key)
}
require.Len(t, response.CommonPrefixes, len(checkPrefixes))
for i := 0; i < len(checkPrefixes); i++ {
require.Equal(t, checkPrefixes[i], response.CommonPrefixes[i].Prefix)
}
return response.NextMarker
}
func validateListV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, continuationToken string, maxKeys int,
isTruncated, last bool, checkObjects, checkPrefixes []string) string {
response := listObjectsV2(tc, bktName, prefix, delimiter, "", continuationToken, maxKeys)

View file

@ -13,6 +13,7 @@ import (
type Cache struct {
logger *zap.Logger
listsCache *cache.ObjectsListCache
sessionListCache *cache.ListSessionCache
objCache *cache.ObjectsCache
namesCache *cache.ObjectsNameCache
bucketCache *cache.BucketCache
@ -25,6 +26,7 @@ type CachesConfig struct {
Logger *zap.Logger
Objects *cache.Config
ObjectsList *cache.Config
SessionList *cache.Config
Names *cache.Config
Buckets *cache.Config
System *cache.Config
@ -37,6 +39,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
Logger: logger,
Objects: cache.DefaultObjectsConfig(logger),
ObjectsList: cache.DefaultObjectsListConfig(logger),
SessionList: cache.DefaultListSessionConfig(logger),
Names: cache.DefaultObjectsNameConfig(logger),
Buckets: cache.DefaultBucketConfig(logger),
System: cache.DefaultSystemConfig(logger),
@ -48,6 +51,7 @@ func NewCache(cfg *CachesConfig) *Cache {
return &Cache{
logger: cfg.Logger,
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
sessionListCache: cache.NewListSessionCache(cfg.SessionList),
objCache: cache.New(cfg.Objects),
namesCache: cache.NewObjectsNameCache(cfg.Names),
bucketCache: cache.NewBucketCache(cfg.Buckets),
@ -144,6 +148,29 @@ func (c *Cache) PutList(owner user.ID, key cache.ObjectsListKey, list []*data.No
}
}
func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.ListSession {
if !c.accessCache.Get(owner, key.String()) {
return nil
}
return c.sessionListCache.GetListSession(key)
}
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
if err := c.sessionListCache.PutListSession(key, session); err != nil {
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
}
if err := c.accessCache.Put(owner, key.String()); err != nil {
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
}
}
func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
c.sessionListCache.DeleteListSession(key)
c.accessCache.Delete(owner, key.String())
}
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
if !c.accessCache.Get(owner, key) {
return nil

View file

@ -76,7 +76,7 @@ func (n *layer) containerInfo(ctx context.Context, idCnr cid.ID) (*data.BucketIn
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
if zone != info.Zone {
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched", zone, info.Zone)
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, idCnr)
}
n.cache.PutBucket(info)

View file

@ -69,7 +69,7 @@ type (
Config struct {
GateOwner user.ID
ChainAddress string
Caches *CachesConfig
Cache *Cache
AnonKey AnonymousKey
Resolver BucketResolver
TreeService TreeService
@ -323,7 +323,7 @@ func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
gateOwner: config.GateOwner,
anonKey: config.AnonKey,
resolver: config.Resolver,
cache: NewCache(config.Caches),
cache: config.Cache,
treeService: config.TreeService,
features: config.Features,
}
@ -651,7 +651,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
var nullVersionToDelete *data.NodeVersion
if lastVersion.IsUnversioned {
if !lastVersion.IsDeleteMarker() {
if !lastVersion.IsDeleteMarker {
nullVersionToDelete = lastVersion
}
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
@ -667,7 +667,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
}
}
if lastVersion.IsDeleteMarker() {
if lastVersion.IsDeleteMarker {
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
return obj
}
@ -679,15 +679,14 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
}
obj.DeleteMarkVersion = randOID.EncodeToString()
now := TimeNow(ctx)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
},
DeleteMarker: &data.DeleteMarkerInfo{
Created: TimeNow(ctx),
Owner: n.gateOwner,
Created: &now,
Owner: &n.gateOwner,
IsDeleteMarker: true,
},
IsUnversioned: settings.VersioningSuspended(),
}
@ -712,24 +711,15 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
}
func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
if client.IsErrObjectAlreadyRemoved(obj.Error) {
n.reqLogger(ctx).Debug(logs.ObjectAlreadyRemoved,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID))
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
if obj.Error != nil {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj
}
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
}
if client.IsErrObjectNotFound(obj.Error) {
n.reqLogger(ctx).Debug(logs.ObjectNotFound,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID))
obj.Error = nil
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
if obj.Error == nil {
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
}
@ -764,7 +754,7 @@ func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.IsDeleteMarker() {
if nodeVersion.IsDeleteMarker {
return obj.VersionID, nil
}
@ -810,11 +800,15 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
}
func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
nodeVersions, err := n.getAllObjectsVersions(ctx, p.BktInfo, "", "")
res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
BktInfo: p.BktInfo,
MaxKeys: 1,
})
if err != nil {
return err
}
if len(nodeVersions) != 0 {
if len(res) != 0 {
return errors.GetAPIError(errors.ErrBucketNotEmpty)
}

725
api/layer/listing.go Normal file
View file

@ -0,0 +1,725 @@
package layer
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type (
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
ListObjectsParamsCommon struct {
BktInfo *data.BucketInfo
Delimiter string
Encode string
MaxKeys int
Prefix string
}
// ListObjectsParamsV1 contains params for ListObjectsV1.
ListObjectsParamsV1 struct {
ListObjectsParamsCommon
Marker string
}
// ListObjectsParamsV2 contains params for ListObjectsV2.
ListObjectsParamsV2 struct {
ListObjectsParamsCommon
ContinuationToken string
StartAfter string
FetchOwner bool
}
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
ListObjectsInfo struct {
Prefixes []string
Objects []*data.ExtendedNodeVersion
IsTruncated bool
}
// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
ListObjectsInfoV1 struct {
ListObjectsInfo
NextMarker string
}
// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
ListObjectsInfoV2 struct {
ListObjectsInfo
NextContinuationToken string
}
// ListObjectVersionsInfo stores info and list of objects versions.
ListObjectVersionsInfo struct {
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*data.ExtendedNodeVersion
DeleteMarker []*data.ExtendedNodeVersion
VersionIDMarker string
}
commonVersionsListingParams struct {
BktInfo *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
Bookmark string
}
commonLatestVersionsListingParams struct {
commonVersionsListingParams
ListType ListType
}
)
type ListType int
const (
ListObjectsV1Type ListType = iota + 1
ListObjectsV2Type ListType = iota + 1
)
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var result ListObjectsInfoV1
prm := commonLatestVersionsListingParams{
commonVersionsListingParams: commonVersionsListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.Marker,
Bookmark: p.Marker,
},
ListType: ListObjectsV1Type,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextMarker = objects[len(objects)-1].Name()
}
result.Prefixes, result.Objects = triageExtendedObjects(objects)
return &result, nil
}
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var result ListObjectsInfoV2
prm := commonLatestVersionsListingParams{
commonVersionsListingParams: commonVersionsListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
Bookmark: p.ContinuationToken,
},
ListType: ListObjectsV2Type,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextContinuationToken = next.NodeVersion.OID.EncodeToString()
}
result.Prefixes, result.Objects = triageExtendedObjects(objects)
return &result, nil
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
prm := commonVersionsListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.KeyMarker,
Bookmark: p.VersionIDMarker,
}
objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
VersionIDMarker: p.VersionIDMarker,
IsTruncated: isTruncated,
}
if res.IsTruncated {
res.NextKeyMarker = objects[p.MaxKeys-1].NodeVersion.FilePath
res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
}
res.CommonPrefixes, objects = triageExtendedObjects(objects)
res.Version, res.DeleteMarker = triageVersions(objects)
return res, nil
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.ExtendedNodeVersion, next *data.ExtendedNodeVersion, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
session, err := n.getListLatestVersionsSession(ctx, p)
if err != nil {
return nil, nil, err
}
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
objects = make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1)
objects = append(objects, session.Next...)
for obj := range objOutCh {
objects = append(objects, obj)
}
if err = <-errorCh; err != nil {
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
sort.Slice(objects, func(i, j int) bool { return objects[i].NodeVersion.FilePath < objects[j].NodeVersion.FilePath })
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
n.putListLatestVersionsSession(ctx, p, session, objects)
objects = objects[:p.MaxKeys]
}
return
}
func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
if p.MaxKeys == 0 {
return nil, false, nil
}
session, err := n.getListAllVersionsSession(ctx, p)
if err != nil {
return nil, false, err
}
generator, errorCh := nodesGeneratorVersions(ctx, p, session)
objOutCh, err := n.initWorkerPool(ctx, 2, p, generator)
if err != nil {
return nil, false, err
}
allObjects := handleGeneratedVersions(objOutCh, p, session)
sort.SliceStable(allObjects, func(i, j int) bool { return allObjects[i].NodeVersion.FilePath < allObjects[j].NodeVersion.FilePath })
if err = <-errorCh; err != nil {
return nil, false, fmt.Errorf("failed to get next object from stream: %w", err)
}
var isTruncated bool
if len(allObjects) > p.MaxKeys {
isTruncated = true
n.putListAllVersionsSession(ctx, p, session, allObjects)
allObjects = allObjects[:p.MaxKeys]
}
return allObjects, isTruncated, nil
}
func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonVersionsListingParams, session *data.ListSession) []*data.ExtendedNodeVersion {
var lastName string
var listRowStartIndex int
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
if eoi.DirName != "" {
name = eoi.DirName
}
if lastName != name {
formVersionsListRow(allObjects, listRowStartIndex, session)
listRowStartIndex = len(allObjects)
allObjects = append(allObjects, eoi)
} else if eoi.DirName == "" {
allObjects = append(allObjects, eoi)
}
lastName = name
}
formVersionsListRow(allObjects, listRowStartIndex, session)
return allObjects
}
func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, session *data.ListSession) {
if len(objects) == 0 {
return
}
prevVersions := objects[rowStartIndex:]
sort.Slice(prevVersions, func(i, j int) bool {
return prevVersions[j].NodeVersion.Timestamp < prevVersions[i].NodeVersion.Timestamp // sort in reverse order to have last added first
})
prevVersions[0].IsLatest = len(session.Next) == 0 || session.Next[0].NodeVersion.FilePath != prevVersions[0].NodeVersion.FilePath
for _, version := range prevVersions[1:] {
version.IsLatest = false
}
}
func (n *layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
}
func (n *layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
return n.getListVersionsSession(ctx, p, false)
}
func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
session := n.cache.GetListSession(owner, cacheKey)
if session == nil {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
}
if session.Acquired.Swap(true) {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
}
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
return session, nil
}
func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
if err != nil {
return nil, err
}
return session, nil
}
func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
if len(allObjects) <= p.MaxKeys {
return
}
var cacheKey cache.ListSessionKey
switch p.ListType {
case ListObjectsV1Type:
cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys-1].Name())
case ListObjectsV2Type:
cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys].NodeVersion.OID.EncodeToString())
default:
// should never happen
panic("invalid list type")
}
session.Acquired.Store(false)
session.Next = []*data.ExtendedNodeVersion{allObjects[p.MaxKeys]}
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
}
func (n *layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
if len(allObjects) <= p.MaxKeys {
return
}
session.Acquired.Store(false)
session.Next = make([]*data.ExtendedNodeVersion, len(allObjects)-p.MaxKeys+1)
session.Next[0] = allObjects[p.MaxKeys-1]
for i, node := range allObjects[p.MaxKeys:] {
session.Next[i+1] = node
}
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].NodeVersion.OID.EncodeToString())
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
}
func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
errCh := make(chan error, 1)
existed := stream.NamesMap
if len(stream.Next) != 0 {
existed[continuationToken] = struct{}{}
}
limit := p.MaxKeys
if len(stream.Next) == 0 {
limit++
}
go func() {
var generated int
var err error
LOOP:
for err == nil {
node, err := stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
nodeExt := &data.ExtendedNodeVersion{
NodeVersion: node,
IsLatest: true,
DirName: tryDirectoryName(node, p.Prefix, p.Delimiter),
}
if shouldSkip(nodeExt, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- nodeExt:
generated++
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
errCh := make(chan error, 1)
existed := stream.NamesMap
delete(existed, continuationToken)
go func() {
var (
generated int
ind int
err error
lastName string
node *data.NodeVersion
nodeExt *data.ExtendedNodeVersion
)
LOOP:
for err == nil {
if ind < len(stream.Next) {
nodeExt = stream.Next[ind]
ind++
} else {
node, err = stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
nodeExt = &data.ExtendedNodeVersion{
NodeVersion: node,
DirName: tryDirectoryName(node, p.Prefix, p.Delimiter),
}
}
if shouldSkipVersions(nodeExt, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- nodeExt:
generated++
if generated > p.MaxKeys && nodeExt.NodeVersion.FilePath != lastName {
break LOOP
}
lastName = nodeExt.NodeVersion.FilePath
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func (n *layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ExtendedNodeVersion, size)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
if node.DirName != "" || node.NodeVersion.IsFilledExtra() {
select {
case <-ctx.Done():
case objCh <- node:
}
} else {
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.ExtendedNodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
realSize, err := GetObjectSize(oi)
if err != nil {
reqLog.Debug(logs.FailedToGetRealObjectSize, zap.Error(err))
realSize = oi.Size
}
node.NodeVersion.FillExtra(&oi.Owner, &oi.Created, realSize)
select {
case <-ctx.Done():
case objCh <- node:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func shouldSkip(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
if node.NodeVersion.IsDeleteMarker {
return true
}
filePath := node.NodeVersion.FilePath
if node.DirName != "" {
filePath = node.DirName
}
if _, ok := existed[filePath]; ok {
return true
}
if filePath <= p.Marker {
return true
}
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.Bookmark != node.NodeVersion.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
}
}
existed[filePath] = struct{}{}
return false
}
func shouldSkipVersions(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
filePath := node.NodeVersion.FilePath
if node.DirName != "" {
filePath = node.DirName
if _, ok := existed[filePath]; ok {
return true
}
}
if filePath < p.Marker {
return true
}
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.Bookmark != node.NodeVersion.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
return true
}
}
existed[filePath] = struct{}{}
return false
}
func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion) (prefixes []string, objects []*data.ExtendedNodeVersion) {
for _, ov := range allObjects {
if ov.DirName != "" {
prefixes = append(prefixes, ov.DirName)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
owner := n.BearerOwner(ctx)
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
return extInfo.ObjectInfo
}
meta, err := n.objectHead(ctx, bktInfo, node.OID)
if err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
return nil
}
oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi
}
// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory empty string is returned.
// This function doesn't check if node has a prefix. It must do a caller.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
if len(delimiter) == 0 {
return ""
}
tail := strings.TrimPrefix(node.FilePath, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
return prefix + tail[:index+1]
}
return ""
}
func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) {
if p.KeyMarker == "" {
return objects, nil
}
for i, obj := range objects {
if obj.NodeVersion.FilePath == p.KeyMarker {
for j := i; j < len(objects); j++ {
if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath {
if p.VersionIDMarker == "" {
return objects[j:], nil
}
break
}
if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker {
return objects[j+1:], nil
}
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
} else if obj.NodeVersion.FilePath > p.KeyMarker {
if p.VersionIDMarker != "" {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
}
return objects[i:], nil
}
}
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
// that can be empty
return []*data.ExtendedNodeVersion{}, nil
}
func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) {
if len(objVersions) == 0 {
return nil, nil
}
var resVersion []*data.ExtendedNodeVersion
var resDelMarkVersions []*data.ExtendedNodeVersion
for _, version := range objVersions {
if version.NodeVersion.IsDeleteMarker {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resVersion = append(resVersion, version)
}
}
return resVersion, resDelMarkVersions
}

View file

@ -13,14 +13,11 @@ import (
"io"
"mime"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
@ -29,7 +26,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/minio/sio"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
@ -50,38 +46,6 @@ type (
bktInfo *data.BucketInfo
}
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
ListObjectsParamsCommon struct {
BktInfo *data.BucketInfo
Delimiter string
Encode string
MaxKeys int
Prefix string
}
// ListObjectsParamsV1 contains params for ListObjectsV1.
ListObjectsParamsV1 struct {
ListObjectsParamsCommon
Marker string
}
// ListObjectsParamsV2 contains params for ListObjectsV2.
ListObjectsParamsV2 struct {
ListObjectsParamsCommon
ContinuationToken string
StartAfter string
FetchOwner bool
}
allObjectParams struct {
Bucket *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
ContinuationToken string
}
DeleteMarkerError struct {
ErrorCode apiErrors.ErrorCode
}
@ -332,13 +296,15 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
}
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
now := TimeNow(ctx)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: id,
ETag: hex.EncodeToString(hash),
FilePath: p.Object,
Size: size,
Size: p.Size,
Created: &now,
Owner: &n.gateOwner,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "",
@ -411,7 +377,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
return nil, err
}
if node.IsDeleteMarker() {
if node.IsDeleteMarker {
return nil, DeleteMarkerError{ErrorCode: apiErrors.ErrNoSuchKey}
}
@ -468,7 +434,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
return extObjInfo, nil
}
if foundVersion.IsDeleteMarker() {
if foundVersion.IsDeleteMarker {
return nil, DeleteMarkerError{ErrorCode: apiErrors.ErrMethodNotAllowed}
}
@ -532,61 +498,6 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
}
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var result ListObjectsInfoV1
prm := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.Marker,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextMarker = objects[len(objects)-1].Name
}
result.Prefixes, result.Objects = triageObjects(objects)
return &result, nil
}
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var result ListObjectsInfoV2
prm := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
ContinuationToken: p.ContinuationToken,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextContinuationToken = next.ID.EncodeToString()
}
result.Prefixes, result.Objects = triageObjects(objects)
return &result, nil
}
type logWrapper struct {
log *zap.Logger
}
@ -595,310 +506,11 @@ func (l *logWrapper) Printf(format string, args ...interface{}) {
l.log.Info(fmt.Sprintf(format, args...))
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true)
nodeVersions := n.cache.GetList(owner, cacheKey)
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix)
if err != nil {
return nil, nil, err
}
n.cache.PutList(owner, cacheKey, nodeVersions)
}
if len(nodeVersions) == 0 {
return nil, nil, nil
}
sort.Slice(nodeVersions, func(i, j int) bool {
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
})
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions))
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
objects = make([]*data.ObjectInfo, 0, p.MaxKeys)
for obj := range objOutCh {
objects = append(objects, obj)
}
sort.Slice(objects, func(i, j int) bool {
return objects[i].Name < objects[j].Name
})
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
objects = objects[:p.MaxKeys]
}
return
}
func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
nodeCh := make(chan *data.NodeVersion)
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
go func() {
var generated int
LOOP:
for _, node := range nodeVersions {
if shouldSkip(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
}()
return nodeCh
}
func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ObjectInfo)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
select {
case <-ctx.Done():
case objCh <- oi:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
var err error
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false)
nodeVersions := n.cache.GetList(owner, cacheKey)
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix)
if err != nil {
return nil, fmt.Errorf("get all versions from tree service: %w", err)
}
n.cache.PutList(owner, cacheKey, nodeVersions)
}
return nodeVersions, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {
nodeVersions, err := n.bucketNodeVersions(ctx, bkt, prefix)
if err != nil {
return nil, err
}
versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions))
for _, nodeVersion := range nodeVersions {
oi := &data.ObjectInfo{}
if nodeVersion.IsDeleteMarker() { // delete marker does not match any object in FrostFS
oi.ID = nodeVersion.OID
oi.Name = nodeVersion.FilePath
oi.Owner = nodeVersion.DeleteMarker.Owner
oi.Created = nodeVersion.DeleteMarker.Created
oi.IsDeleteMarker = true
} else {
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil {
continue
}
}
eoi := &data.ExtendedObjectInfo{
ObjectInfo: oi,
NodeVersion: nodeVersion,
}
objVersions, ok := versions[oi.Name]
if !ok {
objVersions = []*data.ExtendedObjectInfo{eoi}
} else if !oi.IsDir {
objVersions = append(objVersions, eoi)
}
versions[oi.Name] = objVersions
}
return versions, nil
}
func IsSystemHeader(key string) bool {
_, ok := api.SystemMetadata[key]
return ok || strings.HasPrefix(key, api.FrostFSSystemMetadataPrefix)
}
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
if node.IsDeleteMarker() {
return true
}
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
}
if _, ok := existed[filePath]; ok {
return true
}
if filePath <= p.Marker {
return true
}
if p.ContinuationToken != "" {
if _, ok := existed[continuationToken]; !ok {
if p.ContinuationToken != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
}
}
existed[filePath] = struct{}{}
return false
}
func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) {
for _, ov := range allObjects {
if ov.IsDir {
prefixes = append(prefixes, ov.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []string, objects []*data.ExtendedObjectInfo) {
for _, ov := range allObjects {
if ov.ObjectInfo.IsDir {
prefixes = append(prefixes, ov.ObjectInfo.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) {
if oiDir := tryDirectory(bktInfo, node, prefix, delimiter); oiDir != nil {
return oiDir
}
owner := n.BearerOwner(ctx)
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
return extInfo.ObjectInfo
}
meta, err := n.objectHead(ctx, bktInfo, node.OID)
if err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
return nil
}
oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi
}
func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {
dirName := tryDirectoryName(node, prefix, delimiter)
if len(dirName) == 0 {
return nil
}
return &data.ObjectInfo{
ID: node.OID, // to use it as continuation token
CID: bktInfo.CID,
IsDir: true,
IsDeleteMarker: node.IsDeleteMarker(),
Bucket: bktInfo.Name,
Name: dirName,
}
}
// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory empty string is returned.
// This function doesn't check if node has a prefix. It must do a caller.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
if len(delimiter) == 0 {
return ""
}
tail := strings.TrimPrefix(node.FilePath, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
return prefix + tail[:index+1]
}
return ""
}
func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader {
if input == nil {
return nil

View file

@ -174,13 +174,13 @@ func (n *layer) getNodeVersion(ctx context.Context, objVersion *ObjectVersion) (
}
}
if err == nil && version.IsDeleteMarker() && !objVersion.NoErrorOnDeleteMarker {
if err == nil && version.IsDeleteMarker && !objVersion.NoErrorOnDeleteMarker {
return nil, fmt.Errorf("%w: found version is delete marker", s3errors.GetAPIError(s3errors.ErrNoSuchKey))
} else if errors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
}
if err == nil && version != nil && !version.IsDeleteMarker() {
if err == nil && version != nil && !version.IsDeleteMarker {
n.reqLogger(ctx).Debug(logs.GetTreeNode,
zap.Stringer("cid", objVersion.BktInfo.CID), zap.Stringer("oid", version.OID))
}

View file

@ -3,6 +3,7 @@ package layer
import (
"context"
"fmt"
"io"
"sort"
"strings"
@ -10,6 +11,21 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type VersionsByPrefixStreamMock struct {
result []*data.NodeVersion
offset int
}
func (s *VersionsByPrefixStreamMock) Next(context.Context) (*data.NodeVersion, error) {
if s.offset > len(s.result)-1 {
return nil, io.EOF
}
res := s.result[s.offset]
s.offset++
return res, nil
}
type TreeServiceMock struct {
settings map[string]*data.BucketSettings
versions map[string]map[string][]*data.NodeVersion
@ -171,7 +187,7 @@ func (t *TreeServiceMock) GetLatestVersion(_ context.Context, bktInfo *data.Buck
return nil, ErrNodeNotFound
}
func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
func (t *TreeServiceMock) InitVersionsByPrefixStream(_ context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
if !ok {
return nil, ErrNodeNotFound
@ -184,6 +200,11 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *
continue
}
if !latestOnly {
result = append(result, versions...)
continue
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].ID < versions[j].ID
})
@ -193,7 +214,9 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *
}
}
return result, nil
return &VersionsByPrefixStreamMock{
result: result,
}, nil
}
func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {

View file

@ -54,8 +54,7 @@ type TreeService interface {
GetVersions(ctx context.Context, bktInfo *data.BucketInfo, objectName string) ([]*data.NodeVersion, error)
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error

View file

@ -13,39 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type (
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
ListObjectsInfo struct {
Prefixes []string
Objects []*data.ObjectInfo
IsTruncated bool
}
// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
ListObjectsInfoV1 struct {
ListObjectsInfo
NextMarker string
}
// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
ListObjectsInfoV2 struct {
ListObjectsInfo
NextContinuationToken string
}
// ListObjectVersionsInfo stores info and list of objects versions.
ListObjectVersionsInfo struct {
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*data.ExtendedObjectInfo
DeleteMarker []*data.ExtendedObjectInfo
VersionIDMarker string
}
)
// PathSeparator is a path components separator string.
const PathSeparator = string(os.PathSeparator)
@ -83,7 +50,6 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI
return &data.ObjectInfo{
ID: objID,
CID: bkt.CID,
IsDir: false,
Bucket: bkt.Name,
Name: filepathFromObject(meta),

View file

@ -1,51 +1,13 @@
package layer
import (
"encoding/hex"
"net/http"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/stretchr/testify/require"
)
var (
defaultTestCreated = time.Now()
defaultTestPayload = []byte("test object payload")
defaultTestPayloadLength = uint64(len(defaultTestPayload))
defaultTestContentType = http.DetectContentType(defaultTestPayload)
)
func newTestInfo(obj oid.ID, bkt *data.BucketInfo, name string, isDir bool) *data.ObjectInfo {
var hashSum checksum.Checksum
info := &data.ObjectInfo{
ID: obj,
Name: name,
Bucket: bkt.Name,
CID: bkt.CID,
Size: defaultTestPayloadLength,
ContentType: defaultTestContentType,
Created: time.Unix(defaultTestCreated.Unix(), 0),
Owner: bkt.Owner,
Headers: make(map[string]string),
HashSum: hex.EncodeToString(hashSum.Value()),
}
if isDir {
info.IsDir = true
info.Size = 0
info.ContentType = ""
info.Headers = nil
}
return info
}
func newTestNodeVersion(id oid.ID, name string) *data.NodeVersion {
return &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
@ -56,98 +18,84 @@ func newTestNodeVersion(id oid.ID, name string) *data.NodeVersion {
}
func TestTryDirectory(t *testing.T) {
var uid user.ID
var id oid.ID
var containerID cid.ID
bkt := &data.BucketInfo{
Name: "test-container",
CID: containerID,
Owner: uid,
Created: time.Now(),
}
cases := []struct {
name string
prefix string
result *data.ObjectInfo
result string
node *data.NodeVersion
delimiter string
}{
{
name: "small.jpg",
result: nil,
result: "",
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "small.jpg not matched prefix",
prefix: "big",
result: nil,
result: "",
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "small.jpg delimiter",
delimiter: "/",
result: nil,
result: "",
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "test/small.jpg",
result: nil,
result: "",
node: newTestNodeVersion(id, "test/small.jpg"),
},
{
name: "test/small.jpg with prefix and delimiter",
prefix: "test/",
delimiter: "/",
result: nil,
result: "",
node: newTestNodeVersion(id, "test/small.jpg"),
},
{
name: "a/b/small.jpg",
prefix: "a",
result: nil,
result: "",
node: newTestNodeVersion(id, "a/b/small.jpg"),
},
{
name: "a/b/small.jpg",
prefix: "a/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/", true),
result: "a/b/",
node: newTestNodeVersion(id, "a/b/small.jpg"),
},
{
name: "a/b/c/small.jpg",
prefix: "a/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/", true),
result: "a/b/",
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
},
{
name: "a/b/c/small.jpg",
prefix: "a/b/c/s",
delimiter: "/",
result: nil,
result: "",
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
},
{
name: "a/b/c/big.jpg",
prefix: "a/b/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/c/", true),
result: "a/b/c/",
node: newTestNodeVersion(id, "a/b/c/big.jpg"),
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
info := tryDirectory(bkt, tc.node, tc.prefix, tc.delimiter)
if tc.result != nil {
tc.result.Created = time.Time{}
tc.result.Owner = user.ID{}
}
require.Equal(t, tc.result, info)
dirName := tryDirectoryName(tc.node, tc.prefix, tc.delimiter)
require.Equal(t, tc.result, dirName)
})
}
}

View file

@ -1,109 +0,0 @@
package layer
import (
"context"
"sort"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
)
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
versions, err := n.getAllObjectsVersions(ctx, p.BktInfo, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
sortedNames := make([]string, 0, len(versions))
for k := range versions {
sortedNames = append(sortedNames, k)
}
sort.Strings(sortedNames)
allObjects := make([]*data.ExtendedObjectInfo, 0, p.MaxKeys)
for _, name := range sortedNames {
sortedVersions := versions[name]
sort.Slice(sortedVersions, func(i, j int) bool {
return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order
})
for i, version := range sortedVersions {
version.IsLatest = i == 0
allObjects = append(allObjects, version)
}
}
if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
return nil, err
}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
VersionIDMarker: p.VersionIDMarker,
}
res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects)
if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID()
allObjects = allObjects[:p.MaxKeys]
}
res.Version, res.DeleteMarker = triageVersions(allObjects)
return res, nil
}
func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVersionsParams) ([]*data.ExtendedObjectInfo, error) {
if p.KeyMarker == "" {
return objects, nil
}
for i, obj := range objects {
if obj.ObjectInfo.Name == p.KeyMarker {
for j := i; j < len(objects); j++ {
if objects[j].ObjectInfo.Name != obj.ObjectInfo.Name {
if p.VersionIDMarker == "" {
return objects[j:], nil
}
break
}
if objects[j].ObjectInfo.VersionID() == p.VersionIDMarker {
return objects[j+1:], nil
}
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
} else if obj.ObjectInfo.Name > p.KeyMarker {
if p.VersionIDMarker != "" {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
}
return objects[i:], nil
}
}
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
// that can be empty
return []*data.ExtendedObjectInfo{}, nil
}
func triageVersions(objVersions []*data.ExtendedObjectInfo) ([]*data.ExtendedObjectInfo, []*data.ExtendedObjectInfo) {
if len(objVersions) == 0 {
return nil, nil
}
var resVersion []*data.ExtendedObjectInfo
var resDelMarkVersions []*data.ExtendedObjectInfo
for _, version := range objVersions {
if version.NodeVersion.IsDeleteMarker() {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resVersion = append(resVersion, version)
}
}
return resVersion, resDelMarkVersions
}

View file

@ -72,7 +72,7 @@ func (tc *testContext) deleteObject(objectName, versionID string, settings *data
}
}
func (tc *testContext) listObjectsV1() []*data.ObjectInfo {
func (tc *testContext) listObjectsV1() []*data.ExtendedNodeVersion {
res, err := tc.layer.ListObjectsV1(tc.ctx, &ListObjectsParamsV1{
ListObjectsParamsCommon: ListObjectsParamsCommon{
BktInfo: tc.bktInfo,
@ -83,7 +83,7 @@ func (tc *testContext) listObjectsV1() []*data.ObjectInfo {
return res.Objects
}
func (tc *testContext) listObjectsV2() []*data.ObjectInfo {
func (tc *testContext) listObjectsV2() []*data.ExtendedNodeVersion {
res, err := tc.layer.ListObjectsV2(tc.ctx, &ListObjectsParamsV2{
ListObjectsParamsCommon: ListObjectsParamsCommon{
BktInfo: tc.bktInfo,
@ -168,10 +168,11 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
layerCfg := &Config{
Caches: config,
Cache: NewCache(config),
AnonKey: AnonymousKey{Key: key},
TreeService: NewTreeService(),
Features: &FeatureSettingsMock{},
GateOwner: owner,
}
return &testContext{
@ -288,9 +289,10 @@ func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
tc.getObject(tc.obj, "", true)
versions := tc.listVersions()
require.Len(t, versions.DeleteMarker, 1)
for _, ver := range versions.DeleteMarker {
if ver.IsLatest {
tc.deleteObject(tc.obj, ver.ObjectInfo.VersionID(), settings)
tc.deleteObject(tc.obj, ver.NodeVersion.OID.EncodeToString(), settings)
}
}
@ -322,112 +324,112 @@ func TestFilterVersionsByMarker(t *testing.T) {
for _, tc := range []struct {
name string
objects []*data.ExtendedObjectInfo
objects []*data.ExtendedNodeVersion
params *ListObjectVersionsParams
expected []*data.ExtendedObjectInfo
expected []*data.ExtendedNodeVersion
error bool
}{
{
name: "missed key marker",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "", VersionIDMarker: "dummy"},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
},
{
name: "last version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[1].EncodeToString()},
expected: []*data.ExtendedObjectInfo{},
expected: []*data.ExtendedNodeVersion{},
},
{
name: "same name, different versions",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
},
{
name: "different name, different versions",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
},
{
name: "not matched name alphabetically less",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: ""},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
},
{
name: "not matched name alphabetically less with dummy version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not matched name alphabetically greater",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj2", VersionIDMarker: testOIDs[2].EncodeToString()},
expected: []*data.ExtendedObjectInfo{},
expected: []*data.ExtendedNodeVersion{},
},
{
name: "not found version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not found version id, obj last",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not found version id, obj last",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: ""},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
},
},
} {

View file

@ -165,7 +165,7 @@ func (a *App) initLayer(ctx context.Context) {
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
layerCfg := &layer.Config{
Caches: getCacheOptions(a.cfg, a.log),
Cache: layer.NewCache(getCacheOptions(a.cfg, a.log)),
AnonKey: layer.AnonymousKey{
Key: randomKey,
},
@ -906,6 +906,9 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
cacheCfg.SessionList.Lifetime = fetchCacheLifetime(v, l, cfgSessionListCacheLifetime, cacheCfg.SessionList.Lifetime)
cacheCfg.SessionList.Size = fetchCacheSize(v, l, cfgSessionListCacheSize, cacheCfg.SessionList.Size)
cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)

View file

@ -98,6 +98,8 @@ const ( // Settings.
cfgObjectsCacheSize = "cache.objects.size"
cfgListObjectsCacheLifetime = "cache.list.lifetime"
cfgListObjectsCacheSize = "cache.list.size"
cfgSessionListCacheLifetime = "cache.list_session.lifetime"
cfgSessionListCacheSize = "cache.list_session.size"
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
cfgBucketsCacheSize = "cache.buckets.size"
cfgNamesCacheLifetime = "cache.names.lifetime"

View file

@ -82,6 +82,9 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
# Cache which keeps lists of objects in buckets
S3_GW_CACHE_LIST_LIFETIME=1m
S3_GW_CACHE_LIST_SIZE=100000
# Cache which keeps listing session
S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
S3_GW_CACHE_LIST_SESSION_SIZE=100
# Cache which contains mapping of bucket name to bucket info
S3_GW_CACHE_BUCKETS_LIFETIME=1m
S3_GW_CACHE_BUCKETS_SIZE=1000
@ -210,4 +213,3 @@ S3_GW_PROXY_CONTRACT=proxy.frostfs
# Namespaces configuration
S3_GW_NAMESPACES_CONFIG=namespaces.json

View file

@ -100,6 +100,10 @@ cache:
list:
lifetime: 1m
size: 100
# Cache which keeps listing sessions
list_session:
lifetime: 1m
size: 100
# Cache which contains mapping of nice name to object addresses
names:
lifetime: 1m

View file

@ -396,6 +396,9 @@ cache:
list:
lifetime: 1m
size: 100
list_session:
lifetime: 1m
size: 100
names:
lifetime: 1m
size: 1000
@ -420,6 +423,7 @@ cache:
|-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. |

View file

@ -124,6 +124,65 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
return subtree, nil
}
type SubTreeStreamImpl struct {
r *treepool.SubTreeReader
buffer []*grpcService.GetSubTreeResponse_Body
eof bool
index int
ln int
}
const bufSize = 1000
func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
if s.index != -1 {
node := s.buffer[s.index]
s.index++
if s.index >= s.ln {
s.index = -1
}
return GetSubTreeResponseBodyWrapper{response: node}, nil
}
if s.eof {
return nil, io.EOF
}
var err error
s.ln, err = s.r.Read(s.buffer)
if err != nil {
if err != io.EOF {
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", handleError(err))
}
s.eof = true
}
if s.ln > 0 {
s.index = 0
}
return s.Next()
}
func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (tree.SubTreeStream, error) {
poolPrm := treepool.GetSubTreeParams{
CID: bktInfo.CID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
BearerToken: getBearer(ctx, bktInfo),
}
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
return &SubTreeStreamImpl{
r: subTreeReader,
buffer: make([]*grpcService.GetSubTreeResponse_Body, bufSize),
index: -1,
}, nil
}
func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
nodeID, err := w.p.AddNode(ctx, treepool.AddNodeParams{
CID: bktInfo.CID,

View file

@ -80,8 +80,6 @@ const (
CouldntDeletePart = "couldn't delete part" // Warn in ../../api/layer/multipart_upload.go
PartDetails = "part details" // Debug in ../../api/layer/multipart_upload.go
GetObject = "get object" // Debug in ../../api/layer/layer.go
ObjectAlreadyRemoved = "object already removed" // Debug in ../../api/layer/layer.go
ObjectNotFound = "object not found" // Debug in ../../api/layer/layer.go
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
PutObject = "put object" // Debug in ../../api/layer/object.go
@ -95,6 +93,7 @@ const (
CouldntCacheAccessControlOperation = "couldn't cache access control operation" // Warn in ../../api/layer/cache.go
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
@ -137,4 +136,7 @@ const (
ControlAPIGetPolicy = "get policy request"
ControlAPIListPolicies = "list policies request"
PolicyValidationFailed = "policy validation failed"
ParseTreeNode = "parse tree node"
FailedToGetRealObjectSize = "failed to get real object size"
CouldntDeleteObjectFromStorageContinueDeleting = "couldn't delete object from storage, continue deleting from tree"
)

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sort"
"strconv"
"strings"
@ -16,6 +17,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
type (
@ -29,12 +31,17 @@ type (
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error)
GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error)
AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
}
SubTreeStream interface {
Next() (NodeResponse, error)
}
treeNode struct {
ID uint64
ParentID uint64
@ -199,29 +206,26 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
MD5: md5,
Size: treeNode.Size,
FilePath: filePath,
IsDeleteMarker: isDeleteMarker,
},
IsUnversioned: isUnversioned,
IsCombined: isCombined,
}
if isDeleteMarker {
var created time.Time
if createdStr, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err == nil {
created = time.UnixMilli(utcMilli)
created := time.UnixMilli(utcMilli)
version.Created = &created
}
}
var owner user.ID
if ownerStr, ok := treeNode.Get(ownerKV); ok {
_ = owner.DecodeString(ownerStr)
var owner user.ID
if err := owner.DecodeString(ownerStr); err == nil {
version.Owner = &owner
}
}
version.DeleteMarker = &data.DeleteMarkerInfo{
Created: created,
Owner: owner,
}
}
return version
}
@ -635,8 +639,217 @@ func pathFromName(objectName string) []string {
return strings.Split(objectName, separator)
}
func (c *Tree) GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
return c.getVersionsByPrefix(ctx, bktInfo, prefix, true)
type DummySubTreeStream struct {
data NodeResponse
read bool
}
func (s *DummySubTreeStream) Next() (NodeResponse, error) {
if s.read {
return nil, io.EOF
}
s.read = true
return s.data, nil
}
type VersionsByPrefixStreamImpl struct {
ctx context.Context
rootID uint64
intermediateRootID uint64
service ServiceClient
bktInfo *data.BucketInfo
mainStream SubTreeStream
innerStream SubTreeStream
headPrefix string
tailPrefix string
namesMap map[uint64]string
ended bool
latestOnly bool
currentLatest *data.NodeVersion
log *zap.Logger
}
func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
if s.ended {
return nil, io.EOF
}
for {
if s.innerStream == nil {
node, err := s.getNodeFromMainStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.ended = true
if s.currentLatest != nil {
return s.currentLatest, nil
}
}
return nil, fmt.Errorf("get node from main stream: %w", err)
}
if err = s.initInnerStream(node); err != nil {
return nil, fmt.Errorf("init inner stream: %w", err)
}
}
nodeVersion, err := s.getNodeVersionFromInnerStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.innerStream = nil
maps.Clear(s.namesMap)
if s.currentLatest != nil && s.currentLatest.ID != s.intermediateRootID {
return s.currentLatest, nil
}
continue
}
return nil, fmt.Errorf("inner stream: %w", err)
}
return nodeVersion, nil
}
}
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
for {
node, err := s.mainStream.Next()
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, io.EOF
}
return nil, fmt.Errorf("main stream next: %w", err)
}
if node.GetNodeID() != s.rootID && strings.HasPrefix(getFilename(node), s.tailPrefix) {
return node, nil
}
}
}
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
if node.GetParentID() == s.rootID {
s.intermediateRootID = node.GetNodeID()
}
if isIntermediate(node) {
s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
if err != nil {
return fmt.Errorf("get sub tree node from main stream: %w", err)
}
} else {
s.innerStream = &DummySubTreeStream{data: node}
}
return nil
}
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
for {
node, err := s.innerStream.Next()
if err != nil {
return nil, fmt.Errorf("inner stream: %w", err)
}
nodeVersion, skip, err := s.parseNodeResponse(node)
if err != nil {
return nil, err
}
if skip {
continue
}
if s.latestOnly {
if s.currentLatest == nil {
s.currentLatest = nodeVersion
continue
}
if s.currentLatest.FilePath != nodeVersion.FilePath {
res := s.currentLatest
s.currentLatest = nodeVersion
return res, nil
}
if s.currentLatest.Timestamp < nodeVersion.Timestamp {
s.currentLatest = nodeVersion
}
continue
}
return nodeVersion, nil
}
}
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
trNode, fileName, err := parseTreeNode(node)
if err != nil {
s.log.Debug(logs.ParseTreeNode, zap.Error(err))
return nil, true, nil
}
var parentPrefix string
if s.headPrefix != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar'
}
var filepath string
if trNode.ID != s.intermediateRootID {
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
return nil, false, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
s.namesMap[trNode.ID] = filepath
}
if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
return nil, true, nil
}
return newNodeVersionFromTreeNode(filepath, trNode), false, nil
}
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
if err != nil {
if errors.Is(err, io.EOF) {
return &VersionsByPrefixStreamImpl{ended: true}, nil
}
return nil, err
}
return &VersionsByPrefixStreamImpl{
ctx: ctx,
namesMap: map[uint64]string{},
rootID: rootID,
service: c.service,
bktInfo: bktInfo,
mainStream: mainStream,
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
tailPrefix: tailPrefix,
latestOnly: latestOnly,
log: c.log,
}, nil
}
func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, uint64, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", 0, io.EOF
}
return nil, "", 0, err
}
subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", 0, io.EOF
}
return nil, "", 0, err
}
return subTree, tailPrefix, rootID, nil
}
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (uint64, string, error) {
@ -757,65 +970,6 @@ func isIntermediate(node NodeResponse) bool {
return node.GetMeta()[0].GetKey() == FileNameKey
}
func (c *Tree) getSubTreeVersions(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, parentFilePath string, latestOnly bool) ([]*data.NodeVersion, error) {
subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, nodeID, maxGetSubTreeDepth)
if err != nil {
return nil, err
}
var parentPrefix string
if parentFilePath != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
}
var emptyOID oid.ID
var filepath string
namesMap := make(map[uint64]string, len(subTree))
versions := make(map[string][]*data.NodeVersion, len(subTree))
for i, node := range subTree {
treeNode, fileName, err := parseTreeNode(node)
if err != nil {
continue
}
if i != 0 {
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
return nil, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
namesMap[treeNode.ID] = filepath
}
if treeNode.ObjID.Equals(emptyOID) { // The node can be intermediate but we still want to update namesMap
continue
}
key := formLatestNodeKey(node.GetParentID(), fileName)
versionNodes, ok := versions[key]
if !ok {
versionNodes = []*data.NodeVersion{newNodeVersionFromTreeNode(filepath, treeNode)}
} else if !latestOnly {
versionNodes = append(versionNodes, newNodeVersionFromTreeNode(filepath, treeNode))
} else if versionNodes[0].Timestamp <= treeNode.TimeStamp {
versionNodes[0] = newNodeVersionFromTreeNode(filepath, treeNode)
}
versions[key] = versionNodes
}
result := make([]*data.NodeVersion, 0, len(versions)) // consider use len(subTree)
for _, version := range versions {
if latestOnly && version[0].IsDeleteMarker() {
continue
}
result = append(result, version...)
}
return result, nil
}
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
parentPath, ok := namesMap[node.GetParentID()]
if !ok {
@ -846,28 +1000,6 @@ func formLatestNodeKey(parentID uint64, fileName string) string {
return strconv.FormatUint(parentID, 10) + "." + fileName
}
func (c *Tree) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
return c.getVersionsByPrefix(ctx, bktInfo, prefix, false)
}
func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) {
prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly)
if err != nil {
return nil, err
}
var result []*data.NodeVersion
for _, node := range prefixNodes {
versions, err := c.getSubTreeVersions(ctx, bktInfo, node.GetNodeID(), headPrefix, latestOnly)
if err != nil {
return nil, err
}
result = append(result, versions...)
}
return result, nil
}
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}
@ -1156,6 +1288,8 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
meta := map[string]string{
oidKV: version.OID.EncodeToString(),
FileNameKey: path[len(path)-1],
ownerKV: version.Owner.EncodeToString(),
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
}
if version.Size > 0 {
@ -1168,10 +1302,8 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
meta[md5KV] = version.MD5
}
if version.IsDeleteMarker() {
if version.IsDeleteMarker {
meta[isDeleteMarkerKV] = "true"
meta[ownerKV] = version.DeleteMarker.Owner.EncodeToString()
meta[createdKV] = strconv.FormatInt(version.DeleteMarker.Created.UTC().UnixMilli(), 10)
}
if version.IsCombined {

View file

@ -3,6 +3,7 @@ package tree
import (
"context"
"fmt"
"io"
"sort"
"time"
@ -222,14 +223,58 @@ func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.Bucket
return nil, ErrNodeNotFound
}
sortNode(tr.treeData)
node := tr.treeData.getNode(rootID)
if node == nil {
return nil, ErrNodeNotFound
}
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
return node.listNodes(nil, depth-1), nil
}
type SubTreeStreamMemoryImpl struct {
res []NodeResponse
offset int
err error
}
func (s *SubTreeStreamMemoryImpl) Next() (NodeResponse, error) {
if s.err != nil {
return nil, s.err
}
if s.offset > len(s.res)-1 {
return nil, io.EOF
}
s.offset++
return s.res[s.offset-1], nil
}
func (c *ServiceClientMemory) GetSubTreeStream(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return &SubTreeStreamMemoryImpl{err: ErrNodeNotFound}, nil
}
tr, ok := cnr.trees[treeID]
if !ok {
return nil, ErrNodeNotFound
}
node := tr.treeData.getNode(rootID)
if node == nil {
return nil, ErrNodeNotFound
}
sortNode(tr.treeData)
return &SubTreeStreamMemoryImpl{
res: node.listNodes(nil, depth-1),
offset: 0,
}, nil
}
func newContainerInfo(bktInfo *data.BucketInfo, treeID string) containerInfo {
return containerInfo{
bkt: bktInfo,
@ -257,7 +302,11 @@ func newMemoryTree() memoryTree {
}
}
func (c *ServiceClientMemory) AddNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
func (c *ServiceClientMemory) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
return c.AddNodeBase(ctx, bktInfo, treeID, parent, meta, true)
}
func (c *ServiceClientMemory) AddNodeBase(_ context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string, needSort bool) (uint64, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
cnr = newContainerInfo(bktInfo, treeID)
@ -289,6 +338,9 @@ func (c *ServiceClientMemory) AddNode(_ context.Context, bktInfo *data.BucketInf
}
parentNode.children = append(parentNode.children, tn)
if needSort {
sortNodes(parentNode.children)
}
cnr.trees[treeID] = tr
return newID, nil
@ -361,6 +413,24 @@ func (c *ServiceClientMemory) MoveNode(_ context.Context, bktInfo *data.BucketIn
return nil
}
func sortNode(node *treeNodeMemory) {
if node == nil {
return
}
sortNodes(node.children)
for _, child := range node.children {
sortNode(child)
}
}
func sortNodes(list []*treeNodeMemory) {
sort.Slice(list, func(i, j int) bool {
return list[i].data.getValue(FileNameKey) < list[j].data.getValue(FileNameKey)
})
}
func (c *ServiceClientMemory) RemoveNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {

View file

@ -3,10 +3,12 @@ package tree
import (
"context"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@ -141,12 +143,15 @@ func TestTreeServiceAddVersion(t *testing.T) {
CID: cidtest.ID(),
}
now := time.Now()
version := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: oidtest.ID(),
Size: 10,
ETag: "etag",
FilePath: "path/to/version",
Owner: usertest.ID(),
Created: &now,
},
IsUnversioned: true,
}