Compare commits

..

1 commit

Author SHA1 Message Date
a215c96494 [#653] Support removal old unversioned objects
All checks were successful
/ DCO (pull_request) Successful in 54s
/ Vulncheck (pull_request) Successful in 1m4s
/ Builds (pull_request) Successful in 1m10s
/ OCI image (pull_request) Successful in 2m48s
/ Lint (pull_request) Successful in 2m30s
/ Tests (pull_request) Successful in 1m10s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2025-03-05 17:45:40 +03:00
18 changed files with 287 additions and 61 deletions

View file

@ -8,6 +8,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
@ -471,6 +472,27 @@ func TestDeleteBucketByNotOwner(t *testing.T) {
deleteBucket(t, hc, bktName, http.StatusNoContent)
}
func TestRemovalOnReplace(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket", "object"
bktInfo := createTestBucket(hc, bktName)
putObject(hc, bktName, objName)
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 1)
putObject(hc, bktName, objName)
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
hc.layerFeatures.SetRemoveOnReplace(true)
putObject(hc, bktName, objName)
time.Sleep(time.Second)
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
}
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
bktInfo := createTestBucket(tc, bktName)

View file

@ -183,6 +183,8 @@ func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
}
func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*handlerContextBase, error) {
ctx := context.Background()
key, err := keys.NewPrivateKey()
if err != nil {
return nil, err
@ -234,7 +236,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
}
h := &handler{
log: log,
obj: layer.NewLayer(log, tp, layerCfg),
obj: layer.NewLayer(ctx, log, tp, layerCfg),
cfg: cfg,
ape: newAPEMock(),
frostfsid: newFrostfsIDMock(),
@ -250,7 +252,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
h: h,
tp: tp,
tree: treeMock,
context: middleware.SetBox(context.Background(), &middleware.Box{AccessBox: accessBox}),
context: middleware.SetBox(ctx, &middleware.Box{AccessBox: accessBox}),
config: cfg,
layerFeatures: features,

View file

@ -32,8 +32,9 @@ import (
)
type FeatureSettingsMock struct {
clientCut bool
md5Enabled bool
clientCut bool
md5Enabled bool
removeOnReplace bool
}
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
@ -72,6 +73,22 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
return ns + ".ns"
}
func (k *FeatureSettingsMock) SetRemoveOnReplace(removeOnReplace bool) {
k.removeOnReplace = removeOnReplace
}
func (k *FeatureSettingsMock) RemoveOnReplace() bool {
return k.removeOnReplace
}
func (k *FeatureSettingsMock) RemoveOnReplaceTimeout() time.Duration {
return time.Minute
}
func (k *FeatureSettingsMock) RemoveOnReplaceQueue() int {
return 1
}
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
type offsetError struct {

View file

@ -48,6 +48,9 @@ type (
FormContainerZone(ns string) string
TombstoneMembersSize() int
TombstoneLifetime() uint64
RemoveOnReplace() bool
RemoveOnReplaceTimeout() time.Duration
RemoveOnReplaceQueue() int
}
Layer struct {
@ -63,6 +66,15 @@ type (
corsCnrInfo *data.BucketInfo
lifecycleCnrInfo *data.BucketInfo
workerPool *ants.Pool
removalChan chan removalParams
}
removalParams struct {
Auth frostfs.PrmAuth
BktInfo *data.BucketInfo
OIDs []oid.ID
RequestID string
TraceID string
}
Config struct {
@ -256,8 +268,8 @@ func (p HeadObjectParams) Versioned() bool {
// NewLayer creates an instance of a Layer. It checks credentials
// and establishes gRPC connection with the node.
func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
return &Layer{
func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
layer := &Layer{
frostFS: frostFS,
log: log,
gateOwner: config.GateOwner,
@ -270,7 +282,13 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
corsCnrInfo: config.CORSCnrInfo,
lifecycleCnrInfo: config.LifecycleCnrInfo,
workerPool: config.WorkerPool,
// TODO: consider closing channel
removalChan: make(chan removalParams, config.Features.RemoveOnReplaceQueue()),
}
go layer.removalRoutine(ctx)
return layer
}
func (n *Layer) EphemeralKey() *keys.PublicKey {
@ -695,7 +713,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
IsUnversioned: settings.VersioningSuspended(),
}
if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
if _, obj.Error = n.addVersion(ctx, bkt, newVersion); obj.Error != nil {
return obj
}
@ -704,6 +722,67 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj
}
func (n *Layer) removalRoutine(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case prm, ok := <-n.removalChan:
if !ok {
return
}
reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout())
for _, objID := range prm.OIDs {
if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil {
n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID),
zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage))
}
}
cancel()
}
}
}
func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) {
if !n.features.RemoveOnReplace() {
return
}
reqInfo := middleware.GetReqInfo(ctx)
prm := removalParams{
Auth: frostfs.PrmAuth{},
BktInfo: bktInfo,
OIDs: OIDs,
RequestID: reqInfo.RequestID,
TraceID: reqInfo.TraceID,
}
n.prepareAuthParameters(ctx, &prm.Auth, bktInfo.Owner)
select {
case n.removalChan <- prm:
default:
oidsStr := make([]string, len(OIDs))
for i, d := range OIDs {
oidsStr[i] = d.EncodeToString()
}
n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete,
zap.Strings("oids", oidsStr), logs.TagField(logs.TagDatapath))
}
}
func (n *Layer) addVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
nodeID, OIDs, err := n.treeService.AddVersion(ctx, bktInfo, version)
n.tryRemove(ctx, bktInfo, OIDs)
if err != nil {
return nodeID, err
}
return nodeID, nil
}
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if isNotFoundError(obj.Error) {
obj.Error = nil

View file

@ -354,7 +354,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
newVersion.Size = createdObj.Size
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}

