feature/165-speed_up_listing #294

Merged
alexvanin merged 22 commits from dkirillov/frostfs-s3-gw:feature/165-speed_up_listing into master 2024-09-04 19:51:13 +00:00
32 changed files with 1977 additions and 892 deletions

107
api/cache/listsession.go vendored Normal file
View file

@ -0,0 +1,107 @@
package cache
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/bluele/gcache"
"go.uber.org/zap"
)
type (
// ListSessionCache contains cache for list session (during pagination).
ListSessionCache struct {
cache gcache.Cache
logger *zap.Logger
}
// ListSessionKey is a key to find a ListSessionCache's entry.
ListSessionKey struct {
cid cid.ID
prefix string
token string
}
)
const (
// DefaultListSessionCacheLifetime is a default lifetime of entries in cache of ListObjects.
DefaultListSessionCacheLifetime = time.Second * 60
// DefaultListSessionCacheSize is a default size of cache of ListObjects.
DefaultListSessionCacheSize = 100
)
// DefaultListSessionConfig returns new default cache expiration values.
func DefaultListSessionConfig(logger *zap.Logger) *Config {
return &Config{
Size: DefaultListSessionCacheSize,
Lifetime: DefaultListSessionCacheLifetime,
Logger: logger,
}
}
func (k *ListSessionKey) String() string {
return k.cid.EncodeToString() + k.prefix + k.token
}
// NewListSessionCache is a constructor which creates an object of ListObjectsCache with the given lifetime of entries.
func NewListSessionCache(config *Config) *ListSessionCache {
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).EvictedFunc(func(key interface{}, val interface{}) {
session, ok := val.(*data.ListSession)
if !ok {
config.Logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", val)),
zap.String("expected", fmt.Sprintf("%T", session)))
}
if !session.Acquired.Load() {
session.Cancel()
}
}).Build()
return &ListSessionCache{cache: gc, logger: config.Logger}
}
// GetListSession returns a list of ObjectInfo.
func (l *ListSessionCache) GetListSession(key ListSessionKey) *data.ListSession {
entry, err := l.cache.Get(key)
if err != nil {
return nil
}
result, ok := entry.(*data.ListSession)
if !ok {
l.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
zap.String("expected", fmt.Sprintf("%T", result)))
return nil
}
return result
}
// PutListSession puts a list of object versions to cache.
func (l *ListSessionCache) PutListSession(key ListSessionKey, session *data.ListSession) error {
s := l.GetListSession(key)
if s != nil && s != session {
if !s.Acquired.Load() {
s.Cancel()
}
}
return l.cache.Set(key, session)
}
// DeleteListSession removes key from cache.
func (l *ListSessionCache) DeleteListSession(key ListSessionKey) {
l.cache.Remove(key)
}
// CreateListSessionCacheKey returns ListSessionKey with the given CID, prefix and token.
func CreateListSessionCacheKey(cnr cid.ID, prefix, token string) ListSessionKey {
p := ListSessionKey{
cid: cnr,
prefix: prefix,
token: token,
}
return p
}

View file

