From 037e972424a2a8437d91a6942704f2817dad393f Mon Sep 17 00:00:00 2001 From: Marina Biryukova Date: Fri, 22 Nov 2024 12:32:35 +0300 Subject: [PATCH] [#559] Remove multipart objects using tombstones Signed-off-by: Marina Biryukova --- api/handler/handlers_test.go | 7 +++ api/layer/frostfs/frostfs.go | 7 +++ api/layer/frostfs_mock.go | 66 ++++++++++++++++++++- api/layer/layer.go | 108 ++++++++++++++++++++++++++++------ api/layer/multipart_upload.go | 39 ++++++++++-- cmd/s3-gw/app.go | 30 ++++++++++ cmd/s3-gw/app_settings.go | 39 +++++++++++- config/config.env | 6 ++ config/config.yaml | 7 +++ docs/configuration.md | 7 +++ internal/frostfs/frostfs.go | 6 ++ internal/logs/logs.go | 3 + 12 files changed, 302 insertions(+), 23 deletions(-) diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 628bba7..e5a224b 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -32,6 +32,7 @@ import ( "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -184,6 +185,11 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas features := &layer.FeatureSettingsMock{} + pool, err := ants.NewPool(1) + if err != nil { + return nil, err + } + layerCfg := &layer.Config{ Cache: layer.NewCache(cacheCfg), AnonKey: layer.AnonymousKey{Key: key}, @@ -191,6 +197,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas TreeService: treeMock, Features: features, GateOwner: owner, + WorkerPool: pool, } var pp netmap.PlacementPolicy diff --git a/api/layer/frostfs/frostfs.go b/api/layer/frostfs/frostfs.go index f813d0b..c51e58b 100644 --- a/api/layer/frostfs/frostfs.go +++ b/api/layer/frostfs/frostfs.go @@ -13,6 +13,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" @@ -170,6 +171,9 @@ type PrmObjectCreate struct { // Sets max buffer size to read payload. BufferMaxSize uint64 + + // Object type (optional). + Type object.Type } // CreateObjectResult is a result parameter of FrostFS.CreateObject operation. @@ -344,4 +348,7 @@ type FrostFS interface { // NetworkInfo returns parameters of FrostFS network. NetworkInfo(context.Context) (netmap.NetworkInfo, error) + + // Relations returns implementation of relations.Relations interface. + Relations() relations.Relations } diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index 1c86d24..4b316b2 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -24,6 +24,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" @@ -35,6 +36,14 @@ type FeatureSettingsMock struct { md5Enabled bool } +func (k *FeatureSettingsMock) TombstoneLifetime() uint64 { + return 1 +} + +func (k *FeatureSettingsMock) TombstoneMembersSize() int { + return 2 +} + func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 { return 0 } @@ -262,7 +271,36 @@ func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRang return io.NopCloser(bytes.NewReader(payload)), nil } -func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) { +func (t *TestFrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) { + if prm.Type == object.TypeTombstone { + payload, err := io.ReadAll(prm.Payload) + if err != nil { + return nil, err + } + + var tomb object.Tombstone + err = tomb.Unmarshal(payload) + if err != nil { + return nil, err + } + + for _, objID := range tomb.Members() { + prmDelete := frostfs.PrmObjectDelete{ + PrmAuth: prm.PrmAuth, + Container: prm.Container, + Object: objID, + } + + if err = t.DeleteObject(ctx, prmDelete); err != nil { + return nil, err + } + } + + return &frostfs.CreateObjectResult{ + CreationEpoch: t.currentEpoch, + }, nil + } + b := make([]byte, 32) if _, err := io.ReadFull(rand.Reader, b); err != nil { return nil, err @@ -459,6 +497,10 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc return newID, nil } +func (t *TestFrostFS) Relations() relations.Relations { + return &RelationsMock{} +} + func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error { list, ok := t.chains[prm.ContainerID.EncodeToString()] if !ok { @@ -499,3 +541,25 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool { } return false } + +type RelationsMock struct{} + +func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) { + return nil, relations.ErrNoSplitInfo +} + +func (r *RelationsMock) ListChildrenByLinker(context.Context, cid.ID, oid.ID, relations.Tokens) ([]oid.ID, error) { + return nil, nil +} + +func (r *RelationsMock) GetLeftSibling(context.Context, cid.ID, oid.ID, relations.Tokens) (oid.ID, error) { + return oid.ID{}, nil +} + +func (r *RelationsMock) FindSiblingBySplitID(context.Context, cid.ID, *object.SplitID, relations.Tokens) ([]oid.ID, error) { + return nil, nil +} + +func (r *RelationsMock) FindSiblingByParentID(_ context.Context, _ cid.ID, _ oid.ID, _ relations.Tokens) ([]oid.ID, error) { + return nil, nil +} diff --git a/api/layer/layer.go b/api/layer/layer.go index 12d064c..afd934f 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -1,6 +1,7 @@ package layer import ( + "bytes" "context" "crypto/ecdsa" "crypto/rand" @@ -13,8 +14,10 @@ import ( "sort" "strconv" "strings" + "sync" "time" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" @@ -27,10 +30,13 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -44,6 +50,8 @@ type ( BufferMaxSizeForPut() uint64 MD5Enabled() bool FormContainerZone(ns string) string + TombstoneMembersSize() int + TombstoneLifetime() uint64 } Layer struct { @@ -58,6 +66,7 @@ type ( gateKey *keys.PrivateKey corsCnrInfo *data.BucketInfo lifecycleCnrInfo *data.BucketInfo + workerPool *ants.Pool } Config struct { @@ -71,6 +80,7 @@ type ( GateKey *keys.PrivateKey CORSCnrInfo *data.BucketInfo LifecycleCnrInfo *data.BucketInfo + WorkerPool *ants.Pool } // AnonymousKey contains data for anonymous requests. @@ -249,6 +259,7 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer { gateKey: config.GateKey, corsCnrInfo: config.CORSCnrInfo, lifecycleCnrInfo: config.LifecycleCnrInfo, + workerPool: config.WorkerPool, } } @@ -557,7 +568,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings } for _, nodeVersion := range nodeVersions { - if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { + if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil { if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { return obj } @@ -596,7 +607,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings } if !nodeVersion.IsDeleteMarker { - if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { + if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil { if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { return obj } @@ -732,19 +743,19 @@ func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob return n.getNodeVersion(ctx, objVersion) } -func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { +func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject, networkInfo netmap.NetworkInfo) (string, error) { if nodeVersion.IsDeleteMarker { return obj.VersionID, nil } if nodeVersion.IsCombined { - return "", n.removeCombinedObject(ctx, bkt, nodeVersion) + return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo) } return "", n.objectDelete(ctx, bkt, nodeVersion.OID) } -func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error { +func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, networkInfo netmap.NetworkInfo) error { combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID) if err != nil { return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err) @@ -755,20 +766,83 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, return fmt.Errorf("unmarshal combined object parts: %w", err) } - for _, part := range parts { - if err = n.objectDelete(ctx, bkt, part.OID); err == nil { - continue - } - - if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { - return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err) - } - - n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()), - zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err)) + tokens := prepareTokensParameter(ctx, bkt.Owner) + oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens) + if err != nil { + n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), + zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err)) } - return n.objectDelete(ctx, bkt, nodeVersion.OID) + members := append(oids, nodeVersion.OID) + for _, part := range parts { + oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens) + if err != nil { + n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), + zap.String("oid", part.OID.EncodeToString()), zap.Error(err)) + } + + members = append(members, append(oids, part.OID)...) + } + + n.putTombstones(ctx, bkt, networkInfo, members) + return nil +} + +func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) { + var wg sync.WaitGroup + tombstoneMembersSize := n.features.TombstoneMembersSize() + tombstoneLifetime := n.features.TombstoneLifetime() + + for i := 0; i < len(members); i += tombstoneMembersSize { + end := tombstoneMembersSize * (i + 1) + if end > len(members) { + end = len(members) + } + n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg) + } + + wg.Wait() +} + +func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) { + tomb := object.NewTombstone() + tomb.SetExpirationEpoch(expEpoch) + tomb.SetMembers(members) + + wg.Add(1) + err := n.workerPool.Submit(func() { + defer wg.Done() + + if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil { + n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err)) + } + }) + if err != nil { + wg.Done() + n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + } +} + +func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error { + payload, err := tomb.Marshal() + if err != nil { + return fmt.Errorf("marshal tombstone: %w", err) + } + + prm := frostfs.PrmObjectCreate{ + Container: bktInfo.CID, + Attributes: [][2]string{{objectV2.SysAttributeExpEpoch, strconv.FormatUint(tomb.ExpirationEpoch(), 10)}}, + Payload: bytes.NewReader(payload), + CreationTime: TimeNow(ctx), + ClientCut: n.features.ClientCut(), + WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled, + BufferMaxSize: n.features.BufferMaxSizeForPut(), + Type: object.TypeTombstone, + } + n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) + + _, err = n.frostFS.CreateObject(ctx, prm) + return err } // DeleteObjects from the storage. diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index ed7612c..b05afde 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -21,8 +21,12 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/minio/sio" "go.uber.org/zap" @@ -559,16 +563,43 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e return err } + networkInfo, err := n.GetNetworkInfo(ctx) + if err != nil { + return fmt.Errorf("get network info: %w", err) + } + + n.deleteUploadedParts(ctx, p.Bkt, parts, networkInfo) + + return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo) +} + +func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, parts PartsInfo, networkInfo netmap.NetworkInfo) { + members := make([]oid.ID, 0) + tokens := prepareTokensParameter(ctx, bkt.Owner) for _, infos := range parts { for _, info := range infos { - if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { - n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), - zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) + oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, info.OID, tokens) + if err != nil { + n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), + zap.String("oid", info.OID.EncodeToString()), zap.Error(err)) } + members = append(members, append(oids, info.OID)...) } } - return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo) + n.putTombstones(ctx, bkt, networkInfo, members) +} + +func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens { + tokens := relations.Tokens{} + + if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil { + if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) { + tokens.Bearer = bd.Gate.BearerToken + } + } + + return tokens } func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 1ac2b30..599f7a9 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -50,6 +50,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/go-chi/chi/v5/middleware" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -99,6 +100,7 @@ type ( frostfsidValidation bool accessbox *cid.ID dialerSource *internalnet.DialerSource + workerPoolSize int mu sync.RWMutex namespaces Namespaces @@ -119,6 +121,8 @@ type ( vhsNamespacesEnabled map[string]bool retryMaxBackoff time.Duration retryStrategy handler.RetryStrategy + tombstoneMembersSize int + tombstoneLifetime uint64 } maxClientsConfig struct { @@ -224,12 +228,21 @@ func (a *App) initLayer(ctx context.Context) { GateKey: a.key, CORSCnrInfo: corsCnrInfo, LifecycleCnrInfo: lifecycleCnrInfo, + WorkerPool: a.initWorkerPool(), } // prepare object layer a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) } +func (a *App) initWorkerPool() *ants.Pool { + workerPool, err := ants.NewPool(a.settings.workerPoolSize) + if err != nil { + a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err)) + } + return workerPool +} + func newAppSettings(log *Logger, v *viper.Viper) *appSettings { settings := &appSettings{ logLevel: log.lvl, @@ -239,6 +252,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings { reconnectInterval: fetchReconnectInterval(v), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), dialerSource: getDialerSource(log.logger, v), + workerPoolSize: fetchTombstoneWorkerPoolSize(v), } settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow) @@ -275,6 +289,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize) httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination) httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip) + tombstoneMembersSize := fetchTombstoneMembersSize(v) + tombstoneLifetime := fetchTombstoneLifetime(v) s.mu.Lock() defer s.mu.Unlock() @@ -304,6 +320,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { s.vhsHeader = vhsHeader s.servernameHeader = servernameHeader s.vhsNamespacesEnabled = vhsNamespacesEnabled + s.tombstoneMembersSize = tombstoneMembersSize + s.tombstoneLifetime = tombstoneLifetime } func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool { @@ -506,6 +524,18 @@ func (s *appSettings) AccessBoxContainer() (cid.ID, bool) { return cid.ID{}, false } +func (s *appSettings) TombstoneMembersSize() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.tombstoneMembersSize +} + +func (s *appSettings) TombstoneLifetime() uint64 { + s.mu.RLock() + defer s.mu.RUnlock() + return s.tombstoneLifetime +} + func (a *App) initAPI(ctx context.Context) { a.initLayer(ctx) a.initHandler() diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 691e231..4ad77bb 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -74,6 +74,10 @@ const ( defaultRetryMaxAttempts = 4 defaultRetryMaxBackoff = 30 * time.Second defaultRetryStrategy = handler.RetryStrategyExponential + + defaultTombstoneLifetime = uint64(10) + defaultTombstoneMembersSize = 100 + defaultTombstoneWorkerPoolSize = 100 ) var ( @@ -239,7 +243,10 @@ const ( // Settings. // Sets max buffer size for read payload in put operations. cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put" // Sets max attempt to make successful tree request. - cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts" + cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts" + cfgTombstoneLifetime = "frostfs.tombstone.lifetime" + cfgTombstoneMembersSize = "frostfs.tombstone.members_size" + cfgTombstoneWorkerPoolSize = "frostfs.tombstone.worker_pool_size" // Specifies the timeout after which unhealthy client be closed during rebalancing // if it will become healthy back. @@ -804,6 +811,33 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) { return attributes, nil } +func fetchTombstoneLifetime(v *viper.Viper) uint64 { + tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime) + if tombstoneLifetime <= 0 { + tombstoneLifetime = defaultTombstoneLifetime + } + + return tombstoneLifetime +} + +func fetchTombstoneMembersSize(v *viper.Viper) int { + tombstoneMembersSize := v.GetInt(cfgTombstoneMembersSize) + if tombstoneMembersSize <= 0 { + tombstoneMembersSize = defaultTombstoneMembersSize + } + + return tombstoneMembersSize +} + +func fetchTombstoneWorkerPoolSize(v *viper.Viper) int { + tombstoneWorkerPoolSize := v.GetInt(cfgTombstoneWorkerPoolSize) + if tombstoneWorkerPoolSize <= 0 { + tombstoneWorkerPoolSize = defaultTombstoneWorkerPoolSize + } + + return tombstoneWorkerPoolSize +} + func newSettings() *viper.Viper { v := viper.New() @@ -876,6 +910,9 @@ func newSettings() *viper.Viper { // frostfs v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb + v.SetDefault(cfgTombstoneLifetime, defaultTombstoneLifetime) + v.SetDefault(cfgTombstoneMembersSize, defaultTombstoneMembersSize) + v.SetDefault(cfgTombstoneWorkerPoolSize, defaultTombstoneWorkerPoolSize) // kludge v.SetDefault(cfgKludgeUseDefaultXMLNS, false) diff --git a/config/config.env b/config/config.env index ef6a27b..ba14e56 100644 --- a/config/config.env +++ b/config/config.env @@ -163,6 +163,12 @@ S3_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576 S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0 # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s +# Tombstone's lifetime in epochs. +S3_GW_FROSTFS_TOMBSTONE_LIFETIME=10 +# Maximum number of object IDs in one tombstone. +S3_GW_FROSTFS_TOMBSTONE_MEMBERS_COUNT=100 +# Maximum worker count in layer's worker pool that create tombstones. +S3_GW_FROSTFS_TOMBSTONE_WORKER_POOL_SIZE=100 # List of allowed AccessKeyID prefixes # If not set, S3 GW will accept all AccessKeyIDs diff --git a/config/config.yaml b/config/config.yaml index 051f5f7..82f0834 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -199,6 +199,13 @@ frostfs: buffer_max_size_for_put: 1048576 # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. graceful_close_on_switch_timeout: 10s + tombstone: + # Tombstone's lifetime in epochs. + lifetime: 10 + # Maximum number of object IDs in one tombstone. + members_count: 100 + # Maximum worker count in layer's worker pool that create tombstones. + worker_pool_size: 100 # List of allowed AccessKeyID prefixes # If the parameter is omitted, S3 GW will accept all AccessKeyIDs diff --git a/docs/configuration.md b/docs/configuration.md index 70b2f33..36ed5d8 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -588,6 +588,10 @@ frostfs: buffer_max_size_for_put: 1048576 # 1mb tree_pool_max_attempts: 0 graceful_close_on_switch_timeout: 10s + tombstone: + lifetime: 10 + members_count: 100 + worker_pool_size: 100 ``` | Parameter | Type | SIGHUP reload | Default value | Description | @@ -597,6 +601,9 @@ frostfs: | `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. | | `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. | | `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. | +| `tombstone.lifetime` | `uint64` | yes | 10 | Tombstone's lifetime in epochs. | +| `tombstone.members_count` | `int` | yes | 100 | Maximum number of object IDs in one tombstone. | +| `tombstone.worker_pool_size` | `int` | no | 100 | Maximum worker count in layer's worker pool that create tombstones. | # `resolve_bucket` section diff --git a/internal/frostfs/frostfs.go b/internal/frostfs/frostfs.go index 24de01a..9297ac1 100644 --- a/internal/frostfs/frostfs.go +++ b/internal/frostfs/frostfs.go @@ -19,6 +19,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" @@ -214,6 +215,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) obj.SetOwnerID(x.owner) obj.SetAttributes(attrs...) obj.SetPayloadSize(prm.PayloadSize) + obj.SetType(prm.Type) if prm.BearerToken == nil && prm.PrivateKey != nil { var owner user.ID @@ -438,6 +440,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) ( return res.ObjectID, nil } +func (x *FrostFS) Relations() relations.Relations { + return x.pool +} + // ResolverFrostFS represents virtual connection to the FrostFS network. // It implements resolver.FrostFS. type ResolverFrostFS struct { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 3da1255..a784b2c 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -177,4 +177,7 @@ const ( MultinetConfigWontBeUpdated = "multinet config won't be updated" MultinetDialSuccess = "multinet dial successful" MultinetDialFail = "multinet dial failed" + FailedToPutTombstoneObject = "failed to put tombstone object" + FailedToCreateWorkerPool = "failed to create worker pool" + FailedToListAllObjectRelations = "failed to list all object relations" )