diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index ed0c992b4..8e44a72f8 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "net/url" "testing" + "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" @@ -471,6 +472,27 @@ func TestDeleteBucketByNotOwner(t *testing.T) { deleteBucket(t, hc, bktName, http.StatusNoContent) } +func TestRemovalOnReplace(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName, objName := "bucket", "object" + bktInfo := createTestBucket(hc, bktName) + + putObject(hc, bktName, objName) + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 1) + + putObject(hc, bktName, objName) + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2) + + hc.layerFeatures.SetRemoveOnReplace(true) + + putObject(hc, bktName, objName) + + time.Sleep(time.Second) + + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2) +} + func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) { bktInfo := createTestBucket(tc, bktName) diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 0e66df5d9..f23b58fa1 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -204,6 +204,8 @@ func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext { } func prepareHandlerContextBase(config *handlerConfig, log *zap.Logger) (*handlerContextBase, error) { + ctx := context.Background() + key, err := keys.NewPrivateKey() if err != nil { return nil, err @@ -263,7 +265,7 @@ func prepareHandlerContextBase(config *handlerConfig, log *zap.Logger) (*handler } h := &handler{ log: log, - obj: layer.NewLayer(log, tp, layerCfg), + obj: layer.NewLayer(ctx, log, tp, layerCfg), cfg: cfg, ape: newAPEMock(), frostfsid: newFrostfsIDMock(), @@ -279,7 +281,7 @@ func prepareHandlerContextBase(config *handlerConfig, log *zap.Logger) (*handler h: h, tp: tp, tree: treeMock, - context: middleware.SetBox(context.Background(), &middleware.Box{AccessBox: accessBox}), + context: middleware.SetBox(ctx, &middleware.Box{AccessBox: accessBox}), config: cfg, layerFeatures: features, diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index 4726d21d4..735b2b691 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -33,8 +33,9 @@ import ( ) type FeatureSettingsMock struct { - clientCut bool - md5Enabled bool + clientCut bool + md5Enabled bool + removeOnReplace bool } func (k *FeatureSettingsMock) TombstoneLifetime() uint64 { @@ -73,6 +74,22 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string { return ns + ".ns" } +func (k *FeatureSettingsMock) SetRemoveOnReplace(removeOnReplace bool) { + k.removeOnReplace = removeOnReplace +} + +func (k *FeatureSettingsMock) RemoveOnReplace() bool { + return k.removeOnReplace +} + +func (k *FeatureSettingsMock) RemoveOnReplaceTimeout() time.Duration { + return time.Minute +} + +func (k *FeatureSettingsMock) RemoveOnReplaceQueue() int { + return 1 +} + var _ frostfs.FrostFS = (*TestFrostFS)(nil) type offsetError struct { diff --git a/api/layer/layer.go b/api/layer/layer.go index b658d51e4..b40758fd0 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -48,6 +48,9 @@ type ( FormContainerZone(ns string) string TombstoneMembersSize() int TombstoneLifetime() uint64 + RemoveOnReplace() bool + RemoveOnReplaceTimeout() time.Duration + RemoveOnReplaceQueue() int } Layer struct { @@ -63,6 +66,15 @@ type ( corsCnrInfo *data.BucketInfo lifecycleCnrInfo *data.BucketInfo workerPool *ants.Pool + removalChan chan removalParams + } + + removalParams struct { + Auth frostfs.PrmAuth + BktInfo *data.BucketInfo + OIDs []oid.ID + RequestID string + TraceID string } Config struct { @@ -256,8 +268,8 @@ func (p HeadObjectParams) Versioned() bool { // NewLayer creates an instance of a Layer. It checks credentials // and establishes gRPC connection with the node. -func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer { - return &Layer{ +func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer { + layer := &Layer{ frostFS: frostFS, log: log, gateOwner: config.GateOwner, @@ -270,7 +282,13 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer { corsCnrInfo: config.CORSCnrInfo, lifecycleCnrInfo: config.LifecycleCnrInfo, workerPool: config.WorkerPool, + // TODO: consider closing channel + removalChan: make(chan removalParams, config.Features.RemoveOnReplaceQueue()), } + + go layer.removalRoutine(ctx) + + return layer } func (n *Layer) EphemeralKey() *keys.PublicKey { @@ -695,7 +713,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings IsUnversioned: settings.VersioningSuspended(), } - if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil { + if _, obj.Error = n.addVersion(ctx, bkt, newVersion); obj.Error != nil { return obj } @@ -704,6 +722,67 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings return obj } +func (n *Layer) removalRoutine(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case prm, ok := <-n.removalChan: + if !ok { + return + } + + reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout()) + for _, objID := range prm.OIDs { + if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil { + n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID), + zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage)) + } + } + cancel() + } + } +} + +func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) { + if !n.features.RemoveOnReplace() { + return + } + + reqInfo := middleware.GetReqInfo(ctx) + prm := removalParams{ + Auth: frostfs.PrmAuth{}, + BktInfo: bktInfo, + OIDs: OIDs, + RequestID: reqInfo.RequestID, + TraceID: reqInfo.TraceID, + } + + n.prepareAuthParameters(ctx, &prm.Auth, bktInfo.Owner) + + select { + case n.removalChan <- prm: + default: + oidsStr := make([]string, len(OIDs)) + for i, d := range OIDs { + oidsStr[i] = d.EncodeToString() + } + + n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete, + zap.Strings("oids", oidsStr), logs.TagField(logs.TagDatapath)) + } +} + +func (n *Layer) addVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) { + nodeID, OIDs, err := n.treeService.AddVersion(ctx, bktInfo, version) + n.tryRemove(ctx, bktInfo, OIDs) + if err != nil { + return nodeID, err + } + + return nodeID, nil +} + func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject { if isNotFoundError(obj.Error) { obj.Error = nil diff --git a/api/layer/object.go b/api/layer/object.go index eb091d62d..35122bec3 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -354,7 +354,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend newVersion.Size = createdObj.Size } - if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { + if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil { return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) } diff --git a/api/layer/patch.go b/api/layer/patch.go index def74c1c8..caa79f748 100644 --- a/api/layer/patch.go +++ b/api/layer/patch.go @@ -66,7 +66,7 @@ func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.Ex IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "", } - if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { + if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil { return nil, fmt.Errorf("couldn't add new version to tree service: %w", err) } @@ -253,7 +253,7 @@ func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "", } - if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { + if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil { return nil, fmt.Errorf("couldn't add new version to tree service: %w", err) } diff --git a/api/layer/tree/tree_service.go b/api/layer/tree/tree_service.go index f3b6f6e3c..1f4a9ccd5 100644 --- a/api/layer/tree/tree_service.go +++ b/api/layer/tree/tree_service.go @@ -45,7 +45,11 @@ type Service interface { GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) - AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) + + // AddVersion creates new version in tree. + // Returns new node id and object ids of old versions (OIDS) that must be deleted. + // OIDs can be returned even if error is not nil. + AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index ae1aa0acc..146411c87 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -240,19 +240,19 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.Bucket return nil, tree.ErrNodeNotFound } -func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) { +func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) { cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] if !ok { t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{ newVersion.FilePath: {newVersion}, } - return newVersion.ID, nil + return newVersion.ID, nil, nil } versions, ok := cnrVersionsMap[newVersion.FilePath] if !ok { cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion} - return newVersion.ID, nil + return newVersion.ID, nil, nil } sort.Slice(versions, func(i, j int) bool { @@ -266,18 +266,22 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo result := versions + var oldUnversionedIDs []oid.ID + if newVersion.IsUnversioned { result = make([]*data.NodeVersion, 0, len(versions)) for _, node := range versions { if !node.IsUnversioned { result = append(result, node) + } else { + oldUnversionedIDs = append(oldUnversionedIDs, node.OID) } } } cnrVersionsMap[newVersion.FilePath] = append(result, newVersion) - return newVersion.ID, nil + return newVersion.ID, oldUnversionedIDs, nil } func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error { diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index e525eca3c..2247b8067 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -180,7 +180,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { return &testContext{ ctx: ctx, - layer: NewLayer(logger, tp, layerCfg), + layer: NewLayer(ctx, logger, tp, layerCfg), bktInfo: &data.BucketInfo{ Name: bktName, Owner: owner, diff --git a/api/middleware/reqinfo.go b/api/middleware/reqinfo.go index fe011807c..c61563c12 100644 --- a/api/middleware/reqinfo.go +++ b/api/middleware/reqinfo.go @@ -190,7 +190,9 @@ func Request(log *zap.Logger, settings RequestSettings) Func { fields := []zap.Field{zap.String("request_id", reqInfo.RequestID)} ctx, span := StartHTTPServerSpan(r, "REQUEST S3") if traceID := span.SpanContext().TraceID(); traceID.IsValid() { - fields = append(fields, zap.String("trace_id", traceID.String())) + traceIDStr := traceID.String() + fields = append(fields, zap.String("trace_id", traceIDStr)) + reqInfo.TraceID = traceIDStr } lw := &traceResponseWriter{ResponseWriter: w, ctx: ctx, span: span} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 463eb52d7..37371c0d3 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -103,18 +103,19 @@ type ( } appSettings struct { - logLevel zap.AtomicLevel - httpLogging s3middleware.LogHTTPConfig - tagsConfig *tagsConfig - maxClient maxClientsConfig - defaultMaxAge int - reconnectInterval time.Duration - resolveZoneList []string - isResolveListAllow bool // True if ResolveZoneList contains allowed zones - frostfsidValidation bool - accessbox *cid.ID - dialerSource *internalnet.DialerSource - workerPoolSize int + logLevel zap.AtomicLevel + httpLogging s3middleware.LogHTTPConfig + tagsConfig *tagsConfig + maxClient maxClientsConfig + defaultMaxAge int + reconnectInterval time.Duration + resolveZoneList []string + isResolveListAllow bool // True if ResolveZoneList contains allowed zones + frostfsidValidation bool + accessbox *cid.ID + dialerSource *internalnet.DialerSource + workerPoolSize int + removeOnReplaceQueue int mu sync.RWMutex namespaces Namespaces @@ -140,6 +141,8 @@ type ( tombstoneLifetime uint64 tlsTerminationHeader string listingKeepaliveThrottle time.Duration + removeOnReplace bool + removeOnReplaceTimeout time.Duration } maxClientsConfig struct { @@ -310,7 +313,7 @@ func (a *App) initLayer(ctx context.Context) { } // prepare object layer - a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) + a.obj = layer.NewLayer(ctx, a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) } func (a *App) initWorkerPool() *ants.Pool { @@ -323,15 +326,16 @@ func (a *App) initWorkerPool() *ants.Pool { func newAppSettings(log *Logger, v *viper.Viper) *appSettings { settings := &appSettings{ - logLevel: log.lvl, - httpLogging: s3middleware.LogHTTPConfig{}, - tagsConfig: newTagsConfig(v), - maxClient: newMaxClients(v), - defaultMaxAge: fetchDefaultMaxAge(v, log.logger), - reconnectInterval: fetchReconnectInterval(v), - frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), - dialerSource: getDialerSource(log.logger, v), - workerPoolSize: fetchTombstoneWorkerPoolSize(v), + logLevel: log.lvl, + httpLogging: s3middleware.LogHTTPConfig{}, + tagsConfig: newTagsConfig(v), + maxClient: newMaxClients(v), + defaultMaxAge: fetchDefaultMaxAge(v, log.logger), + reconnectInterval: fetchReconnectInterval(v), + frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), + dialerSource: getDialerSource(log.logger, v), + workerPoolSize: fetchTombstoneWorkerPoolSize(v), + removeOnReplaceQueue: fetchRemoveOnReplaceQueue(v), } settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow) @@ -373,6 +377,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { tombstoneLifetime := fetchTombstoneLifetime(v) tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader) listingKeepaliveThrottle := v.GetDuration(cfgKludgeListingKeepAliveThrottle) + removeOnReplace := v.GetBool(cfgRemoveOnReplaceEnabled) + removeOnReplaceTimeout := fetchRemoveOnReplaceTimeout(v) s.mu.Lock() defer s.mu.Unlock() @@ -407,6 +413,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { s.tombstoneLifetime = tombstoneLifetime s.tlsTerminationHeader = tlsTerminationHeader s.listingKeepaliveThrottle = listingKeepaliveThrottle + s.removeOnReplace = removeOnReplace + s.removeOnReplaceTimeout = removeOnReplaceTimeout } func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool { @@ -650,6 +658,24 @@ func (s *appSettings) ListingKeepaliveThrottle() time.Duration { return s.listingKeepaliveThrottle } +func (s *appSettings) RemoveOnReplace() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.removeOnReplace +} + +func (s *appSettings) RemoveOnReplaceTimeout() time.Duration { + s.mu.RLock() + defer s.mu.RUnlock() + return s.removeOnReplaceTimeout +} + +func (s *appSettings) RemoveOnReplaceQueue() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.removeOnReplaceQueue +} + func (a *App) initAPI(ctx context.Context) { a.initLayer(ctx) a.initHandler() diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 6afdb0860..765046298 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -78,6 +78,9 @@ const ( useDefaultXmlns = "use_default_xmlns" bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks" + + defaultRemoveOnReplaceTimeout = 30 * time.Second + defaultRemoveOnReplaceQueue = 10000 ) var ( @@ -276,9 +279,12 @@ const ( cfgSoftMemoryLimit = "runtime.soft_memory_limit" // Enable return MD5 checksum in ETag. - cfgMD5Enabled = "features.md5.enabled" - cfgPolicyDenyByDefault = "features.policy.deny_by_default" - cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support" + cfgMD5Enabled = "features.md5.enabled" + cfgPolicyDenyByDefault = "features.policy.deny_by_default" + cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support" + cfgRemoveOnReplaceEnabled = "features.remove_on_replace.enabled" + cfgRemoveOnReplaceTimeout = "features.remove_on_replace.timeout" + cfgRemoveOnReplaceQueue = "features.remove_on_replace.queue" // FrostfsID. cfgFrostfsIDContract = "frostfsid.contract" @@ -960,6 +966,24 @@ func fetchTombstoneWorkerPoolSize(v *viper.Viper) int { return tombstoneWorkerPoolSize } +func fetchRemoveOnReplaceTimeout(v *viper.Viper) time.Duration { + val := v.GetDuration(cfgRemoveOnReplaceTimeout) + if val <= 0 { + val = defaultRemoveOnReplaceTimeout + } + + return val +} + +func fetchRemoveOnReplaceQueue(v *viper.Viper) int { + val := v.GetInt(cfgRemoveOnReplaceQueue) + if val <= 0 { + val = defaultRemoveOnReplaceQueue + } + + return val +} + func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) { res := make(map[string]zapcore.Level) diff --git a/config/config.env b/config/config.env index 9b538f0c9..d759fdd81 100644 --- a/config/config.env +++ b/config/config.env @@ -222,6 +222,12 @@ S3_GW_FEATURES_MD5_ENABLED=false S3_GW_FEATURES_POLICY_DENY_BY_DEFAULT=false # Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service S3_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true +# Enable removing old object during PUT operation in unversioned/suspened bucket. +S3_GW_FEATURES_REMOVE_ON_REPLACE_ENABLED=false +# Timeout to one delete operation in background. +S3_GW_FEATURES_REMOVE_ON_REPLACE_TIMEOUT=30s +# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that. +S3_GW_FEATURES_REMOVE_ON_REPLACE_QUEUE=10000 # ReadTimeout is the maximum duration for reading the entire # request, including the body. A zero or negative value means diff --git a/config/config.yaml b/config/config.yaml index cad714c43..f87ab0413 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -261,6 +261,13 @@ features: enabled: false # Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service tree_pool_netmap_support: true + remove_on_replace: + # Enable removing old object during PUT operation in unversioned/suspened bucket. + enabled: false + # Timeout to one delete operation in background. + timeout: 30s + # Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that. + queue: 10000 web: # ReadTimeout is the maximum duration for reading the entire diff --git a/docs/configuration.md b/docs/configuration.md index 19327a67b..83776ed4e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -732,13 +732,20 @@ features: md5: enabled: false tree_pool_netmap_support: true + remove_on_replace: + enabled: false + timeout: 30s + queue: 10000 ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|----------------------------|--------|---------------|---------------|---------------------------------------------------------------------------------------------------------| -| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. | -| `policy.deny_by_default` | `bool` | yes | false | Enable denying access for request that doesn't match any policy chain rules. | -| `tree_pool_netmap_support` | `bool` | no | false | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|-----------------------------|-------------|---------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------| +| `md5.enabled` | `bool` | yes | `false` | Flag to enable return MD5 checksum in ETag headers and fields. | +| `policy.deny_by_default` | `bool` | yes | `false` | Enable denying access for request that doesn't match any policy chain rules. | +| `tree_pool_netmap_support` | `bool` | no | `false` | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. | +| `remove_on_replace.enabled` | `bool` | yes | `false` | Enable removing old object during PUT operation in unversioned/suspened bucket. | +| `remove_on_replace.timeout` | `durations` | yes | `30s` | Timeout to one delete operation in background. | +| `remove_on_replace.queue` | `int` | false | `10000` | Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that. | # `web` section Contains web server configuration parameters. diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c0db6fe33..c89661264 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -159,6 +159,7 @@ const ( ResolveBucket = "resolve bucket" FailedToResolveCID = "failed to resolve CID" FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" + FailedToQueueOldUnversionedObjectToDelete = "failed to queue old unversioned object to delete, removal will be performed in lifecycler" ) // External storage. @@ -179,6 +180,7 @@ const ( PutObject = "put object" CouldNotFetchObjectMeta = "could not fetch object meta" FailedToDeleteObject = "failed to delete object" + FailedToRemoveOldUnversionedObject = "failed to remove old unversioned object" CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object" CouldntGetCORSObjectVersions = "couldn't get cors object versions" ) @@ -202,6 +204,9 @@ const ( BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids" GetAllBucketCorsFromTree = "get all bucket cors from tree" CouldntDeleteBucketCORS = "couldn't delete bucket cors" + BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids" + GetBucketCorsFromTree = "get bucket cors from tree" + FailedToRemoveOldUnversionedNode = "failed to remove old unversioned node" ) // Authmate. diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 2b09d9e15..f82809b77 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -1296,10 +1296,18 @@ func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, fil ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned") defer span.End() - return c.getUnversioned(ctx, bktInfo, versionTree, filepath) + res, err := c.getUnversioned(ctx, bktInfo, versionTree, filepath) + if err != nil { + return nil, err + } + + return res[0], err } -func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) { +// getUnversioned returns all unversioned nodes for specified filepath. +// List of node is always not empty if error is nil. +// First item of list is the latest (according timestamp). +func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) ([]*data.NodeVersion, error) { nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true) if err != nil { return nil, err @@ -1320,10 +1328,10 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre return nodes[i].Timestamp > nodes[j].Timestamp }) - return nodes[0], nil + return nodes, nil } -func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) { +func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []oid.ID, error) { ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion") defer span.End() @@ -1760,7 +1768,7 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket return getObjectTagging(nodes[isTagKV]), lockInfo, nil } -func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) { +func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []oid.ID, error) { path := pathFromName(version.FilePath) meta := map[string]string{ oidKV: version.OID.EncodeToString(), @@ -1791,25 +1799,40 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID if version.IsUnversioned { meta[isUnversionedKV] = "true" - node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath) + nodes, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath) if err == nil { + node := nodes[0] if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParentID, meta); err != nil { - return 0, err + return 0, nil, err } - return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID) + oldOIDs := make([]oid.ID, len(nodes)) + for i, oldNode := range nodes { + oldOIDs[i] = oldNode.OID + } + + return node.ID, oldOIDs, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, nodes) } if !errors.Is(err, tree.ErrNodeNotFound) { - return 0, err + return 0, nil, err } } - return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta) + nodeID, err := c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta) + return nodeID, nil, err } -func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { - taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV) +func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodes []*data.NodeVersion) error { + for _, node := range nodes[1:] { + if err := c.service.RemoveNode(ctx, bktInfo, treeID, node.ID); err != nil { + c.reqLogger(ctx).Warn(logs.FailedToRemoveOldUnversionedNode, zap.Uint64("node_id", node.ID), + zap.Error(err), logs.TagField(logs.TagExternalStorageTree)) + } + } + + latest := nodes[0] + taggingNode, err := c.getTreeNode(ctx, bktInfo, latest.ID, isTagKV) if err != nil { return err } diff --git a/pkg/service/tree/tree_test.go b/pkg/service/tree/tree_test.go index 5b7b66567..d04fd9092 100644 --- a/pkg/service/tree/tree_test.go +++ b/pkg/service/tree/tree_test.go @@ -166,7 +166,7 @@ func TestTreeServiceAddVersion(t *testing.T) { IsUnversioned: true, } - nodeID, err := treeService.AddVersion(ctx, bktInfo, version) + nodeID, _, err := treeService.AddVersion(ctx, bktInfo, version) require.NoError(t, err) storedNode, err := treeService.GetUnversioned(ctx, bktInfo, "path/to/version") @@ -404,7 +404,7 @@ func TestVersionsByPrefixStreamImpl_Next(t *testing.T) { } for _, v := range versions { - _, err = treeService.AddVersion(ctx, bktInfo, v) + _, _, err = treeService.AddVersion(ctx, bktInfo, v) require.NoError(t, err) }