@ -35,10 +35,8 @@ type (
// ObjectInfo holds S3 object data.
ObjectInfo struct {
ID oid.ID
CID cid.ID
IsDir bool
IsDeleteMarker bool
ID oid.ID
CID cid.ID
Bucket string
Name string

19
api/data/listsession.go Normal file
View file

@ -0,0 +1,19 @@
package data
import (
"context"
"sync/atomic"
)
type VersionsStream interface {
Next(ctx context.Context) (*NodeVersion, error)
}
type ListSession struct {
Next []*ExtendedNodeVersion
Stream VersionsStream
NamesMap map[string]struct{}
Context context.Context
Cancel context.CancelFunc
Acquired atomic.Bool
}

View file

@ -16,20 +16,31 @@ const (
// NodeVersion represent node from tree service.
type NodeVersion struct {
BaseNodeVersion
DeleteMarker *DeleteMarkerInfo
IsUnversioned bool
IsCombined bool
}
func (v NodeVersion) IsDeleteMarker() bool {
return v.DeleteMarker != nil
// ExtendedNodeVersion contains additional node info to be able to sort versions by timestamp.
type ExtendedNodeVersion struct {
NodeVersion *NodeVersion
IsLatest bool
DirName string
}
// DeleteMarkerInfo is used to save object info if node in the tree service is delete marker.
// We need this information because the "delete marker" object is no longer stored in FrostFS.
type DeleteMarkerInfo struct {
Created time.Time
Owner user.ID
func (e ExtendedNodeVersion) Version() string {
if e.NodeVersion.IsUnversioned {
return UnversionedObjectVersionID
}
return e.NodeVersion.OID.EncodeToString()
}
func (e ExtendedNodeVersion) Name() string {
if e.DirName != "" {
return e.DirName
}
return e.NodeVersion.FilePath
}
// ExtendedObjectInfo contains additional node info to be able to sort versions by timestamp.
@ -50,14 +61,35 @@ func (e ExtendedObjectInfo) Version() string {
// BaseNodeVersion is minimal node info from tree service.
// Basically used for "system" object.
type BaseNodeVersion struct {
ID uint64
ParenID uint64
OID oid.ID
Timestamp uint64
Size uint64
ETag string
MD5 string
FilePath string
ID uint64
ParenID uint64
OID oid.ID
Timestamp uint64
Size uint64
ETag string
MD5 string
FilePath string
Created *time.Time
Owner *user.ID
IsDeleteMarker bool
}
func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
if md5Enabled && len(v.MD5) > 0 {
return v.MD5
}
return v.ETag
}
// IsFilledExtra returns true is node was created by version of gate v0.29.x and later.
func (v BaseNodeVersion) IsFilledExtra() bool {
return v.Created != nil && v.Owner != nil
}
func (v *BaseNodeVersion) FillExtra(owner *user.ID, created *time.Time, realSize uint64) {
v.Owner = owner
v.Created = created
v.Size = realSize
}
type ObjectTaggingInfo struct {

View file

@ -137,7 +137,7 @@ func writeAttributesHeaders(h http.Header, info *data.ExtendedObjectInfo, isBuck
h.Set(api.AmzVersionID, info.Version())
}
if info.NodeVersion.IsDeleteMarker() {
if info.NodeVersion.IsDeleteMarker {
h.Set(api.AmzDeleteMarker, strconv.FormatBool(true))
}

View file

@ -243,6 +243,7 @@ func TestDeleteMarkerVersioned(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
@ -433,17 +434,20 @@ func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.B
return bktInfo, objInfo
}
func createVersionedBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
createTestBucket(tc, bktName)
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
require.NoError(t, err)
putBucketVersioning(t, tc, bktName, true)
func createVersionedBucketAndObject(_ *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
bktInfo := createVersionedBucket(tc, bktName)
objInfo := createTestObject(tc, bktInfo, objName, encryption.Params{})
return bktInfo, objInfo
}
func createVersionedBucket(hc *handlerContext, bktName string) *data.BucketInfo {
bktInfo := createTestBucket(hc, bktName)
putBucketVersioning(hc.t, hc, bktName, true)
return bktInfo
}
func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabled bool) {
cfg := &VersioningConfiguration{Status: "Suspended"}
if enabled {

View file

@ -210,11 +210,12 @@ func TestGetObjectEnabledMD5(t *testing.T) {
require.Equal(t, data.Quote(objInfo.MD5Sum), headers.Get(api.ETag))
}
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
func putObjectContent(hc *handlerContext, bktName, objName, content string) http.Header {
body := bytes.NewReader([]byte(content))
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
hc.Handler().PutObjectHandler(w, r)
assertStatus(hc.t, w, http.StatusOK)
return w.Result().Header
}
func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, start, end int) []byte {

View file

@ -45,6 +45,8 @@ type handlerContext struct {
config *configMock
layerFeatures *layer.FeatureSettingsMock
treeMock *tree.ServiceClientMemory
cache *layer.Cache
}
func (hc *handlerContext) Handler() *handler {
@ -125,14 +127,14 @@ func (c *configMock) ResolveNamespaceAlias(ns string) string {
}
func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, false)
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
}
func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, true)
return prepareHandlerContextBase(t, getMinCacheConfig(zap.NewExample()))
}
func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
func prepareHandlerContextBase(t *testing.T, cacheCfg *layer.CachesConfig) *handlerContext {
key, err := keys.NewPrivateKey()
require.NoError(t, err)
@ -147,21 +149,20 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeMock := NewTreeServiceMock(t)
memCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
cacheCfg := layer.DefaultCachesConfigs(l)
if minCache {
cacheCfg = getMinCacheConfig(l)
}
treeMock := tree.NewTree(memCli, zap.NewExample())
features := &layer.FeatureSettingsMock{}
layerCfg := &layer.Config{
Caches: cacheCfg,
Cache: layer.NewCache(cacheCfg),
AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver,
TreeService: treeMock,
Features: features,
GateOwner: owner,
}
var pp netmap.PlacementPolicy
@ -188,6 +189,8 @@ func prepareHandlerContextBase(t *testing.T, minCache bool) *handlerContext {
config: cfg,
layerFeatures: features,
treeMock: memCli,
cache: layerCfg.Cache,
}
}
@ -201,6 +204,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
Logger: logger,
Objects: minCacheCfg,
ObjectsList: minCacheCfg,
SessionList: minCacheCfg,
Names: minCacheCfg,
Buckets: minCacheCfg,
System: minCacheCfg,
@ -262,12 +266,6 @@ func (a *apeMock) DeletePolicy(namespace string, cnrID cid.ID) error {
return nil
}
func NewTreeServiceMock(t *testing.T) *tree.Tree {
memCli, err := tree.NewTreeServiceClientMemory()
require.NoError(t, err)
return tree.NewTree(memCli, zap.NewExample())
}
func createTestBucket(hc *handlerContext, bktName string) *data.BucketInfo {
_, err := hc.MockedPool().CreateContainer(hc.Context(), layer.PrmContainerCreate{
Creator: hc.owner,

View file

@ -185,29 +185,26 @@ func fillPrefixes(src []string, encode string) []CommonPrefix {
return dst
}
func fillContentsWithOwner(src []*data.ObjectInfo, encode string, md5Enabled bool) []Object {
func fillContentsWithOwner(src []*data.ExtendedNodeVersion, encode string, md5Enabled bool) []Object {
return fillContents(src, encode, true, md5Enabled)
}
func fillContents(src []*data.ObjectInfo, encode string, fetchOwner, md5Enabled bool) []Object {
func fillContents(src []*data.ExtendedNodeVersion, encode string, fetchOwner, md5Enabled bool) []Object {
var dst []Object
for _, obj := range src {
res := Object{
Key: s3PathEncode(obj.Name, encode),
Size: obj.Size,
LastModified: obj.Created.UTC().Format(time.RFC3339),
ETag: data.Quote(obj.ETag(md5Enabled)),
Key: s3PathEncode(obj.NodeVersion.FilePath, encode),
Size: obj.NodeVersion.Size,
LastModified: obj.NodeVersion.Created.UTC().Format(time.RFC3339),
ETag: data.Quote(obj.NodeVersion.GetETag(md5Enabled)),
StorageClass: api.DefaultStorageClass,
}
if size, err := layer.GetObjectSize(obj); err == nil {
res.Size = size
}
if fetchOwner {
owner := obj.NodeVersion.Owner.String()
res.Owner = &Owner{
ID: obj.Owner.String(),
DisplayName: obj.Owner.String(),
ID: owner,
DisplayName: owner,
}
}
@ -284,15 +281,15 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
for _, ver := range info.Version {
res.Version = append(res.Version, ObjectVersionResponse{
IsLatest: ver.IsLatest,
Key: ver.ObjectInfo.Name,
LastModified: ver.ObjectInfo.Created.UTC().Format(time.RFC3339),
Key: ver.NodeVersion.FilePath,
LastModified: ver.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: ver.ObjectInfo.Owner.String(),
DisplayName: ver.ObjectInfo.Owner.String(),
ID: ver.NodeVersion.Owner.String(),
DisplayName: ver.NodeVersion.Owner.String(),
},
Size: ver.ObjectInfo.Size,
Size: ver.NodeVersion.Size,
VersionID: ver.Version(),
ETag: data.Quote(ver.ObjectInfo.ETag(md5Enabled)),
ETag: data.Quote(ver.NodeVersion.GetETag(md5Enabled)),
StorageClass: api.DefaultStorageClass,
})
}
@ -300,11 +297,11 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
for _, del := range info.DeleteMarker {
res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{
IsLatest: del.IsLatest,
Key: del.ObjectInfo.Name,
LastModified: del.ObjectInfo.Created.UTC().Format(time.RFC3339),
Key: del.NodeVersion.FilePath,
LastModified: del.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: del.ObjectInfo.Owner.String(),
DisplayName: del.ObjectInfo.Owner.String(),
ID: del.NodeVersion.Owner.String(),
DisplayName: del.NodeVersion.Owner.String(),
},
VersionID: del.Version(),
})

View file

@ -1,15 +1,22 @@
package handler
import (
"context"
"fmt"
"net/http"
"net/url"
"sort"
"strconv"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
func TestParseContinuationToken(t *testing.T) {
@ -58,13 +65,164 @@ func TestListObjectNullVersions(t *testing.T) {
require.Equal(t, data.UnversionedObjectVersionID, result.Version[1].VersionID)
}
func TestListObjectsPaging(t *testing.T) {
func TestListObjectsWithOldTreeNodes(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket-versioning-enabled", "object"
bktInfo := createTestBucket(hc, bktName)
srcEnc, err := encryption.NewParams([]byte("1234567890qwertyuiopasdfghjklzxc"))
require.NoError(t, err)
n := 10
objInfos := make([]*data.ObjectInfo, n)
for i := 0; i < n; i++ {
objInfos[i] = createTestObject(hc, bktInfo, objName+strconv.Itoa(i), *srcEnc)
}
sort.Slice(objInfos, func(i, j int) bool { return objInfos[i].Name < objInfos[j].Name })
makeAllTreeObjectsOld(hc, bktInfo)
listV1 := listObjectsV1(hc, bktName, "", "", "", -1)
checkListOldNodes(hc, listV1.Contents, objInfos)
listV2 := listObjectsV2(hc, bktName, "", "", "", "", -1)
checkListOldNodes(hc, listV2.Contents, objInfos)
listVers := listObjectsVersions(hc, bktName, "", "", "", "", -1)
checkListVersionsOldNodes(hc, listVers.Version, objInfos)
}
func makeAllTreeObjectsOld(hc *handlerContext, bktInfo *data.BucketInfo) {
nodes, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, "version", 0, 0)
require.NoError(hc.t, err)
for _, node := range nodes {
if node.GetNodeID() == 0 {
continue
}
meta := make(map[string]string, len(node.GetMeta()))
for _, m := range node.GetMeta() {
if m.GetKey() != "Created" && m.GetKey() != "Owner" {
meta[m.GetKey()] = string(m.GetValue())
}
}
err = hc.treeMock.MoveNode(hc.Context(), bktInfo, "version", node.GetNodeID(), node.GetParentID(), meta)
require.NoError(hc.t, err)
}
}
func checkListOldNodes(hc *handlerContext, list []Object, objInfos []*data.ObjectInfo) {
require.Len(hc.t, list, len(objInfos))
for i := range list {
require.Equal(hc.t, objInfos[i].Name, list[i].Key)
realSize, err := layer.GetObjectSize(objInfos[i])
require.NoError(hc.t, err)
require.Equal(hc.t, objInfos[i].Owner.EncodeToString(), list[i].Owner.ID)
require.Equal(hc.t, objInfos[i].Created.UTC().Format(time.RFC3339), list[i].LastModified)
require.Equal(hc.t, realSize, list[i].Size)
}
}
func checkListVersionsOldNodes(hc *handlerContext, list []ObjectVersionResponse, objInfos []*data.ObjectInfo) {
require.Len(hc.t, list, len(objInfos))
for i := range list {
require.Equal(hc.t, objInfos[i].Name, list[i].Key)
realSize, err := layer.GetObjectSize(objInfos[i])
require.NoError(hc.t, err)
require.Equal(hc.t, objInfos[i].Owner.EncodeToString(), list[i].Owner.ID)
require.Equal(hc.t, objInfos[i].Created.UTC().Format(time.RFC3339), list[i].LastModified)
require.Equal(hc.t, realSize, list[i].Size)
}
}
func TestListObjectsContextCanceled(t *testing.T) {
layerCfg := layer.DefaultCachesConfigs(zaptest.NewLogger(t))
layerCfg.SessionList.Lifetime = time.Hour
layerCfg.SessionList.Size = 1
hc := prepareHandlerContextBase(t, layerCfg)
bktName := "bucket-versioning-enabled"
bktInfo := createTestBucket(hc, bktName)
for i := 0; i < 4; i++ {
putObject(hc, bktName, "object"+strconv.Itoa(i))
}
result := listObjectsV1(hc, bktName, "", "", "", 2)
session := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result.NextMarker))
// invoke list again to trigger cache eviction
// (use empty prefix to check that context canceled on replace)
listObjectsV1(hc, bktName, "", "", "", 2)
checkContextCanceled(session.Context, t)
result2 := listObjectsV2(hc, bktName, "", "", "", "", 2)
session2 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result2.NextContinuationToken))
// invoke list again to trigger cache eviction
// (use non-empty prefix to check that context canceled on cache eviction)
listObjectsV2(hc, bktName, "o", "", "", "", 2)
checkContextCanceled(session2.Context, t)
result3 := listObjectsVersions(hc, bktName, "", "", "", "", 2)
session3 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result3.NextVersionIDMarker))
// invoke list again to trigger cache eviction
listObjectsVersions(hc, bktName, "o", "", "", "", 2)
checkContextCanceled(session3.Context, t)
}
func checkContextCanceled(ctx context.Context, t *testing.T) {
select {
case <-ctx.Done():
case <-time.After(10 * time.Second):
}
require.ErrorIs(t, ctx.Err(), context.Canceled)
}
func TestListObjectsLatestVersions(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createTestBucket(hc, bktName)
putBucketVersioning(t, hc, bktName, true)
objName1, objName2 := "object1", "object2"
objContent1, objContent2 := "content1", "content2"
putObjectContent(hc, bktName, objName1, objContent1)
hdr1 := putObjectContent(hc, bktName, objName1, objContent2)
putObjectContent(hc, bktName, objName2, objContent1)
hdr2 := putObjectContent(hc, bktName, objName2, objContent2)
t.Run("listv1", func(t *testing.T) {
result := listObjectsV1(hc, bktName, "", "", "", -1)
require.Len(t, result.Contents, 2)
require.Equal(t, objName1, result.Contents[0].Key)
require.Equal(t, hdr1.Get(api.ETag), result.Contents[0].ETag)
require.Equal(t, objName2, result.Contents[1].Key)
require.Equal(t, hdr2.Get(api.ETag), result.Contents[1].ETag)
})
t.Run("listv2", func(t *testing.T) {
result := listObjectsV2(hc, bktName, "", "", "", "", -1)
require.Len(t, result.Contents, 2)
require.Equal(t, objName1, result.Contents[0].Key)
require.Equal(t, hdr1.Get(api.ETag), result.Contents[0].ETag)
require.Equal(t, objName2, result.Contents[1].Key)
require.Equal(t, hdr2.Get(api.ETag), result.Contents[1].ETag)
})
}
func TestListObjectsVersionsPaging(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createTestBucket(hc, bktName)
n := 10
n := 12
var objects []string
for i := 0; i < n; i++ {
@ -88,6 +246,65 @@ func TestListObjectsPaging(t *testing.T) {
require.Empty(t, objects)
}
func TestListObjectsVersionsCorrectIsLatestFlag(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-versioning-enabled"
createVersionedBucket(hc, bktName)
objName1, objName2 := "obj1", "obj2"
n := 9
listSize := 3
headers := make([]http.Header, n)
// objects uploaded: ["obj1"-v1, "obj1"-v2, "obj1"-v3, "obj2"-v1, "obj2"-v2, "obj2"-v3, "obj2"-v4, "obj2"-v5, "obj2"-v6]
for i := 0; i < n; i++ {
objName := objName1
if i >= listSize {
objName = objName2
}
headers[i] = putObjectContent(hc, bktName, objName, fmt.Sprintf("content/%d", i))
}
versions := listObjectsVersions(hc, bktName, "", "", "", "", listSize)
// expected objects: ["obj1"-v3, "obj1"-v2, "obj1"-v1]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName1, headers[:listSize], true))
versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize)
// expected objects: ["obj2"-v6, "obj2"-v5, "obj2"-v4]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[2*listSize:], true))
versions = listObjectsVersions(hc, bktName, "", "", versions.NextKeyMarker, versions.NextVersionIDMarker, listSize)
// expected objects: ["obj2"-v3, "obj2"-v2, "obj2"-v1]
checkListVersionsParts(t, versions, formReverseVersionResponse(objName2, headers[listSize:2*listSize], false))
}
func formReverseVersionResponse(objName string, headers []http.Header, isLatest bool) []ObjectVersionResponse {
res := make([]ObjectVersionResponse, len(headers))
for i, h := range headers {
ind := len(headers) - 1 - i
res[ind] = ObjectVersionResponse{
ETag: h.Get(api.ETag),
IsLatest: isLatest && ind == 0,
Key: objName,
VersionID: h.Get(api.AmzVersionID),
}
}
return res
}
func checkListVersionsParts(t *testing.T, versions *ListObjectsVersionsResponse, expected []ObjectVersionResponse) {
require.Len(t, versions.Version, len(expected))
for i, res := range versions.Version {
require.Equal(t, expected[i].Key, res.Key)
require.Equal(t, expected[i].ETag, res.ETag)
require.Equal(t, expected[i].VersionID, res.VersionID)
require.Equal(t, expected[i].IsLatest, res.IsLatest)
}
}
func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T) {
tc := prepareHandlerContext(t)
@ -162,6 +379,132 @@ func TestS3BucketListDelimiterBasic(t *testing.T) {
require.Equal(t, "quux/", listV1Response.CommonPrefixes[1].Prefix)
}
func TestS3BucketListEmpty(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
versions := listObjectsVersions(hc, bktName, "", "", "", "", -1)
require.Empty(t, versions.Version)
require.Empty(t, versions.DeleteMarker)
require.Empty(t, versions.CommonPrefixes)
}
func TestS3BucketListV2PrefixAlt(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"bar", "baz", "foo"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "ba", "", "", "", -1)
require.Equal(t, "ba", response.Prefix)
require.Len(t, response.Contents, 2)
require.Equal(t, "bar", response.Contents[0].Key)
require.Equal(t, "baz", response.Contents[1].Key)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixNotExist(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"foo/bar", "foo/baz", "quux"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "d", "", "", "", -1)
require.Equal(t, "d", response.Prefix)
require.Empty(t, response.Contents)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixUnreadable(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"foo/bar", "foo/baz", "quux"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "\x0a", "", "", "", -1)
require.Equal(t, "\x0a", response.Prefix)
require.Empty(t, response.Contents)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixDelimiterAlt(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"bar", "bazar", "cab", "foo"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "ba", "a", "", "", -1)
require.Equal(t, "ba", response.Prefix)
require.Equal(t, "a", response.Delimiter)
require.Len(t, response.Contents, 1)
require.Equal(t, "bar", response.Contents[0].Key)
require.Len(t, response.CommonPrefixes, 1)
require.Equal(t, "baza", response.CommonPrefixes[0].Prefix)
}
func TestS3BucketListV2PrefixDelimiterDelimiterNotExist(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"b/a/c", "b/a/g", "b/a/r", "g"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "b", "z", "", "", -1)
require.Len(t, response.Contents, 3)
require.Equal(t, "b/a/c", response.Contents[0].Key)
require.Equal(t, "b/a/g", response.Contents[1].Key)
require.Equal(t, "b/a/r", response.Contents[2].Key)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2PrefixDelimiterPrefixDelimiterNotExist(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
createTestBucket(hc, bktName)
objects := []string{"b/a/c", "b/a/g", "b/a/r", "g"}
for _, objName := range objects {
putObject(hc, bktName, objName)
}
response := listObjectsV2(hc, bktName, "y", "z", "", "", -1)
require.Empty(t, response.Contents)
require.Empty(t, response.CommonPrefixes)
}
func TestS3BucketListV2DelimiterPercentage(t *testing.T) {
tc := prepareHandlerContext(t)
@ -182,6 +525,35 @@ func TestS3BucketListV2DelimiterPercentage(t *testing.T) {
require.Equal(t, "c%", listV2Response.CommonPrefixes[1].Prefix)
}
func TestS3BucketListDelimiterPrefix(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
bktInfo := createTestBucket(hc, bktName)
objects := []string{"asdf", "boo/bar", "boo/baz/xyzzy", "cquux/thud", "cquux/bla"}
for _, objName := range objects {
createTestObject(hc, bktInfo, objName, encryption.Params{})
}
var empty []string
delim := "/"
prefix := ""
marker := validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"asdf"}, empty, "asdf")
marker = validateListV1(t, hc, bktName, prefix, delim, marker, 1, true, empty, []string{"boo/"}, "boo/")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"cquux/"}, "")
marker = validateListV1(t, hc, bktName, prefix, delim, "", 2, true, []string{"asdf"}, []string{"boo/"}, "boo/")
validateListV1(t, hc, bktName, prefix, delim, marker, 2, false, empty, []string{"cquux/"}, "")
prefix = "boo/"
marker = validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"boo/bar"}, empty, "boo/bar")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"boo/baz/"}, "")
validateListV1(t, hc, bktName, prefix, delim, "", 2, false, []string{"boo/bar"}, []string{"boo/baz/"}, "")
}
func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
tc := prepareHandlerContext(t)
@ -211,6 +583,65 @@ func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
validateListV2(t, tc, bktName, prefix, delim, "", 2, false, true, []string{"boo/bar"}, []string{"boo/baz/"})
}
func TestS3BucketListDelimiterPrefixUnderscore(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
bktInfo := createTestBucket(hc, bktName)
objects := []string{"_obj1_", "_under1/bar", "_under1/baz/xyzzy", "_under2/thud", "_under2/bla"}
for _, objName := range objects {
createTestObject(hc, bktInfo, objName, encryption.Params{})
}
var empty []string
delim := "/"
prefix := ""
marker := validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"_obj1_"}, empty, "_obj1_")
marker = validateListV1(t, hc, bktName, prefix, delim, marker, 1, true, empty, []string{"_under1/"}, "_under1/")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"_under2/"}, "")
marker = validateListV1(t, hc, bktName, prefix, delim, "", 2, true, []string{"_obj1_"}, []string{"_under1/"}, "_under1/")
validateListV1(t, hc, bktName, prefix, delim, marker, 2, false, empty, []string{"_under2/"}, "")
prefix = "_under1/"
marker = validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"_under1/bar"}, empty, "_under1/bar")
validateListV1(t, hc, bktName, prefix, delim, marker, 1, false, empty, []string{"_under1/baz/"}, "")
validateListV1(t, hc, bktName, prefix, delim, "", 2, false, []string{"_under1/bar"}, []string{"_under1/baz/"}, "")
}
func TestS3BucketListDelimiterNotSkipSpecial(t *testing.T) {
hc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
bktInfo := createTestBucket(hc, bktName)
objects := []string{"0/"}
for i := 1000; i < 1999; i++ {
objects = append(objects, fmt.Sprintf("0/%d", i))
}
objects2 := []string{"1999", "1999#", "1999+", "2000"}
objects = append(objects, objects2...)
for _, objName := range objects {
createTestObject(hc, bktInfo, objName, encryption.Params{})
}
delimiter := "/"
list := listObjectsV1(hc, bktName, "", delimiter, "", -1)
require.Equal(t, delimiter, list.Delimiter)
require.Equal(t, []CommonPrefix{{Prefix: "0/"}}, list.CommonPrefixes)
require.Len(t, list.Contents, len(objects2))
for i := 0; i < len(list.Contents); i++ {
require.Equal(t, objects2[i], list.Contents[i].Key)
}
}
func TestMintVersioningListObjectVersionsVersionIDContinuation(t *testing.T) {
hc := prepareHandlerContext(t)
@ -251,13 +682,21 @@ func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, nam
}
func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
return listObjectsV2Ext(hc, bktName, prefix, delimiter, startAfter, continuationToken, "", maxKeys)
}
func listObjectsV2Ext(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken, encodingType string, maxKeys int) *ListObjectsV2Response {
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
query.Add("fetch-owner", "true")
if len(startAfter) != 0 {
query.Add("start-after", startAfter)
}
if len(continuationToken) != 0 {
query.Add("continuation-token", continuationToken)
}
if len(encodingType) != 0 {
query.Add("encoding-type", encodingType)
}
w, r := prepareTestFullRequest(hc, bktName, "", query, nil)
hc.Handler().ListObjectsV2Handler(w, r)
@ -267,6 +706,26 @@ func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, c
return res
}
func validateListV1(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int,
isTruncated bool, checkObjects, checkPrefixes []string, nextMarker string) string {
response := listObjectsV1(tc, bktName, prefix, delimiter, marker, maxKeys)
require.Equal(t, isTruncated, response.IsTruncated)
require.Equal(t, nextMarker, response.NextMarker)
require.Len(t, response.Contents, len(checkObjects))
for i := 0; i < len(checkObjects); i++ {
require.Equal(t, checkObjects[i], response.Contents[i].Key)
}
require.Len(t, response.CommonPrefixes, len(checkPrefixes))
for i := 0; i < len(checkPrefixes); i++ {
require.Equal(t, checkPrefixes[i], response.CommonPrefixes[i].Prefix)
}
return response.NextMarker
}
func validateListV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, continuationToken string, maxKeys int,
isTruncated, last bool, checkObjects, checkPrefixes []string) string {
response := listObjectsV2(tc, bktName, prefix, delimiter, "", continuationToken, maxKeys)

View file

@ -11,13 +11,14 @@ import (
)
type Cache struct {
logger *zap.Logger
listsCache *cache.ObjectsListCache
objCache *cache.ObjectsCache
namesCache *cache.ObjectsNameCache
bucketCache *cache.BucketCache
systemCache *cache.SystemCache
accessCache *cache.AccessControlCache
logger *zap.Logger
listsCache *cache.ObjectsListCache
sessionListCache *cache.ListSessionCache
objCache *cache.ObjectsCache
namesCache *cache.ObjectsNameCache
bucketCache *cache.BucketCache
systemCache *cache.SystemCache
accessCache *cache.AccessControlCache
}
// CachesConfig contains params for caches.
@ -25,6 +26,7 @@ type CachesConfig struct {
Logger *zap.Logger
Objects *cache.Config
ObjectsList *cache.Config
SessionList *cache.Config
Names *cache.Config
Buckets *cache.Config
System *cache.Config
@ -37,6 +39,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
Logger: logger,
Objects: cache.DefaultObjectsConfig(logger),
ObjectsList: cache.DefaultObjectsListConfig(logger),
SessionList: cache.DefaultListSessionConfig(logger),
Names: cache.DefaultObjectsNameConfig(logger),
Buckets: cache.DefaultBucketConfig(logger),
System: cache.DefaultSystemConfig(logger),
@ -46,13 +49,14 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
func NewCache(cfg *CachesConfig) *Cache {
return &Cache{
logger: cfg.Logger,
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
objCache: cache.New(cfg.Objects),
namesCache: cache.NewObjectsNameCache(cfg.Names),
bucketCache: cache.NewBucketCache(cfg.Buckets),
systemCache: cache.NewSystemCache(cfg.System),
accessCache: cache.NewAccessControlCache(cfg.AccessControl),
logger: cfg.Logger,
listsCache: cache.NewObjectsListCache(cfg.ObjectsList),
sessionListCache: cache.NewListSessionCache(cfg.SessionList),
objCache: cache.New(cfg.Objects),
namesCache: cache.NewObjectsNameCache(cfg.Names),
bucketCache: cache.NewBucketCache(cfg.Buckets),
systemCache: cache.NewSystemCache(cfg.System),
accessCache: cache.NewAccessControlCache(cfg.AccessControl),
}
}
@ -144,6 +148,29 @@ func (c *Cache) PutList(owner user.ID, key cache.ObjectsListKey, list []*data.No
}
}
func (c *Cache) GetListSession(owner user.ID, key cache.ListSessionKey) *data.ListSession {
if !c.accessCache.Get(owner, key.String()) {
return nil
}
return c.sessionListCache.GetListSession(key)
}
func (c *Cache) PutListSession(owner user.ID, key cache.ListSessionKey, session *data.ListSession) {
if err := c.sessionListCache.PutListSession(key, session); err != nil {
c.logger.Warn(logs.CouldntCacheListSession, zap.Error(err))
}
if err := c.accessCache.Put(owner, key.String()); err != nil {
c.logger.Warn(logs.CouldntCacheAccessControlOperation, zap.Error(err))
}
}
func (c *Cache) DeleteListSession(owner user.ID, key cache.ListSessionKey) {
c.sessionListCache.DeleteListSession(key)
c.accessCache.Delete(owner, key.String())
}
func (c *Cache) GetTagging(owner user.ID, key string) map[string]string {
if !c.accessCache.Get(owner, key) {
return nil

View file

@ -76,7 +76,7 @@ func (n *layer) containerInfo(ctx context.Context, idCnr cid.ID) (*data.BucketIn
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
if zone != info.Zone {
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched", zone, info.Zone)
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, idCnr)
}
n.cache.PutBucket(info)

View file

@ -69,7 +69,7 @@ type (
Config struct {
GateOwner user.ID
ChainAddress string
Caches *CachesConfig
Cache *Cache
AnonKey AnonymousKey
Resolver BucketResolver
TreeService TreeService
@ -323,7 +323,7 @@ func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
gateOwner: config.GateOwner,
anonKey: config.AnonKey,
resolver: config.Resolver,
cache: NewCache(config.Caches),
cache: config.Cache,
treeService: config.TreeService,
features: config.Features,
}
@ -651,7 +651,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
var nullVersionToDelete *data.NodeVersion
if lastVersion.IsUnversioned {
if !lastVersion.IsDeleteMarker() {
if !lastVersion.IsDeleteMarker {
nullVersionToDelete = lastVersion
}
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
@ -667,7 +667,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
}
}
if lastVersion.IsDeleteMarker() {
if lastVersion.IsDeleteMarker {
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
return obj
}
@ -679,15 +679,14 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
}
obj.DeleteMarkVersion = randOID.EncodeToString()
now := TimeNow(ctx)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
},
DeleteMarker: &data.DeleteMarkerInfo{
Created: TimeNow(ctx),
Owner: n.gateOwner,
OID: randOID,
FilePath: obj.Name,
Created: &now,
Owner: &n.gateOwner,
IsDeleteMarker: true,
},
IsUnversioned: settings.VersioningSuspended(),
}
@ -712,24 +711,15 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
}
func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
if client.IsErrObjectAlreadyRemoved(obj.Error) {
n.reqLogger(ctx).Debug(logs.ObjectAlreadyRemoved,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID))
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
if obj.Error != nil {
return obj
}
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj
}
if client.IsErrObjectNotFound(obj.Error) {
n.reqLogger(ctx).Debug(logs.ObjectNotFound,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID))
obj.Error = nil
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
if obj.Error == nil {
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
}
@ -764,7 +754,7 @@ func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.IsDeleteMarker() {
if nodeVersion.IsDeleteMarker {
return obj.VersionID, nil
}
@ -810,11 +800,15 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
}
func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
nodeVersions, err := n.getAllObjectsVersions(ctx, p.BktInfo, "", "")
res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
BktInfo: p.BktInfo,
MaxKeys: 1,
})
if err != nil {
return err
}
if len(nodeVersions) != 0 {
if len(res) != 0 {
return errors.GetAPIError(errors.ErrBucketNotEmpty)
}

725
api/layer/listing.go Normal file
View file

@ -0,0 +1,725 @@
package layer
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type (
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
ListObjectsParamsCommon struct {
BktInfo *data.BucketInfo
Delimiter string
Encode string
MaxKeys int
Prefix string
}
// ListObjectsParamsV1 contains params for ListObjectsV1.
ListObjectsParamsV1 struct {
ListObjectsParamsCommon
Marker string
}
// ListObjectsParamsV2 contains params for ListObjectsV2.
ListObjectsParamsV2 struct {
ListObjectsParamsCommon
ContinuationToken string
StartAfter string
FetchOwner bool
}
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
ListObjectsInfo struct {
Prefixes []string
Objects []*data.ExtendedNodeVersion
IsTruncated bool
}
// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
ListObjectsInfoV1 struct {
ListObjectsInfo
NextMarker string
}
// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
ListObjectsInfoV2 struct {
ListObjectsInfo
NextContinuationToken string
}
// ListObjectVersionsInfo stores info and list of objects versions.
ListObjectVersionsInfo struct {
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*data.ExtendedNodeVersion
DeleteMarker []*data.ExtendedNodeVersion
VersionIDMarker string
}
commonVersionsListingParams struct {
BktInfo *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
Bookmark string
}
commonLatestVersionsListingParams struct {
commonVersionsListingParams
ListType ListType
}
)
type ListType int
const (
ListObjectsV1Type ListType = iota + 1
ListObjectsV2Type ListType = iota + 1
)
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var result ListObjectsInfoV1
prm := commonLatestVersionsListingParams{
commonVersionsListingParams: commonVersionsListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.Marker,
Bookmark: p.Marker,
},
ListType: ListObjectsV1Type,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextMarker = objects[len(objects)-1].Name()
}
result.Prefixes, result.Objects = triageExtendedObjects(objects)
return &result, nil
}
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var result ListObjectsInfoV2
prm := commonLatestVersionsListingParams{
commonVersionsListingParams: commonVersionsListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
Bookmark: p.ContinuationToken,
},
ListType: ListObjectsV2Type,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextContinuationToken = next.NodeVersion.OID.EncodeToString()
}
result.Prefixes, result.Objects = triageExtendedObjects(objects)
return &result, nil
}
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
prm := commonVersionsListingParams{
BktInfo: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.KeyMarker,
Bookmark: p.VersionIDMarker,
}
objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
VersionIDMarker: p.VersionIDMarker,
IsTruncated: isTruncated,
}
if res.IsTruncated {
res.NextKeyMarker = objects[p.MaxKeys-1].NodeVersion.FilePath
res.NextVersionIDMarker = objects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
}
res.CommonPrefixes, objects = triageExtendedObjects(objects)
res.Version, res.DeleteMarker = triageVersions(objects)
return res, nil
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVersionsListingParams) (objects []*data.ExtendedNodeVersion, next *data.ExtendedNodeVersion, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
session, err := n.getListLatestVersionsSession(ctx, p)
if err != nil {
return nil, nil, err
}
generator, errorCh := nodesGeneratorStream(ctx, p.commonVersionsListingParams, session)
objOutCh, err := n.initWorkerPool(ctx, 2, p.commonVersionsListingParams, generator)
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
objects = make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1)
objects = append(objects, session.Next...)
for obj := range objOutCh {
objects = append(objects, obj)
}
if err = <-errorCh; err != nil {
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
sort.Slice(objects, func(i, j int) bool { return objects[i].NodeVersion.FilePath < objects[j].NodeVersion.FilePath })
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
n.putListLatestVersionsSession(ctx, p, session, objects)
objects = objects[:p.MaxKeys]
}
return
}
func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListingParams) ([]*data.ExtendedNodeVersion, bool, error) {
if p.MaxKeys == 0 {
return nil, false, nil
}
session, err := n.getListAllVersionsSession(ctx, p)
if err != nil {
return nil, false, err
}
generator, errorCh := nodesGeneratorVersions(ctx, p, session)
objOutCh, err := n.initWorkerPool(ctx, 2, p, generator)
if err != nil {
return nil, false, err
}
allObjects := handleGeneratedVersions(objOutCh, p, session)
sort.SliceStable(allObjects, func(i, j int) bool { return allObjects[i].NodeVersion.FilePath < allObjects[j].NodeVersion.FilePath })
if err = <-errorCh; err != nil {
return nil, false, fmt.Errorf("failed to get next object from stream: %w", err)
}
var isTruncated bool
if len(allObjects) > p.MaxKeys {
isTruncated = true
n.putListAllVersionsSession(ctx, p, session, allObjects)
allObjects = allObjects[:p.MaxKeys]
}
return allObjects, isTruncated, nil
}
func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p commonVersionsListingParams, session *data.ListSession) []*data.ExtendedNodeVersion {
var lastName string
var listRowStartIndex int
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
for eoi := range objOutCh {
name := eoi.NodeVersion.FilePath
if eoi.DirName != "" {
name = eoi.DirName
}
if lastName != name {
formVersionsListRow(allObjects, listRowStartIndex, session)
listRowStartIndex = len(allObjects)
allObjects = append(allObjects, eoi)
} else if eoi.DirName == "" {
allObjects = append(allObjects, eoi)
}
lastName = name
}
formVersionsListRow(allObjects, listRowStartIndex, session)
return allObjects
}
func formVersionsListRow(objects []*data.ExtendedNodeVersion, rowStartIndex int, session *data.ListSession) {
if len(objects) == 0 {
return
}
prevVersions := objects[rowStartIndex:]
sort.Slice(prevVersions, func(i, j int) bool {
return prevVersions[j].NodeVersion.Timestamp < prevVersions[i].NodeVersion.Timestamp // sort in reverse order to have last added first
})
prevVersions[0].IsLatest = len(session.Next) == 0 || session.Next[0].NodeVersion.FilePath != prevVersions[0].NodeVersion.FilePath
for _, version := range prevVersions[1:] {
version.IsLatest = false
}
}
func (n *layer) getListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams) (*data.ListSession, error) {
return n.getListVersionsSession(ctx, p.commonVersionsListingParams, true)
}
func (n *layer) getListAllVersionsSession(ctx context.Context, p commonVersionsListingParams) (*data.ListSession, error) {
return n.getListVersionsSession(ctx, p, false)
}
func (n *layer) getListVersionsSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (*data.ListSession, error) {
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, p.Bookmark)
session := n.cache.GetListSession(owner, cacheKey)
if session == nil {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
}
if session.Acquired.Swap(true) {
return n.initNewVersionsByPrefixSession(ctx, p, latestOnly)
}
// after reading next object from stream in session
// the current cache value already doesn't match with next token in cache key
n.cache.DeleteListSession(owner, cacheKey)
return session, nil
}
func (n *layer) initNewVersionsByPrefixSession(ctx context.Context, p commonVersionsListingParams, latestOnly bool) (session *data.ListSession, err error) {
session = &data.ListSession{NamesMap: make(map[string]struct{})}
session.Context, session.Cancel = context.WithCancel(context.Background())
if bd, err := middleware.GetBoxData(ctx); err == nil {
session.Context = middleware.SetBoxData(session.Context, bd)
}
session.Stream, err = n.treeService.InitVersionsByPrefixStream(session.Context, p.BktInfo, p.Prefix, latestOnly)
if err != nil {
return nil, err
}
return session, nil
}
func (n *layer) putListLatestVersionsSession(ctx context.Context, p commonLatestVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
if len(allObjects) <= p.MaxKeys {
return
}
var cacheKey cache.ListSessionKey
switch p.ListType {
case ListObjectsV1Type:
cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys-1].Name())
case ListObjectsV2Type:
cacheKey = cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, allObjects[p.MaxKeys].NodeVersion.OID.EncodeToString())
default:
// should never happen
panic("invalid list type")
}
session.Acquired.Store(false)
session.Next = []*data.ExtendedNodeVersion{allObjects[p.MaxKeys]}
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
}
func (n *layer) putListAllVersionsSession(ctx context.Context, p commonVersionsListingParams, session *data.ListSession, allObjects []*data.ExtendedNodeVersion) {
if len(allObjects) <= p.MaxKeys {
return
}
session.Acquired.Store(false)
session.Next = make([]*data.ExtendedNodeVersion, len(allObjects)-p.MaxKeys+1)
session.Next[0] = allObjects[p.MaxKeys-1]
for i, node := range allObjects[p.MaxKeys:] {
session.Next[i+1] = node
}
cacheKey := cache.CreateListSessionCacheKey(p.BktInfo.CID, p.Prefix, session.Next[0].NodeVersion.OID.EncodeToString())
n.cache.PutListSession(n.BearerOwner(ctx), cacheKey, session)
}
func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
errCh := make(chan error, 1)
existed := stream.NamesMap
if len(stream.Next) != 0 {
existed[continuationToken] = struct{}{}
}
limit := p.MaxKeys
if len(stream.Next) == 0 {
limit++
}
go func() {
var generated int
var err error
LOOP:
for err == nil {
node, err := stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
nodeExt := &data.ExtendedNodeVersion{
NodeVersion: node,
IsLatest: true,
DirName: tryDirectoryName(node, p.Prefix, p.Delimiter),
}
if shouldSkip(nodeExt, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- nodeExt:
generated++
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func nodesGeneratorVersions(ctx context.Context, p commonVersionsListingParams, stream *data.ListSession) (<-chan *data.ExtendedNodeVersion, <-chan error) {
nodeCh := make(chan *data.ExtendedNodeVersion, 1000)
errCh := make(chan error, 1)
existed := stream.NamesMap
delete(existed, continuationToken)
go func() {
var (
generated int
ind int
err error
lastName string
node *data.NodeVersion
nodeExt *data.ExtendedNodeVersion
)
LOOP:
for err == nil {
if ind < len(stream.Next) {
nodeExt = stream.Next[ind]
ind++
} else {
node, err = stream.Stream.Next(ctx)
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- fmt.Errorf("stream next: %w", err)
}
break LOOP
}
nodeExt = &data.ExtendedNodeVersion{
NodeVersion: node,
DirName: tryDirectoryName(node, p.Prefix, p.Delimiter),
}
}
if shouldSkipVersions(nodeExt, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- nodeExt:
generated++
if generated > p.MaxKeys && nodeExt.NodeVersion.FilePath != lastName {
break LOOP
}
lastName = nodeExt.NodeVersion.FilePath
}
}
close(nodeCh)
close(errCh)
}()
return nodeCh, errCh
}
func (n *layer) initWorkerPool(ctx context.Context, size int, p commonVersionsListingParams, input <-chan *data.ExtendedNodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ExtendedNodeVersion, size)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
if node.DirName != "" || node.NodeVersion.IsFilledExtra() {
select {
case <-ctx.Done():
case objCh <- node:
}
} else {
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.ExtendedNodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.BktInfo, node.NodeVersion); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
realSize, err := GetObjectSize(oi)
if err != nil {
reqLog.Debug(logs.FailedToGetRealObjectSize, zap.Error(err))
realSize = oi.Size
}
node.NodeVersion.FillExtra(&oi.Owner, &oi.Created, realSize)
select {
case <-ctx.Done():
case objCh <- node:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func shouldSkip(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
if node.NodeVersion.IsDeleteMarker {
return true
}
filePath := node.NodeVersion.FilePath
if node.DirName != "" {
filePath = node.DirName
}
if _, ok := existed[filePath]; ok {
return true
}
if filePath <= p.Marker {
return true
}
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.Bookmark != node.NodeVersion.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
}
}
existed[filePath] = struct{}{}
return false
}
func shouldSkipVersions(node *data.ExtendedNodeVersion, p commonVersionsListingParams, existed map[string]struct{}) bool {
filePath := node.NodeVersion.FilePath
if node.DirName != "" {
filePath = node.DirName
if _, ok := existed[filePath]; ok {
return true
}
}
if filePath < p.Marker {
return true
}
if p.Bookmark != "" {
if _, ok := existed[continuationToken]; !ok {
if p.Bookmark != node.NodeVersion.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
return true
}
}
existed[filePath] = struct{}{}
return false
}
func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion) (prefixes []string, objects []*data.ExtendedNodeVersion) {
for _, ov := range allObjects {
if ov.DirName != "" {
prefixes = append(prefixes, ov.DirName)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
owner := n.BearerOwner(ctx)
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
return extInfo.ObjectInfo
}
meta, err := n.objectHead(ctx, bktInfo, node.OID)
if err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
return nil
}
oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi
}
// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory empty string is returned.
// This function doesn't check if node has a prefix. It must do a caller.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
if len(delimiter) == 0 {
return ""
}
tail := strings.TrimPrefix(node.FilePath, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
return prefix + tail[:index+1]
}
return ""
}
func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) {
if p.KeyMarker == "" {
return objects, nil
}
for i, obj := range objects {
if obj.NodeVersion.FilePath == p.KeyMarker {
for j := i; j < len(objects); j++ {
if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath {
if p.VersionIDMarker == "" {
return objects[j:], nil
}
break
}
if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker {
return objects[j+1:], nil
}
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
} else if obj.NodeVersion.FilePath > p.KeyMarker {
if p.VersionIDMarker != "" {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
}
return objects[i:], nil
}
}
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
// that can be empty
return []*data.ExtendedNodeVersion{}, nil
}
func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) {
if len(objVersions) == 0 {
return nil, nil
}
var resVersion []*data.ExtendedNodeVersion
var resDelMarkVersions []*data.ExtendedNodeVersion
for _, version := range objVersions {
if version.NodeVersion.IsDeleteMarker {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resVersion = append(resVersion, version)
}
}
return resVersion, resDelMarkVersions
}