View file

@ -66,7 +66,7 @@ func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.Ex
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
}
@ -253,7 +253,7 @@ func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
}

View file

@ -45,7 +45,11 @@ type Service interface {
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
// AddVersion creates new version in tree.
// Returns new node id and object ids of old versions (OIDS) that must be deleted.
// OIDs can be returned even if error is not nil.
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error)
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error

View file

@ -236,19 +236,19 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.Bucket
return nil, tree.ErrNodeNotFound
}
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) {
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) {
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
if !ok {
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
newVersion.FilePath: {newVersion},
}
return newVersion.ID, nil
return newVersion.ID, nil, nil
}
versions, ok := cnrVersionsMap[newVersion.FilePath]
if !ok {
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
return newVersion.ID, nil
return newVersion.ID, nil, nil
}
sort.Slice(versions, func(i, j int) bool {
@ -262,18 +262,22 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
result := versions
var oldUnversionedIDs []oid.ID
if newVersion.IsUnversioned {
result = make([]*data.NodeVersion, 0, len(versions))
for _, node := range versions {
if !node.IsUnversioned {
result = append(result, node)
} else {
oldUnversionedIDs = append(oldUnversionedIDs, node.OID)
}
}
}
cnrVersionsMap[newVersion.FilePath] = append(result, newVersion)
return newVersion.ID, nil
return newVersion.ID, oldUnversionedIDs, nil
}
func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error {

View file

@ -180,7 +180,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
return &testContext{
ctx: ctx,
layer: NewLayer(logger, tp, layerCfg),
layer: NewLayer(ctx, logger, tp, layerCfg),
bktInfo: &data.BucketInfo{
Name: bktName,
Owner: owner,

View file

@ -190,7 +190,9 @@ func Request(log *zap.Logger, settings RequestSettings) Func {
fields := []zap.Field{zap.String("request_id", reqInfo.RequestID)}
ctx, span := StartHTTPServerSpan(r, "REQUEST S3")
if traceID := span.SpanContext().TraceID(); traceID.IsValid() {
fields = append(fields, zap.String("trace_id", traceID.String()))
traceIDStr := traceID.String()
fields = append(fields, zap.String("trace_id", traceIDStr))
reqInfo.TraceID = traceIDStr
}
lw := &traceResponseWriter{ResponseWriter: w, ctx: ctx, span: span}

View file

@ -103,18 +103,19 @@ type (
}
appSettings struct {
logLevel zap.AtomicLevel
httpLogging s3middleware.LogHTTPConfig
tagsConfig *tagsConfig
maxClient maxClientsConfig
defaultMaxAge int
reconnectInterval time.Duration
resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
frostfsidValidation bool
accessbox *cid.ID
dialerSource *internalnet.DialerSource
workerPoolSize int
logLevel zap.AtomicLevel
httpLogging s3middleware.LogHTTPConfig
tagsConfig *tagsConfig
maxClient maxClientsConfig
defaultMaxAge int
reconnectInterval time.Duration
resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
frostfsidValidation bool
accessbox *cid.ID
dialerSource *internalnet.DialerSource
workerPoolSize int
removeOnReplaceQueue int
mu sync.RWMutex
namespaces Namespaces
@ -140,6 +141,8 @@ type (
tombstoneLifetime uint64
tlsTerminationHeader string
listingKeepaliveThrottle time.Duration
removeOnReplace bool
removeOnReplaceTimeout time.Duration
}
maxClientsConfig struct {
@ -313,7 +316,7 @@ func (a *App) initLayer(ctx context.Context) {
}
// prepare object layer
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
a.obj = layer.NewLayer(ctx, a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
}
func (a *App) initWorkerPool() *ants.Pool {
@ -326,15 +329,16 @@ func (a *App) initWorkerPool() *ants.Pool {
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
settings := &appSettings{
logLevel: log.lvl,
httpLogging: s3middleware.LogHTTPConfig{},
tagsConfig: newTagsConfig(v),
maxClient: newMaxClients(v),
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
reconnectInterval: fetchReconnectInterval(v),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
dialerSource: getDialerSource(log.logger, v),
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
logLevel: log.lvl,
httpLogging: s3middleware.LogHTTPConfig{},
tagsConfig: newTagsConfig(v),
maxClient: newMaxClients(v),
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
reconnectInterval: fetchReconnectInterval(v),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
dialerSource: getDialerSource(log.logger, v),
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
removeOnReplaceQueue: fetchRemoveOnReplaceQueue(v),
}
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
@ -376,6 +380,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
tombstoneLifetime := fetchTombstoneLifetime(v)
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
listingKeepaliveThrottle := v.GetDuration(cfgKludgeListingKeepAliveThrottle)
removeOnReplace := v.GetBool(cfgRemoveOnReplaceEnabled)
removeOnReplaceTimeout := fetchRemoveOnReplaceTimeout(v)
s.mu.Lock()
defer s.mu.Unlock()
@ -410,6 +416,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
s.tombstoneLifetime = tombstoneLifetime
s.tlsTerminationHeader = tlsTerminationHeader
s.listingKeepaliveThrottle = listingKeepaliveThrottle
s.removeOnReplace = removeOnReplace
s.removeOnReplaceTimeout = removeOnReplaceTimeout
}
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
@ -653,6 +661,24 @@ func (s *appSettings) ListingKeepaliveThrottle() time.Duration {
return s.listingKeepaliveThrottle
}
func (s *appSettings) RemoveOnReplace() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.removeOnReplace
}
func (s *appSettings) RemoveOnReplaceTimeout() time.Duration {
s.mu.RLock()
defer s.mu.RUnlock()
return s.removeOnReplaceTimeout
}
func (s *appSettings) RemoveOnReplaceQueue() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.removeOnReplaceQueue
}
func (a *App) initAPI(ctx context.Context) {
a.initLayer(ctx)
a.initHandler()

View file

@ -75,6 +75,9 @@ const (
useDefaultXmlns = "use_default_xmlns"
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
defaultRemoveOnReplaceTimeout = 30 * time.Second
defaultRemoveOnReplaceQueue = 10000
)
var (
@ -271,9 +274,12 @@ const (
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
// Enable return MD5 checksum in ETag.
cfgMD5Enabled = "features.md5.enabled"
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
cfgMD5Enabled = "features.md5.enabled"
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
cfgRemoveOnReplaceEnabled = "features.remove_on_replace.enabled"
cfgRemoveOnReplaceTimeout = "features.remove_on_replace.timeout"
cfgRemoveOnReplaceQueue = "features.remove_on_replace.queue"
// FrostfsID.
cfgFrostfsIDContract = "frostfsid.contract"
@ -955,6 +961,24 @@ func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
return tombstoneWorkerPoolSize
}
func fetchRemoveOnReplaceTimeout(v *viper.Viper) time.Duration {
val := v.GetDuration(cfgRemoveOnReplaceTimeout)
if val <= 0 {
val = defaultRemoveOnReplaceTimeout
}
return val
}
func fetchRemoveOnReplaceQueue(v *viper.Viper) int {
val := v.GetInt(cfgRemoveOnReplaceQueue)
if val <= 0 {
val = defaultRemoveOnReplaceQueue
}
return val
}
func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) {
res := make(map[string]zapcore.Level)

View file

@ -218,6 +218,12 @@ S3_GW_FEATURES_MD5_ENABLED=false
S3_GW_FEATURES_POLICY_DENY_BY_DEFAULT=false
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
S3_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true
# Enable removing old object during PUT operation in unversioned/suspened bucket.
S3_GW_FEATURES_REMOVE_ON_REPLACE_ENABLED=false
# Timeout to one delete operation in background.
S3_GW_FEATURES_REMOVE_ON_REPLACE_TIMEOUT=30s
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
S3_GW_FEATURES_REMOVE_ON_REPLACE_QUEUE=10000
# ReadTimeout is the maximum duration for reading the entire
# request, including the body. A zero or negative value means

View file

@ -257,6 +257,13 @@ features:
enabled: false
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
tree_pool_netmap_support: true
remove_on_replace:
# Enable removing old object during PUT operation in unversioned/suspened bucket.
enabled: false
# Timeout to one delete operation in background.
timeout: 30s
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
queue: 10000
web:
# ReadTimeout is the maximum duration for reading the entire

View file

@ -728,13 +728,20 @@ features:
md5:
enabled: false
tree_pool_netmap_support: true
remove_on_replace:
enabled: false
timeout: 30s
queue: 10000
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|----------------------------|--------|---------------|---------------|---------------------------------------------------------------------------------------------------------|
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |
| `policy.deny_by_default` | `bool` | yes | false | Enable denying access for request that doesn't match any policy chain rules. |
| `tree_pool_netmap_support` | `bool` | no | false | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
| Parameter | Type | SIGHUP reload | Default value | Description |
|-----------------------------|-------------|---------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------|
| `md5.enabled` | `bool` | yes | `false` | Flag to enable return MD5 checksum in ETag headers and fields. |
| `policy.deny_by_default` | `bool` | yes | `false` | Enable denying access for request that doesn't match any policy chain rules. |
| `tree_pool_netmap_support` | `bool` | no | `false` | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
| `remove_on_replace.enabled` | `bool` | yes | `false` | Enable removing old object during PUT operation in unversioned/suspened bucket. |
| `remove_on_replace.timeout` | `durations` | yes | `30s` | Timeout to one delete operation in background. |
| `remove_on_replace.queue` | `int` | false | `10000` | Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that. |
# `web` section
Contains web server configuration parameters.

View file

@ -159,6 +159,7 @@ const (
ResolveBucket = "resolve bucket"
FailedToResolveCID = "failed to resolve CID"
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks"
FailedToQueueOldUnversionedObjectToDelete = "failed to queue old unversioned object to delete, removal will be performed in lifecycler"
)
// External storage.
@ -179,6 +180,7 @@ const (
PutObject = "put object"
CouldNotFetchObjectMeta = "could not fetch object meta"
FailedToDeleteObject = "failed to delete object"
FailedToRemoveOldUnversionedObject = "failed to remove old unversioned object"
CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object"
)
@ -201,6 +203,7 @@ const (
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
GetBucketCorsFromTree = "get bucket cors from tree"
FailedToRemoveOldUnversionedNode = "failed to remove old unversioned node"
)
// Authmate.

View file

@ -1318,10 +1318,18 @@ func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, fil
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned")
defer span.End()
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
res, err := c.getUnversioned(ctx, bktInfo, versionTree, filepath)
if err != nil {
return nil, err
}
return res[0], err
}
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
// getUnversioned returns all unversioned nodes for specified filepath.
// List of node is always not empty if error is nil.
// First item of list is the latest (according timestamp).
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) ([]*data.NodeVersion, error) {
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
if err != nil {
return nil, err
@ -1342,10 +1350,10 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
return nodes[i].Timestamp > nodes[j].Timestamp
})
return nodes[0], nil
return nodes, nil
}
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []oid.ID, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
defer span.End()
@ -1782,7 +1790,7 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
}
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []oid.ID, error) {
path := pathFromName(version.FilePath)
meta := map[string]string{
oidKV: version.OID.EncodeToString(),
@ -1813,25 +1821,40 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
if version.IsUnversioned {
meta[isUnversionedKV] = "true"
node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
nodes, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
if err == nil {
node := nodes[0]
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParentID, meta); err != nil {
return 0, err
return 0, nil, err
}
return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
oldOIDs := make([]oid.ID, len(nodes))
for i, oldNode := range nodes {
oldOIDs[i] = oldNode.OID
}
return node.ID, oldOIDs, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, nodes)
}
if !errors.Is(err, tree.ErrNodeNotFound) {
return 0, err
return 0, nil, err
}
}
return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
nodeID, err := c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
return nodeID, nil, err
}
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodes []*data.NodeVersion) error {
for _, node := range nodes[1:] {
if err := c.service.RemoveNode(ctx, bktInfo, treeID, node.ID); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldUnversionedNode, zap.Uint64("node_id", node.ID),
zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
}
}
latest := nodes[0]
taggingNode, err := c.getTreeNode(ctx, bktInfo, latest.ID, isTagKV)
if err != nil {
return err
}

View file

@ -166,7 +166,7 @@ func TestTreeServiceAddVersion(t *testing.T) {
IsUnversioned: true,
}
nodeID, err := treeService.AddVersion(ctx, bktInfo, version)
nodeID, _, err := treeService.AddVersion(ctx, bktInfo, version)
require.NoError(t, err)
storedNode, err := treeService.GetUnversioned(ctx, bktInfo, "path/to/version")
@ -404,7 +404,7 @@ func TestVersionsByPrefixStreamImpl_Next(t *testing.T) {
}
for _, v := range versions {
_, err = treeService.AddVersion(ctx, bktInfo, v)
_, _, err = treeService.AddVersion(ctx, bktInfo, v)
require.NoError(t, err)
}