View file

@ -13,14 +13,11 @@ import (
"io"
"mime"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
@ -29,7 +26,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/minio/sio"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
@ -50,38 +46,6 @@ type (
bktInfo *data.BucketInfo
}
// ListObjectsParamsCommon contains common parameters for ListObjectsV1 and ListObjectsV2.
ListObjectsParamsCommon struct {
BktInfo *data.BucketInfo
Delimiter string
Encode string
MaxKeys int
Prefix string
}
// ListObjectsParamsV1 contains params for ListObjectsV1.
ListObjectsParamsV1 struct {
ListObjectsParamsCommon
Marker string
}
// ListObjectsParamsV2 contains params for ListObjectsV2.
ListObjectsParamsV2 struct {
ListObjectsParamsCommon
ContinuationToken string
StartAfter string
FetchOwner bool
}
allObjectParams struct {
Bucket *data.BucketInfo
Delimiter string
Prefix string
MaxKeys int
Marker string
ContinuationToken string
}
DeleteMarkerError struct {
ErrorCode apiErrors.ErrorCode
}
@ -332,13 +296,15 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
}
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
now := TimeNow(ctx)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: id,
ETag: hex.EncodeToString(hash),
FilePath: p.Object,
Size: size,
Size: p.Size,
Created: &now,
Owner: &n.gateOwner,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "",
@ -411,7 +377,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
return nil, err
}
if node.IsDeleteMarker() {
if node.IsDeleteMarker {
return nil, DeleteMarkerError{ErrorCode: apiErrors.ErrNoSuchKey}
}
@ -468,7 +434,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
return extObjInfo, nil
}
if foundVersion.IsDeleteMarker() {
if foundVersion.IsDeleteMarker {
return nil, DeleteMarkerError{ErrorCode: apiErrors.ErrMethodNotAllowed}
}
@ -532,61 +498,6 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
}
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var result ListObjectsInfoV1
prm := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.Marker,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextMarker = objects[len(objects)-1].Name
}
result.Prefixes, result.Objects = triageObjects(objects)
return &result, nil
}
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
var result ListObjectsInfoV2
prm := allObjectParams{
Bucket: p.BktInfo,
Delimiter: p.Delimiter,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Marker: p.StartAfter,
ContinuationToken: p.ContinuationToken,
}
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
if err != nil {
return nil, err
}
if next != nil {
result.IsTruncated = true
result.NextContinuationToken = next.ID.EncodeToString()
}
result.Prefixes, result.Objects = triageObjects(objects)
return &result, nil
}
type logWrapper struct {
log *zap.Logger
}
@ -595,310 +506,11 @@ func (l *logWrapper) Printf(format string, args ...interface{}) {
l.log.Info(fmt.Sprintf(format, args...))
}
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
if p.MaxKeys == 0 {
return nil, nil, nil
}
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true)
nodeVersions := n.cache.GetList(owner, cacheKey)
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix)
if err != nil {
return nil, nil, err
}
n.cache.PutList(owner, cacheKey, nodeVersions)
}
if len(nodeVersions) == 0 {
return nil, nil, nil
}
sort.Slice(nodeVersions, func(i, j int) bool {
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
})
poolCtx, cancel := context.WithCancel(ctx)
defer cancel()
objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions))
if err != nil {
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
}
objects = make([]*data.ObjectInfo, 0, p.MaxKeys)
for obj := range objOutCh {
objects = append(objects, obj)
}
sort.Slice(objects, func(i, j int) bool {
return objects[i].Name < objects[j].Name
})
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
objects = objects[:p.MaxKeys]
}
return
}
func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
nodeCh := make(chan *data.NodeVersion)
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
go func() {
var generated int
LOOP:
for _, node := range nodeVersions {
if shouldSkip(node, p, existed) {
continue
}
select {
case <-ctx.Done():
break LOOP
case nodeCh <- node:
generated++
if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
break LOOP
}
}
}
close(nodeCh)
}()
return nodeCh
}
func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
reqLog := n.reqLogger(ctx)
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
if err != nil {
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
}
objCh := make(chan *data.ObjectInfo)
go func() {
var wg sync.WaitGroup
LOOP:
for node := range input {
select {
case <-ctx.Done():
break LOOP
default:
}
// We have to make a copy of pointer to data.NodeVersion
// to get correct value in submitted task function.
func(node *data.NodeVersion) {
wg.Add(1)
err = pool.Submit(func() {
defer wg.Done()
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
if oi == nil {
// try to get object again
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
// do not process object which are definitely missing in object service
return
}
}
select {
case <-ctx.Done():
case objCh <- oi:
}
})
if err != nil {
wg.Done()
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}(node)
}
wg.Wait()
close(objCh)
pool.Release()
}()
return objCh, nil
}
func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
var err error
owner := n.BearerOwner(ctx)
cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix, false)
nodeVersions := n.cache.GetList(owner, cacheKey)
if nodeVersions == nil {
nodeVersions, err = n.treeService.GetAllVersionsByPrefix(ctx, bkt, prefix)
if err != nil {
return nil, fmt.Errorf("get all versions from tree service: %w", err)
}
n.cache.PutList(owner, cacheKey, nodeVersions)
}
return nodeVersions, nil
}
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {
nodeVersions, err := n.bucketNodeVersions(ctx, bkt, prefix)
if err != nil {
return nil, err
}
versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions))
for _, nodeVersion := range nodeVersions {
oi := &data.ObjectInfo{}
if nodeVersion.IsDeleteMarker() { // delete marker does not match any object in FrostFS
oi.ID = nodeVersion.OID
oi.Name = nodeVersion.FilePath
oi.Owner = nodeVersion.DeleteMarker.Owner
oi.Created = nodeVersion.DeleteMarker.Created
oi.IsDeleteMarker = true
} else {
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil {
continue
}
}
eoi := &data.ExtendedObjectInfo{
ObjectInfo: oi,
NodeVersion: nodeVersion,
}
objVersions, ok := versions[oi.Name]
if !ok {
objVersions = []*data.ExtendedObjectInfo{eoi}
} else if !oi.IsDir {
objVersions = append(objVersions, eoi)
}
versions[oi.Name] = objVersions
}
return versions, nil
}
func IsSystemHeader(key string) bool {
_, ok := api.SystemMetadata[key]
return ok || strings.HasPrefix(key, api.FrostFSSystemMetadataPrefix)
}
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
if node.IsDeleteMarker() {
return true
}
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
}
if _, ok := existed[filePath]; ok {
return true
}
if filePath <= p.Marker {
return true
}
if p.ContinuationToken != "" {
if _, ok := existed[continuationToken]; !ok {
if p.ContinuationToken != node.OID.EncodeToString() {
return true
}
existed[continuationToken] = struct{}{}
}
}
existed[filePath] = struct{}{}
return false
}
func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) {
for _, ov := range allObjects {
if ov.IsDir {
prefixes = append(prefixes, ov.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []string, objects []*data.ExtendedObjectInfo) {
for _, ov := range allObjects {
if ov.ObjectInfo.IsDir {
prefixes = append(prefixes, ov.ObjectInfo.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) {
if oiDir := tryDirectory(bktInfo, node, prefix, delimiter); oiDir != nil {
return oiDir
}
owner := n.BearerOwner(ctx)
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
return extInfo.ObjectInfo
}
meta, err := n.objectHead(ctx, bktInfo, node.OID)
if err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotFetchObjectMeta, zap.Error(err))
return nil
}
oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi
}
func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {
dirName := tryDirectoryName(node, prefix, delimiter)
if len(dirName) == 0 {
return nil
}
return &data.ObjectInfo{
ID: node.OID, // to use it as continuation token
CID: bktInfo.CID,
IsDir: true,
IsDeleteMarker: node.IsDeleteMarker(),
Bucket: bktInfo.Name,
Name: dirName,
}
}
// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory empty string is returned.
// This function doesn't check if node has a prefix. It must do a caller.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
if len(delimiter) == 0 {
return ""
}
tail := strings.TrimPrefix(node.FilePath, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
return prefix + tail[:index+1]
}
return ""
}
func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader {
if input == nil {
return nil

View file

@ -174,13 +174,13 @@ func (n *layer) getNodeVersion(ctx context.Context, objVersion *ObjectVersion) (
}
}
if err == nil && version.IsDeleteMarker() && !objVersion.NoErrorOnDeleteMarker {
if err == nil && version.IsDeleteMarker && !objVersion.NoErrorOnDeleteMarker {
return nil, fmt.Errorf("%w: found version is delete marker", s3errors.GetAPIError(s3errors.ErrNoSuchKey))
} else if errors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
}
if err == nil && version != nil && !version.IsDeleteMarker() {
if err == nil && version != nil && !version.IsDeleteMarker {
n.reqLogger(ctx).Debug(logs.GetTreeNode,
zap.Stringer("cid", objVersion.BktInfo.CID), zap.Stringer("oid", version.OID))
}

View file

@ -3,6 +3,7 @@ package layer
import (
"context"
"fmt"
"io"
"sort"
"strings"
@ -10,6 +11,21 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type VersionsByPrefixStreamMock struct {
result []*data.NodeVersion
offset int
}
func (s *VersionsByPrefixStreamMock) Next(context.Context) (*data.NodeVersion, error) {
if s.offset > len(s.result)-1 {
return nil, io.EOF
}
res := s.result[s.offset]
s.offset++
return res, nil
}
type TreeServiceMock struct {
settings map[string]*data.BucketSettings
versions map[string]map[string][]*data.NodeVersion
@ -171,7 +187,7 @@ func (t *TreeServiceMock) GetLatestVersion(_ context.Context, bktInfo *data.Buck
return nil, ErrNodeNotFound
}
func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
func (t *TreeServiceMock) InitVersionsByPrefixStream(_ context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
if !ok {
return nil, ErrNodeNotFound
@ -184,6 +200,11 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *
continue
}
if !latestOnly {
result = append(result, versions...)
continue
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].ID < versions[j].ID
})
@ -193,7 +214,9 @@ func (t *TreeServiceMock) GetLatestVersionsByPrefix(_ context.Context, bktInfo *
}
}
return result, nil
return &VersionsByPrefixStreamMock{
result: result,
}, nil
}
func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {

View file

@ -54,8 +54,7 @@ type TreeService interface {
GetVersions(ctx context.Context, bktInfo *data.BucketInfo, objectName string) ([]*data.NodeVersion, error)
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error)
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error

View file

@ -13,39 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type (
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
ListObjectsInfo struct {
Prefixes []string
Objects []*data.ObjectInfo
IsTruncated bool
}
// ListObjectsInfoV1 holds data which ListObjectsV1 returns.
ListObjectsInfoV1 struct {
ListObjectsInfo
NextMarker string
}
// ListObjectsInfoV2 holds data which ListObjectsV2 returns.
ListObjectsInfoV2 struct {
ListObjectsInfo
NextContinuationToken string
}
// ListObjectVersionsInfo stores info and list of objects versions.
ListObjectVersionsInfo struct {
CommonPrefixes []string
IsTruncated bool
KeyMarker string
NextKeyMarker string
NextVersionIDMarker string
Version []*data.ExtendedObjectInfo
DeleteMarker []*data.ExtendedObjectInfo
VersionIDMarker string
}
)
// PathSeparator is a path components separator string.
const PathSeparator = string(os.PathSeparator)
@ -81,9 +48,8 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI
objID, _ := meta.ID()
payloadChecksum, _ := meta.PayloadChecksum()
return &data.ObjectInfo{
ID: objID,
CID: bkt.CID,
IsDir: false,
ID: objID,
CID: bkt.CID,
Bucket: bkt.Name,
Name: filepathFromObject(meta),

View file

@ -1,51 +1,13 @@
package layer
import (
"encoding/hex"
"net/http"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/stretchr/testify/require"
)
var (
defaultTestCreated = time.Now()
defaultTestPayload = []byte("test object payload")
defaultTestPayloadLength = uint64(len(defaultTestPayload))
defaultTestContentType = http.DetectContentType(defaultTestPayload)
)
func newTestInfo(obj oid.ID, bkt *data.BucketInfo, name string, isDir bool) *data.ObjectInfo {
var hashSum checksum.Checksum
info := &data.ObjectInfo{
ID: obj,
Name: name,
Bucket: bkt.Name,
CID: bkt.CID,
Size: defaultTestPayloadLength,
ContentType: defaultTestContentType,
Created: time.Unix(defaultTestCreated.Unix(), 0),
Owner: bkt.Owner,
Headers: make(map[string]string),
HashSum: hex.EncodeToString(hashSum.Value()),
}
if isDir {
info.IsDir = true
info.Size = 0
info.ContentType = ""
info.Headers = nil
}
return info
}
func newTestNodeVersion(id oid.ID, name string) *data.NodeVersion {
return &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
@ -56,98 +18,84 @@ func newTestNodeVersion(id oid.ID, name string) *data.NodeVersion {
}
func TestTryDirectory(t *testing.T) {
var uid user.ID
var id oid.ID
var containerID cid.ID
bkt := &data.BucketInfo{
Name: "test-container",
CID: containerID,
Owner: uid,
Created: time.Now(),
}
cases := []struct {
name string
prefix string
result *data.ObjectInfo
result string
node *data.NodeVersion
delimiter string
}{
{
name: "small.jpg",
result: nil,
result: "",
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "small.jpg not matched prefix",
prefix: "big",
result: nil,
result: "",
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "small.jpg delimiter",
delimiter: "/",
result: nil,
result: "",
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "test/small.jpg",
result: nil,
result: "",
node: newTestNodeVersion(id, "test/small.jpg"),
},
{
name: "test/small.jpg with prefix and delimiter",
prefix: "test/",
delimiter: "/",
result: nil,
result: "",
node: newTestNodeVersion(id, "test/small.jpg"),
},
{
name: "a/b/small.jpg",
prefix: "a",
result: nil,
result: "",
node: newTestNodeVersion(id, "a/b/small.jpg"),
},
{
name: "a/b/small.jpg",
prefix: "a/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/", true),
result: "a/b/",
node: newTestNodeVersion(id, "a/b/small.jpg"),
},
{
name: "a/b/c/small.jpg",
prefix: "a/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/", true),
result: "a/b/",
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
},
{
name: "a/b/c/small.jpg",
prefix: "a/b/c/s",
delimiter: "/",
result: nil,
result: "",
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
},
{
name: "a/b/c/big.jpg",
prefix: "a/b/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/c/", true),
result: "a/b/c/",
node: newTestNodeVersion(id, "a/b/c/big.jpg"),
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
info := tryDirectory(bkt, tc.node, tc.prefix, tc.delimiter)
if tc.result != nil {
tc.result.Created = time.Time{}
tc.result.Owner = user.ID{}
}
require.Equal(t, tc.result, info)
dirName := tryDirectoryName(tc.node, tc.prefix, tc.delimiter)
require.Equal(t, tc.result, dirName)
})
}
}

View file

@ -1,109 +0,0 @@
package layer
import (
"context"
"sort"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
)
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
versions, err := n.getAllObjectsVersions(ctx, p.BktInfo, p.Prefix, p.Delimiter)
if err != nil {
return nil, err
}
sortedNames := make([]string, 0, len(versions))
for k := range versions {
sortedNames = append(sortedNames, k)
}
sort.Strings(sortedNames)
allObjects := make([]*data.ExtendedObjectInfo, 0, p.MaxKeys)
for _, name := range sortedNames {
sortedVersions := versions[name]
sort.Slice(sortedVersions, func(i, j int) bool {
return sortedVersions[j].NodeVersion.Timestamp < sortedVersions[i].NodeVersion.Timestamp // sort in reverse order
})
for i, version := range sortedVersions {
version.IsLatest = i == 0
allObjects = append(allObjects, version)
}
}
if allObjects, err = filterVersionsByMarker(allObjects, p); err != nil {
return nil, err
}
res := &ListObjectVersionsInfo{
KeyMarker: p.KeyMarker,
VersionIDMarker: p.VersionIDMarker,
}
res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects)
if len(allObjects) > p.MaxKeys {
res.IsTruncated = true
res.NextKeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID()
allObjects = allObjects[:p.MaxKeys]
}
res.Version, res.DeleteMarker = triageVersions(allObjects)
return res, nil
}
func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVersionsParams) ([]*data.ExtendedObjectInfo, error) {
if p.KeyMarker == "" {
return objects, nil
}
for i, obj := range objects {
if obj.ObjectInfo.Name == p.KeyMarker {
for j := i; j < len(objects); j++ {
if objects[j].ObjectInfo.Name != obj.ObjectInfo.Name {
if p.VersionIDMarker == "" {
return objects[j:], nil
}
break
}
if objects[j].ObjectInfo.VersionID() == p.VersionIDMarker {
return objects[j+1:], nil
}
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
} else if obj.ObjectInfo.Name > p.KeyMarker {
if p.VersionIDMarker != "" {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
}
return objects[i:], nil
}
}
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
// that can be empty
return []*data.ExtendedObjectInfo{}, nil
}
func triageVersions(objVersions []*data.ExtendedObjectInfo) ([]*data.ExtendedObjectInfo, []*data.ExtendedObjectInfo) {
if len(objVersions) == 0 {
return nil, nil
}
var resVersion []*data.ExtendedObjectInfo
var resDelMarkVersions []*data.ExtendedObjectInfo
for _, version := range objVersions {
if version.NodeVersion.IsDeleteMarker() {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resVersion = append(resVersion, version)
}
}
return resVersion, resDelMarkVersions
}

View file

@ -72,7 +72,7 @@ func (tc *testContext) deleteObject(objectName, versionID string, settings *data
}
}
func (tc *testContext) listObjectsV1() []*data.ObjectInfo {
func (tc *testContext) listObjectsV1() []*data.ExtendedNodeVersion {
res, err := tc.layer.ListObjectsV1(tc.ctx, &ListObjectsParamsV1{
ListObjectsParamsCommon: ListObjectsParamsCommon{
BktInfo: tc.bktInfo,
@ -83,7 +83,7 @@ func (tc *testContext) listObjectsV1() []*data.ObjectInfo {
return res.Objects
}
func (tc *testContext) listObjectsV2() []*data.ObjectInfo {
func (tc *testContext) listObjectsV2() []*data.ExtendedNodeVersion {
res, err := tc.layer.ListObjectsV2(tc.ctx, &ListObjectsParamsV2{
ListObjectsParamsCommon: ListObjectsParamsCommon{
BktInfo: tc.bktInfo,
@ -168,10 +168,11 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
layerCfg := &Config{
Caches: config,
Cache: NewCache(config),
AnonKey: AnonymousKey{Key: key},
TreeService: NewTreeService(),
Features: &FeatureSettingsMock{},
GateOwner: owner,
}
return &testContext{
@ -288,9 +289,10 @@ func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
tc.getObject(tc.obj, "", true)
versions := tc.listVersions()
require.Len(t, versions.DeleteMarker, 1)
for _, ver := range versions.DeleteMarker {
if ver.IsLatest {
tc.deleteObject(tc.obj, ver.ObjectInfo.VersionID(), settings)
tc.deleteObject(tc.obj, ver.NodeVersion.OID.EncodeToString(), settings)
}
}
@ -322,112 +324,112 @@ func TestFilterVersionsByMarker(t *testing.T) {
for _, tc := range []struct {
name string
objects []*data.ExtendedObjectInfo
objects []*data.ExtendedNodeVersion
params *ListObjectVersionsParams
expected []*data.ExtendedObjectInfo
expected []*data.ExtendedNodeVersion
error bool
}{
{
name: "missed key marker",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "", VersionIDMarker: "dummy"},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
},
{
name: "last version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[1].EncodeToString()},
expected: []*data.ExtendedObjectInfo{},
expected: []*data.ExtendedNodeVersion{},
},
{
name: "same name, different versions",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
},
{
name: "different name, different versions",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
},
{
name: "not matched name alphabetically less",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: ""},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
},
{
name: "not matched name alphabetically less with dummy version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not matched name alphabetically greater",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj2", VersionIDMarker: testOIDs[2].EncodeToString()},
expected: []*data.ExtendedObjectInfo{},
expected: []*data.ExtendedNodeVersion{},
},
{
name: "not found version id",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not found version id, obj last",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
error: true,
},
{
name: "not found version id, obj last",
objects: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
objects: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
},
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: ""},
expected: []*data.ExtendedObjectInfo{
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
expected: []*data.ExtendedNodeVersion{
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
},
},
} {

View file

@ -165,7 +165,7 @@ func (a *App) initLayer(ctx context.Context) {
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
layerCfg := &layer.Config{
Caches: getCacheOptions(a.cfg, a.log),
Cache: layer.NewCache(getCacheOptions(a.cfg, a.log)),
AnonKey: layer.AnonymousKey{
Key: randomKey,
},
@ -906,6 +906,9 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
cacheCfg.SessionList.Lifetime = fetchCacheLifetime(v, l, cfgSessionListCacheLifetime, cacheCfg.SessionList.Lifetime)
cacheCfg.SessionList.Size = fetchCacheSize(v, l, cfgSessionListCacheSize, cacheCfg.SessionList.Size)
cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)

View file

@ -98,6 +98,8 @@ const ( // Settings.
cfgObjectsCacheSize = "cache.objects.size"
cfgListObjectsCacheLifetime = "cache.list.lifetime"
cfgListObjectsCacheSize = "cache.list.size"
cfgSessionListCacheLifetime = "cache.list_session.lifetime"
cfgSessionListCacheSize = "cache.list_session.size"
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
cfgBucketsCacheSize = "cache.buckets.size"
cfgNamesCacheLifetime = "cache.names.lifetime"

View file

@ -82,6 +82,9 @@ S3_GW_CACHE_OBJECTS_SIZE=1000000
# Cache which keeps lists of objects in buckets
S3_GW_CACHE_LIST_LIFETIME=1m
S3_GW_CACHE_LIST_SIZE=100000
# Cache which keeps listing session
S3_GW_CACHE_LIST_SESSION_LIFETIME=1m
S3_GW_CACHE_LIST_SESSION_SIZE=100
# Cache which contains mapping of bucket name to bucket info
S3_GW_CACHE_BUCKETS_LIFETIME=1m
S3_GW_CACHE_BUCKETS_SIZE=1000
@ -210,4 +213,3 @@ S3_GW_PROXY_CONTRACT=proxy.frostfs
# Namespaces configuration
S3_GW_NAMESPACES_CONFIG=namespaces.json

View file

@ -100,6 +100,10 @@ cache:
list:
lifetime: 1m
size: 100
# Cache which keeps listing sessions
list_session:
lifetime: 1m
size: 100
# Cache which contains mapping of nice name to object addresses
names:
lifetime: 1m

View file

@ -365,7 +365,7 @@ control:
- 035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11
- 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
grpc:
endpoint: localhost:8083
endpoint: localhost:8083
```
| Parameter | Type | SIGHUP reload | Default value | Description |
@ -396,6 +396,9 @@ cache:
list:
lifetime: 1m
size: 100
list_session:
lifetime: 1m
size: 100
names:
lifetime: 1m
size: 1000
@ -420,6 +423,7 @@ cache:
|-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
| `objects` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 1000000` | Cache for objects (FrostFS headers). |
| `list` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100000` | Cache which keeps lists of objects in buckets. |
| `list_session` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 100` | Cache which keeps listing session. |
| `names` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 10000` | Cache which contains mapping of nice name to object addresses. |
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
| `system` | [Cache config](#cache-subsection) | `lifetime: 5m`<br>`size: 10000` | Cache for system objects in a bucket: bucket settings, notification configuration etc. |
@ -639,7 +643,7 @@ FrostfsID contract configuration. To enable this functionality the `rpc_endpoint
```yaml
frostfsid:
contract: frostfsid.frostfs
validation:
validation:
enabled: false
```
@ -689,10 +693,10 @@ namespaces:
|-----------|----------|---------------|---------------|-----------------------------------------------------|
| `config` | `string` | yes | | Path to json file with config value for namespaces. |
## `namespaces.config` subsection
## `namespaces.config` subsection
Example of `namespaces.json`.
Note that config values from `namespaces.json` can override config values for default namespaces
Note that config values from `namespaces.json` can override config values for default namespaces
(value for which are fetched from regular config value e.g. [placement-policy](#placement_policy-section)).
To override config values for default namespaces use namespace names that are provided in `kludge.default_namespaces`.
@ -701,7 +705,7 @@ To override config values for default namespaces use namespace names that are pr
"namespaces": {
"namespace1": {
"location_constraints": {
"default": "REP 3",
"default": "REP 3",
"test": "{\"replicas\":[{\"count\":1,\"selector\":\"\"}],\"containerBackupFactor\":0,\"selectors\":[],\"filters\":[],\"unique\":false}"
},
"copies_numbers": {

View file

@ -124,6 +124,65 @@ func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo,
return subtree, nil
}
type SubTreeStreamImpl struct {
r *treepool.SubTreeReader
buffer []*grpcService.GetSubTreeResponse_Body
eof bool
index int
ln int
}
const bufSize = 1000
func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
if s.index != -1 {
node := s.buffer[s.index]
s.index++
if s.index >= s.ln {
s.index = -1
}
return GetSubTreeResponseBodyWrapper{response: node}, nil
}
if s.eof {
return nil, io.EOF
}
var err error
s.ln, err = s.r.Read(s.buffer)
if err != nil {
if err != io.EOF {
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", handleError(err))
}
s.eof = true
}
if s.ln > 0 {
s.index = 0
}
return s.Next()
}
func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (tree.SubTreeStream, error) {
poolPrm := treepool.GetSubTreeParams{
CID: bktInfo.CID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
BearerToken: getBearer(ctx, bktInfo),
}
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
return &SubTreeStreamImpl{
r: subTreeReader,
buffer: make([]*grpcService.GetSubTreeResponse_Body, bufSize),
index: -1,
}, nil
}
func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
nodeID, err := w.p.AddNode(ctx, treepool.AddNodeParams{
CID: bktInfo.CID,

View file

@ -80,8 +80,6 @@ const (
CouldntDeletePart = "couldn't delete part" // Warn in ../../api/layer/multipart_upload.go
PartDetails = "part details" // Debug in ../../api/layer/multipart_upload.go
GetObject = "get object" // Debug in ../../api/layer/layer.go
ObjectAlreadyRemoved = "object already removed" // Debug in ../../api/layer/layer.go
ObjectNotFound = "object not found" // Debug in ../../api/layer/layer.go
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
PutObject = "put object" // Debug in ../../api/layer/object.go
@ -95,6 +93,7 @@ const (
CouldntCacheAccessControlOperation = "couldn't cache access control operation" // Warn in ../../api/layer/cache.go
CouldntPutObjAddressToNameCache = "couldn't put obj address to name cache" // Warn in ../../api/layer/cache.go
CouldntCacheListOfObjects = "couldn't cache list of objects" // Warn in ../../api/layer/cache.go
CouldntCacheListSession = "couldn't cache list session" // Warn in ../../api/layer/cache.go
CouldntCacheTags = "couldn't cache tags" // Error in ../../api/layer/cache.go
CouldntCacheLockInfo = "couldn't cache lock info" // Error in ../../api/layer/cache.go
CouldntCacheBucketSettings = "couldn't cache bucket settings" // Warn in ../../api/layer/cache.go
@ -137,4 +136,7 @@ const (
ControlAPIGetPolicy = "get policy request"
ControlAPIListPolicies = "list policies request"
PolicyValidationFailed = "policy validation failed"
ParseTreeNode = "parse tree node"
FailedToGetRealObjectSize = "failed to get real object size"
CouldntDeleteObjectFromStorageContinueDeleting = "couldn't delete object from storage, continue deleting from tree"
)

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sort"
"strconv"
"strings"
@ -16,6 +17,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
type (
@ -29,12 +31,17 @@ type (
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]NodeResponse, error)
GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error)
AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error
RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error
}
SubTreeStream interface {
Next() (NodeResponse, error)
}
treeNode struct {
ID uint64
ParentID uint64
@ -191,37 +198,34 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
version := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
ID: treeNode.ID,
ParenID: treeNode.ParentID,
OID: treeNode.ObjID,
Timestamp: treeNode.TimeStamp,
ETag: eTag,
MD5: md5,
Size: treeNode.Size,
FilePath: filePath,
ID: treeNode.ID,
ParenID: treeNode.ParentID,
OID: treeNode.ObjID,
Timestamp: treeNode.TimeStamp,
ETag: eTag,
MD5: md5,
Size: treeNode.Size,
FilePath: filePath,
IsDeleteMarker: isDeleteMarker,
},
IsUnversioned: isUnversioned,
IsCombined: isCombined,
}
if isDeleteMarker {
var created time.Time
if createdStr, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err == nil {
created = time.UnixMilli(utcMilli)
}
}
var owner user.ID
if ownerStr, ok := treeNode.Get(ownerKV); ok {
_ = owner.DecodeString(ownerStr)
}
version.DeleteMarker = &data.DeleteMarkerInfo{
Created: created,
Owner: owner,
if createdStr, ok := treeNode.Get(createdKV); ok {
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err == nil {
created := time.UnixMilli(utcMilli)
version.Created = &created
}
}
if ownerStr, ok := treeNode.Get(ownerKV); ok {
var owner user.ID
if err := owner.DecodeString(ownerStr); err == nil {
version.Owner = &owner
}
}
return version
}
@ -635,8 +639,217 @@ func pathFromName(objectName string) []string {
return strings.Split(objectName, separator)
}
func (c *Tree) GetLatestVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
return c.getVersionsByPrefix(ctx, bktInfo, prefix, true)
type DummySubTreeStream struct {
data NodeResponse
read bool
}
func (s *DummySubTreeStream) Next() (NodeResponse, error) {
if s.read {
return nil, io.EOF
}
s.read = true
return s.data, nil
}
type VersionsByPrefixStreamImpl struct {
ctx context.Context
rootID uint64
intermediateRootID uint64
service ServiceClient
bktInfo *data.BucketInfo
mainStream SubTreeStream
innerStream SubTreeStream
headPrefix string
tailPrefix string
namesMap map[uint64]string
ended bool
latestOnly bool
currentLatest *data.NodeVersion
log *zap.Logger
}
func (s *VersionsByPrefixStreamImpl) Next(context.Context) (*data.NodeVersion, error) {
if s.ended {
return nil, io.EOF
}
for {
if s.innerStream == nil {
node, err := s.getNodeFromMainStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.ended = true
if s.currentLatest != nil {
return s.currentLatest, nil
}
}
return nil, fmt.Errorf("get node from main stream: %w", err)
}
if err = s.initInnerStream(node); err != nil {
return nil, fmt.Errorf("init inner stream: %w", err)
}
}
nodeVersion, err := s.getNodeVersionFromInnerStream()
if err != nil {
if errors.Is(err, io.EOF) {
s.innerStream = nil
maps.Clear(s.namesMap)
if s.currentLatest != nil && s.currentLatest.ID != s.intermediateRootID {
return s.currentLatest, nil
}
continue
}
return nil, fmt.Errorf("inner stream: %w", err)
}
return nodeVersion, nil
}
}
func (s *VersionsByPrefixStreamImpl) getNodeFromMainStream() (NodeResponse, error) {
for {
node, err := s.mainStream.Next()
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, io.EOF
}
return nil, fmt.Errorf("main stream next: %w", err)
}
if node.GetNodeID() != s.rootID && strings.HasPrefix(getFilename(node), s.tailPrefix) {
return node, nil
}
}
}
func (s *VersionsByPrefixStreamImpl) initInnerStream(node NodeResponse) (err error) {
if node.GetParentID() == s.rootID {
s.intermediateRootID = node.GetNodeID()
}
if isIntermediate(node) {
s.innerStream, err = s.service.GetSubTreeStream(s.ctx, s.bktInfo, versionTree, node.GetNodeID(), maxGetSubTreeDepth)
if err != nil {
return fmt.Errorf("get sub tree node from main stream: %w", err)
}
} else {
s.innerStream = &DummySubTreeStream{data: node}
}
return nil
}
func (s *VersionsByPrefixStreamImpl) getNodeVersionFromInnerStream() (*data.NodeVersion, error) {
for {
node, err := s.innerStream.Next()
if err != nil {
return nil, fmt.Errorf("inner stream: %w", err)
}
nodeVersion, skip, err := s.parseNodeResponse(node)
if err != nil {
return nil, err
}
if skip {
continue
}
if s.latestOnly {
if s.currentLatest == nil {
s.currentLatest = nodeVersion
continue
}
if s.currentLatest.FilePath != nodeVersion.FilePath {
res := s.currentLatest
s.currentLatest = nodeVersion
return res, nil
}
if s.currentLatest.Timestamp < nodeVersion.Timestamp {
s.currentLatest = nodeVersion
}
continue
}
return nodeVersion, nil
}
}
func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *data.NodeVersion, skip bool, err error) {
trNode, fileName, err := parseTreeNode(node)
if err != nil {
s.log.Debug(logs.ParseTreeNode, zap.Error(err))
return nil, true, nil
}
var parentPrefix string
if s.headPrefix != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(s.headPrefix, separator) + separator // To avoid 'foo//bar'
}
var filepath string
if trNode.ID != s.intermediateRootID {
if filepath, err = formFilePath(node, fileName, s.namesMap); err != nil {
return nil, false, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
s.namesMap[trNode.ID] = filepath
}
if trNode.ObjID.Equals(oid.ID{}) { // The node can be intermediate, but we still want to update namesMap
return nil, true, nil
}
return newNodeVersionFromTreeNode(filepath, trNode), false, nil
}
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
mainStream, tailPrefix, rootID, err := c.getSubTreeByPrefixMainStream(ctx, bktInfo, versionTree, prefix)
if err != nil {
if errors.Is(err, io.EOF) {
return &VersionsByPrefixStreamImpl{ended: true}, nil
}
return nil, err
}
return &VersionsByPrefixStreamImpl{
ctx: ctx,
namesMap: map[uint64]string{},
rootID: rootID,
service: c.service,
bktInfo: bktInfo,
mainStream: mainStream,
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
tailPrefix: tailPrefix,
latestOnly: latestOnly,
log: c.log,
}, nil
}
func (c *Tree) getSubTreeByPrefixMainStream(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (SubTreeStream, string, uint64, error) {
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, treeID, prefix)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", 0, io.EOF
}
return nil, "", 0, err
}
subTree, err := c.service.GetSubTreeStream(ctx, bktInfo, treeID, rootID, 2)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", 0, io.EOF
}
return nil, "", 0, err
}
return subTree, tailPrefix, rootID, nil
}
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) (uint64, string, error) {
@ -757,65 +970,6 @@ func isIntermediate(node NodeResponse) bool {
return node.GetMeta()[0].GetKey() == FileNameKey
}
func (c *Tree) getSubTreeVersions(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, parentFilePath string, latestOnly bool) ([]*data.NodeVersion, error) {
subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, nodeID, maxGetSubTreeDepth)
if err != nil {
return nil, err
}
var parentPrefix string
if parentFilePath != "" { // The root of subTree can also have a parent
parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
}
var emptyOID oid.ID
var filepath string
namesMap := make(map[uint64]string, len(subTree))
versions := make(map[string][]*data.NodeVersion, len(subTree))
for i, node := range subTree {
treeNode, fileName, err := parseTreeNode(node)
if err != nil {
continue
}
if i != 0 {
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
return nil, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
namesMap[treeNode.ID] = filepath
}
if treeNode.ObjID.Equals(emptyOID) { // The node can be intermediate but we still want to update namesMap
continue
}
key := formLatestNodeKey(node.GetParentID(), fileName)
versionNodes, ok := versions[key]
if !ok {
versionNodes = []*data.NodeVersion{newNodeVersionFromTreeNode(filepath, treeNode)}
} else if !latestOnly {
versionNodes = append(versionNodes, newNodeVersionFromTreeNode(filepath, treeNode))
} else if versionNodes[0].Timestamp <= treeNode.TimeStamp {
versionNodes[0] = newNodeVersionFromTreeNode(filepath, treeNode)
}
versions[key] = versionNodes
}
result := make([]*data.NodeVersion, 0, len(versions)) // consider use len(subTree)
for _, version := range versions {
if latestOnly && version[0].IsDeleteMarker() {
continue
}
result = append(result, version...)
}
return result, nil
}
func formFilePath(node NodeResponse, fileName string, namesMap map[uint64]string) (string, error) {
parentPath, ok := namesMap[node.GetParentID()]
if !ok {
@ -846,28 +1000,6 @@ func formLatestNodeKey(parentID uint64, fileName string) string {
return strconv.FormatUint(parentID, 10) + "." + fileName
}
func (c *Tree) GetAllVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.NodeVersion, error) {
return c.getVersionsByPrefix(ctx, bktInfo, prefix, false)
}
func (c *Tree) getVersionsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]*data.NodeVersion, error) {
prefixNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, versionTree, prefix, latestOnly)
if err != nil {
return nil, err
}
var result []*data.NodeVersion
for _, node := range prefixNodes {
versions, err := c.getSubTreeVersions(ctx, bktInfo, node.GetNodeID(), headPrefix, latestOnly)
if err != nil {
return nil, err
}
result = append(result, versions...)
}
return result, nil
}
func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, filepath string) (*data.NodeVersion, error) {
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
}
@ -1156,6 +1288,8 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
meta := map[string]string{
oidKV: version.OID.EncodeToString(),
FileNameKey: path[len(path)-1],
ownerKV: version.Owner.EncodeToString(),
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
}
if version.Size > 0 {
@ -1168,10 +1302,8 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
meta[md5KV] = version.MD5
}
if version.IsDeleteMarker() {
if version.IsDeleteMarker {
meta[isDeleteMarkerKV] = "true"
meta[ownerKV] = version.DeleteMarker.Owner.EncodeToString()
meta[createdKV] = strconv.FormatInt(version.DeleteMarker.Created.UTC().UnixMilli(), 10)
}
if version.IsCombined {

View file

@ -3,6 +3,7 @@ package tree
import (
"context"
"fmt"
"io"
"sort"
"time"
@ -222,14 +223,58 @@ func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.Bucket
return nil, ErrNodeNotFound
}
sortNode(tr.treeData)
node := tr.treeData.getNode(rootID)
if node == nil {
return nil, ErrNodeNotFound
}
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
return node.listNodes(nil, depth-1), nil
}
type SubTreeStreamMemoryImpl struct {
res []NodeResponse
offset int
err error
}
func (s *SubTreeStreamMemoryImpl) Next() (NodeResponse, error) {
if s.err != nil {
return nil, s.err
}
if s.offset > len(s.res)-1 {
return nil, io.EOF
}
s.offset++
return s.res[s.offset-1], nil
}
func (c *ServiceClientMemory) GetSubTreeStream(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (SubTreeStream, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return &SubTreeStreamMemoryImpl{err: ErrNodeNotFound}, nil
}
tr, ok := cnr.trees[treeID]
if !ok {
return nil, ErrNodeNotFound
}
node := tr.treeData.getNode(rootID)
if node == nil {
return nil, ErrNodeNotFound
}
sortNode(tr.treeData)
return &SubTreeStreamMemoryImpl{
res: node.listNodes(nil, depth-1),
offset: 0,
}, nil
}
func newContainerInfo(bktInfo *data.BucketInfo, treeID string) containerInfo {
return containerInfo{
bkt: bktInfo,
@ -257,7 +302,11 @@ func newMemoryTree() memoryTree {
}
}
func (c *ServiceClientMemory) AddNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
func (c *ServiceClientMemory) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
return c.AddNodeBase(ctx, bktInfo, treeID, parent, meta, true)
}
func (c *ServiceClientMemory) AddNodeBase(_ context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string, needSort bool) (uint64, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
cnr = newContainerInfo(bktInfo, treeID)
@ -289,6 +338,9 @@ func (c *ServiceClientMemory) AddNode(_ context.Context, bktInfo *data.BucketInf
}
parentNode.children = append(parentNode.children, tn)
if needSort {
sortNodes(parentNode.children)
}
cnr.trees[treeID] = tr
return newID, nil
@ -361,6 +413,24 @@ func (c *ServiceClientMemory) MoveNode(_ context.Context, bktInfo *data.BucketIn
return nil
}
func sortNode(node *treeNodeMemory) {
if node == nil {
return
}
sortNodes(node.children)
for _, child := range node.children {
sortNode(child)
}
}
func sortNodes(list []*treeNodeMemory) {
sort.Slice(list, func(i, j int) bool {
return list[i].data.getValue(FileNameKey) < list[j].data.getValue(FileNameKey)
})
}
func (c *ServiceClientMemory) RemoveNode(_ context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {

View file

@ -3,10 +3,12 @@ package tree
import (
"context"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@ -141,12 +143,15 @@ func TestTreeServiceAddVersion(t *testing.T) {
CID: cidtest.ID(),
}
now := time.Now()
version := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: oidtest.ID(),
Size: 10,
ETag: "etag",
FilePath: "path/to/version",
Owner: usertest.ID(),
Created: &now,
},
IsUnversioned: true,
}