[#653] Support removal old unversioned objects
All checks were successful
/ DCO (pull_request) Successful in 54s
/ Vulncheck (pull_request) Successful in 1m4s
/ Builds (pull_request) Successful in 1m10s
/ OCI image (pull_request) Successful in 2m48s
/ Lint (pull_request) Successful in 2m30s
/ Tests (pull_request) Successful in 1m10s
All checks were successful
/ DCO (pull_request) Successful in 54s
/ Vulncheck (pull_request) Successful in 1m4s
/ Builds (pull_request) Successful in 1m10s
/ OCI image (pull_request) Successful in 2m48s
/ Lint (pull_request) Successful in 2m30s
/ Tests (pull_request) Successful in 1m10s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
079fd20513
commit
a215c96494
18 changed files with 287 additions and 61 deletions
|
@ -8,6 +8,7 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
|
@ -471,6 +472,27 @@ func TestDeleteBucketByNotOwner(t *testing.T) {
|
||||||
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRemovalOnReplace(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket", "object"
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
|
||||||
|
putObject(hc, bktName, objName)
|
||||||
|
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 1)
|
||||||
|
|
||||||
|
putObject(hc, bktName, objName)
|
||||||
|
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
||||||
|
|
||||||
|
hc.layerFeatures.SetRemoveOnReplace(true)
|
||||||
|
|
||||||
|
putObject(hc, bktName, objName)
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
||||||
|
}
|
||||||
|
|
||||||
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
||||||
bktInfo := createTestBucket(tc, bktName)
|
bktInfo := createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
|
|
@ -183,6 +183,8 @@ func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*handlerContextBase, error) {
|
func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*handlerContextBase, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
key, err := keys.NewPrivateKey()
|
key, err := keys.NewPrivateKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -234,7 +236,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
|
||||||
}
|
}
|
||||||
h := &handler{
|
h := &handler{
|
||||||
log: log,
|
log: log,
|
||||||
obj: layer.NewLayer(log, tp, layerCfg),
|
obj: layer.NewLayer(ctx, log, tp, layerCfg),
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
ape: newAPEMock(),
|
ape: newAPEMock(),
|
||||||
frostfsid: newFrostfsIDMock(),
|
frostfsid: newFrostfsIDMock(),
|
||||||
|
@ -250,7 +252,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
|
||||||
h: h,
|
h: h,
|
||||||
tp: tp,
|
tp: tp,
|
||||||
tree: treeMock,
|
tree: treeMock,
|
||||||
context: middleware.SetBox(context.Background(), &middleware.Box{AccessBox: accessBox}),
|
context: middleware.SetBox(ctx, &middleware.Box{AccessBox: accessBox}),
|
||||||
config: cfg,
|
config: cfg,
|
||||||
|
|
||||||
layerFeatures: features,
|
layerFeatures: features,
|
||||||
|
|
|
@ -32,8 +32,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type FeatureSettingsMock struct {
|
type FeatureSettingsMock struct {
|
||||||
clientCut bool
|
clientCut bool
|
||||||
md5Enabled bool
|
md5Enabled bool
|
||||||
|
removeOnReplace bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
|
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
|
||||||
|
@ -72,6 +73,22 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
||||||
return ns + ".ns"
|
return ns + ".ns"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) SetRemoveOnReplace(removeOnReplace bool) {
|
||||||
|
k.removeOnReplace = removeOnReplace
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) RemoveOnReplace() bool {
|
||||||
|
return k.removeOnReplace
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) RemoveOnReplaceTimeout() time.Duration {
|
||||||
|
return time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) RemoveOnReplaceQueue() int {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
||||||
|
|
||||||
type offsetError struct {
|
type offsetError struct {
|
||||||
|
|
|
@ -48,6 +48,9 @@ type (
|
||||||
FormContainerZone(ns string) string
|
FormContainerZone(ns string) string
|
||||||
TombstoneMembersSize() int
|
TombstoneMembersSize() int
|
||||||
TombstoneLifetime() uint64
|
TombstoneLifetime() uint64
|
||||||
|
RemoveOnReplace() bool
|
||||||
|
RemoveOnReplaceTimeout() time.Duration
|
||||||
|
RemoveOnReplaceQueue() int
|
||||||
}
|
}
|
||||||
|
|
||||||
Layer struct {
|
Layer struct {
|
||||||
|
@ -63,6 +66,15 @@ type (
|
||||||
corsCnrInfo *data.BucketInfo
|
corsCnrInfo *data.BucketInfo
|
||||||
lifecycleCnrInfo *data.BucketInfo
|
lifecycleCnrInfo *data.BucketInfo
|
||||||
workerPool *ants.Pool
|
workerPool *ants.Pool
|
||||||
|
removalChan chan removalParams
|
||||||
|
}
|
||||||
|
|
||||||
|
removalParams struct {
|
||||||
|
Auth frostfs.PrmAuth
|
||||||
|
BktInfo *data.BucketInfo
|
||||||
|
OIDs []oid.ID
|
||||||
|
RequestID string
|
||||||
|
TraceID string
|
||||||
}
|
}
|
||||||
|
|
||||||
Config struct {
|
Config struct {
|
||||||
|
@ -256,8 +268,8 @@ func (p HeadObjectParams) Versioned() bool {
|
||||||
|
|
||||||
// NewLayer creates an instance of a Layer. It checks credentials
|
// NewLayer creates an instance of a Layer. It checks credentials
|
||||||
// and establishes gRPC connection with the node.
|
// and establishes gRPC connection with the node.
|
||||||
func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
||||||
return &Layer{
|
layer := &Layer{
|
||||||
frostFS: frostFS,
|
frostFS: frostFS,
|
||||||
log: log,
|
log: log,
|
||||||
gateOwner: config.GateOwner,
|
gateOwner: config.GateOwner,
|
||||||
|
@ -270,7 +282,13 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
||||||
corsCnrInfo: config.CORSCnrInfo,
|
corsCnrInfo: config.CORSCnrInfo,
|
||||||
lifecycleCnrInfo: config.LifecycleCnrInfo,
|
lifecycleCnrInfo: config.LifecycleCnrInfo,
|
||||||
workerPool: config.WorkerPool,
|
workerPool: config.WorkerPool,
|
||||||
|
// TODO: consider closing channel
|
||||||
|
removalChan: make(chan removalParams, config.Features.RemoveOnReplaceQueue()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go layer.removalRoutine(ctx)
|
||||||
|
|
||||||
|
return layer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) EphemeralKey() *keys.PublicKey {
|
func (n *Layer) EphemeralKey() *keys.PublicKey {
|
||||||
|
@ -695,7 +713,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
IsUnversioned: settings.VersioningSuspended(),
|
IsUnversioned: settings.VersioningSuspended(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
|
if _, obj.Error = n.addVersion(ctx, bkt, newVersion); obj.Error != nil {
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -704,6 +722,67 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Layer) removalRoutine(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case prm, ok := <-n.removalChan:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout())
|
||||||
|
for _, objID := range prm.OIDs {
|
||||||
|
if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil {
|
||||||
|
n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID),
|
||||||
|
zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) {
|
||||||
|
if !n.features.RemoveOnReplace() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
reqInfo := middleware.GetReqInfo(ctx)
|
||||||
|
prm := removalParams{
|
||||||
|
Auth: frostfs.PrmAuth{},
|
||||||
|
BktInfo: bktInfo,
|
||||||
|
OIDs: OIDs,
|
||||||
|
RequestID: reqInfo.RequestID,
|
||||||
|
TraceID: reqInfo.TraceID,
|
||||||
|
}
|
||||||
|
|
||||||
|
n.prepareAuthParameters(ctx, &prm.Auth, bktInfo.Owner)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case n.removalChan <- prm:
|
||||||
|
default:
|
||||||
|
oidsStr := make([]string, len(OIDs))
|
||||||
|
for i, d := range OIDs {
|
||||||
|
oidsStr[i] = d.EncodeToString()
|
||||||
|
}
|
||||||
|
|
||||||
|
n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete,
|
||||||
|
zap.Strings("oids", oidsStr), logs.TagField(logs.TagDatapath))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) addVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
|
||||||
|
nodeID, OIDs, err := n.treeService.AddVersion(ctx, bktInfo, version)
|
||||||
|
n.tryRemove(ctx, bktInfo, OIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nodeID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nodeID, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||||
if isNotFoundError(obj.Error) {
|
if isNotFoundError(obj.Error) {
|
||||||
obj.Error = nil
|
obj.Error = nil
|
||||||
|
|
|
@ -354,7 +354,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
newVersion.Size = createdObj.Size
|
newVersion.Size = createdObj.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,7 @@ func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.Ex
|
||||||
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,7 +253,7 @@ func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo
|
||||||
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,11 @@ type Service interface {
|
||||||
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||||
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
|
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
|
||||||
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||||
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
|
|
||||||
|
// AddVersion creates new version in tree.
|
||||||
|
// Returns new node id and object ids of old versions (OIDS) that must be deleted.
|
||||||
|
// OIDs can be returned even if error is not nil.
|
||||||
|
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error)
|
||||||
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
|
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
|
||||||
|
|
||||||
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error
|
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error
|
||||||
|
|
|
@ -236,19 +236,19 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.Bucket
|
||||||
return nil, tree.ErrNodeNotFound
|
return nil, tree.ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) {
|
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||||
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
|
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
|
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
|
||||||
newVersion.FilePath: {newVersion},
|
newVersion.FilePath: {newVersion},
|
||||||
}
|
}
|
||||||
return newVersion.ID, nil
|
return newVersion.ID, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
||||||
if !ok {
|
if !ok {
|
||||||
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
||||||
return newVersion.ID, nil
|
return newVersion.ID, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(versions, func(i, j int) bool {
|
sort.Slice(versions, func(i, j int) bool {
|
||||||
|
@ -262,18 +262,22 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
||||||
|
|
||||||
result := versions
|
result := versions
|
||||||
|
|
||||||
|
var oldUnversionedIDs []oid.ID
|
||||||
|
|
||||||
if newVersion.IsUnversioned {
|
if newVersion.IsUnversioned {
|
||||||
result = make([]*data.NodeVersion, 0, len(versions))
|
result = make([]*data.NodeVersion, 0, len(versions))
|
||||||
for _, node := range versions {
|
for _, node := range versions {
|
||||||
if !node.IsUnversioned {
|
if !node.IsUnversioned {
|
||||||
result = append(result, node)
|
result = append(result, node)
|
||||||
|
} else {
|
||||||
|
oldUnversionedIDs = append(oldUnversionedIDs, node.OID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrVersionsMap[newVersion.FilePath] = append(result, newVersion)
|
cnrVersionsMap[newVersion.FilePath] = append(result, newVersion)
|
||||||
|
|
||||||
return newVersion.ID, nil
|
return newVersion.ID, oldUnversionedIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error {
|
func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error {
|
||||||
|
|
|
@ -180,7 +180,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
||||||
|
|
||||||
return &testContext{
|
return &testContext{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
layer: NewLayer(logger, tp, layerCfg),
|
layer: NewLayer(ctx, logger, tp, layerCfg),
|
||||||
bktInfo: &data.BucketInfo{
|
bktInfo: &data.BucketInfo{
|
||||||
Name: bktName,
|
Name: bktName,
|
||||||
Owner: owner,
|
Owner: owner,
|
||||||
|
|
|
@ -190,7 +190,9 @@ func Request(log *zap.Logger, settings RequestSettings) Func {
|
||||||
fields := []zap.Field{zap.String("request_id", reqInfo.RequestID)}
|
fields := []zap.Field{zap.String("request_id", reqInfo.RequestID)}
|
||||||
ctx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
ctx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
||||||
if traceID := span.SpanContext().TraceID(); traceID.IsValid() {
|
if traceID := span.SpanContext().TraceID(); traceID.IsValid() {
|
||||||
fields = append(fields, zap.String("trace_id", traceID.String()))
|
traceIDStr := traceID.String()
|
||||||
|
fields = append(fields, zap.String("trace_id", traceIDStr))
|
||||||
|
reqInfo.TraceID = traceIDStr
|
||||||
}
|
}
|
||||||
lw := &traceResponseWriter{ResponseWriter: w, ctx: ctx, span: span}
|
lw := &traceResponseWriter{ResponseWriter: w, ctx: ctx, span: span}
|
||||||
|
|
||||||
|
|
|
@ -103,18 +103,19 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
appSettings struct {
|
appSettings struct {
|
||||||
logLevel zap.AtomicLevel
|
logLevel zap.AtomicLevel
|
||||||
httpLogging s3middleware.LogHTTPConfig
|
httpLogging s3middleware.LogHTTPConfig
|
||||||
tagsConfig *tagsConfig
|
tagsConfig *tagsConfig
|
||||||
maxClient maxClientsConfig
|
maxClient maxClientsConfig
|
||||||
defaultMaxAge int
|
defaultMaxAge int
|
||||||
reconnectInterval time.Duration
|
reconnectInterval time.Duration
|
||||||
resolveZoneList []string
|
resolveZoneList []string
|
||||||
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
||||||
frostfsidValidation bool
|
frostfsidValidation bool
|
||||||
accessbox *cid.ID
|
accessbox *cid.ID
|
||||||
dialerSource *internalnet.DialerSource
|
dialerSource *internalnet.DialerSource
|
||||||
workerPoolSize int
|
workerPoolSize int
|
||||||
|
removeOnReplaceQueue int
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
namespaces Namespaces
|
namespaces Namespaces
|
||||||
|
@ -140,6 +141,8 @@ type (
|
||||||
tombstoneLifetime uint64
|
tombstoneLifetime uint64
|
||||||
tlsTerminationHeader string
|
tlsTerminationHeader string
|
||||||
listingKeepaliveThrottle time.Duration
|
listingKeepaliveThrottle time.Duration
|
||||||
|
removeOnReplace bool
|
||||||
|
removeOnReplaceTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
maxClientsConfig struct {
|
maxClientsConfig struct {
|
||||||
|
@ -313,7 +316,7 @@ func (a *App) initLayer(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepare object layer
|
// prepare object layer
|
||||||
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
a.obj = layer.NewLayer(ctx, a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initWorkerPool() *ants.Pool {
|
func (a *App) initWorkerPool() *ants.Pool {
|
||||||
|
@ -326,15 +329,16 @@ func (a *App) initWorkerPool() *ants.Pool {
|
||||||
|
|
||||||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
settings := &appSettings{
|
settings := &appSettings{
|
||||||
logLevel: log.lvl,
|
logLevel: log.lvl,
|
||||||
httpLogging: s3middleware.LogHTTPConfig{},
|
httpLogging: s3middleware.LogHTTPConfig{},
|
||||||
tagsConfig: newTagsConfig(v),
|
tagsConfig: newTagsConfig(v),
|
||||||
maxClient: newMaxClients(v),
|
maxClient: newMaxClients(v),
|
||||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||||
reconnectInterval: fetchReconnectInterval(v),
|
reconnectInterval: fetchReconnectInterval(v),
|
||||||
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
||||||
dialerSource: getDialerSource(log.logger, v),
|
dialerSource: getDialerSource(log.logger, v),
|
||||||
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
|
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
|
||||||
|
removeOnReplaceQueue: fetchRemoveOnReplaceQueue(v),
|
||||||
}
|
}
|
||||||
|
|
||||||
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
||||||
|
@ -376,6 +380,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
tombstoneLifetime := fetchTombstoneLifetime(v)
|
tombstoneLifetime := fetchTombstoneLifetime(v)
|
||||||
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
||||||
listingKeepaliveThrottle := v.GetDuration(cfgKludgeListingKeepAliveThrottle)
|
listingKeepaliveThrottle := v.GetDuration(cfgKludgeListingKeepAliveThrottle)
|
||||||
|
removeOnReplace := v.GetBool(cfgRemoveOnReplaceEnabled)
|
||||||
|
removeOnReplaceTimeout := fetchRemoveOnReplaceTimeout(v)
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
@ -410,6 +416,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
s.tombstoneLifetime = tombstoneLifetime
|
s.tombstoneLifetime = tombstoneLifetime
|
||||||
s.tlsTerminationHeader = tlsTerminationHeader
|
s.tlsTerminationHeader = tlsTerminationHeader
|
||||||
s.listingKeepaliveThrottle = listingKeepaliveThrottle
|
s.listingKeepaliveThrottle = listingKeepaliveThrottle
|
||||||
|
s.removeOnReplace = removeOnReplace
|
||||||
|
s.removeOnReplaceTimeout = removeOnReplaceTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
||||||
|
@ -653,6 +661,24 @@ func (s *appSettings) ListingKeepaliveThrottle() time.Duration {
|
||||||
return s.listingKeepaliveThrottle
|
return s.listingKeepaliveThrottle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) RemoveOnReplace() bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.removeOnReplace
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) RemoveOnReplaceTimeout() time.Duration {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.removeOnReplaceTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) RemoveOnReplaceQueue() int {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.removeOnReplaceQueue
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
|
|
@ -75,6 +75,9 @@ const (
|
||||||
|
|
||||||
useDefaultXmlns = "use_default_xmlns"
|
useDefaultXmlns = "use_default_xmlns"
|
||||||
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
|
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
|
||||||
|
|
||||||
|
defaultRemoveOnReplaceTimeout = 30 * time.Second
|
||||||
|
defaultRemoveOnReplaceQueue = 10000
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -271,9 +274,12 @@ const (
|
||||||
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
||||||
|
|
||||||
// Enable return MD5 checksum in ETag.
|
// Enable return MD5 checksum in ETag.
|
||||||
cfgMD5Enabled = "features.md5.enabled"
|
cfgMD5Enabled = "features.md5.enabled"
|
||||||
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
|
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
|
||||||
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
|
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
|
||||||
|
cfgRemoveOnReplaceEnabled = "features.remove_on_replace.enabled"
|
||||||
|
cfgRemoveOnReplaceTimeout = "features.remove_on_replace.timeout"
|
||||||
|
cfgRemoveOnReplaceQueue = "features.remove_on_replace.queue"
|
||||||
|
|
||||||
// FrostfsID.
|
// FrostfsID.
|
||||||
cfgFrostfsIDContract = "frostfsid.contract"
|
cfgFrostfsIDContract = "frostfsid.contract"
|
||||||
|
@ -955,6 +961,24 @@ func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
|
||||||
return tombstoneWorkerPoolSize
|
return tombstoneWorkerPoolSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fetchRemoveOnReplaceTimeout(v *viper.Viper) time.Duration {
|
||||||
|
val := v.GetDuration(cfgRemoveOnReplaceTimeout)
|
||||||
|
if val <= 0 {
|
||||||
|
val = defaultRemoveOnReplaceTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchRemoveOnReplaceQueue(v *viper.Viper) int {
|
||||||
|
val := v.GetInt(cfgRemoveOnReplaceQueue)
|
||||||
|
if val <= 0 {
|
||||||
|
val = defaultRemoveOnReplaceQueue
|
||||||
|
}
|
||||||
|
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) {
|
func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) {
|
||||||
res := make(map[string]zapcore.Level)
|
res := make(map[string]zapcore.Level)
|
||||||
|
|
||||||
|
|
|
@ -218,6 +218,12 @@ S3_GW_FEATURES_MD5_ENABLED=false
|
||||||
S3_GW_FEATURES_POLICY_DENY_BY_DEFAULT=false
|
S3_GW_FEATURES_POLICY_DENY_BY_DEFAULT=false
|
||||||
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
||||||
S3_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true
|
S3_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true
|
||||||
|
# Enable removing old object during PUT operation in unversioned/suspened bucket.
|
||||||
|
S3_GW_FEATURES_REMOVE_ON_REPLACE_ENABLED=false
|
||||||
|
# Timeout to one delete operation in background.
|
||||||
|
S3_GW_FEATURES_REMOVE_ON_REPLACE_TIMEOUT=30s
|
||||||
|
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
|
||||||
|
S3_GW_FEATURES_REMOVE_ON_REPLACE_QUEUE=10000
|
||||||
|
|
||||||
# ReadTimeout is the maximum duration for reading the entire
|
# ReadTimeout is the maximum duration for reading the entire
|
||||||
# request, including the body. A zero or negative value means
|
# request, including the body. A zero or negative value means
|
||||||
|
|
|
@ -257,6 +257,13 @@ features:
|
||||||
enabled: false
|
enabled: false
|
||||||
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
||||||
tree_pool_netmap_support: true
|
tree_pool_netmap_support: true
|
||||||
|
remove_on_replace:
|
||||||
|
# Enable removing old object during PUT operation in unversioned/suspened bucket.
|
||||||
|
enabled: false
|
||||||
|
# Timeout to one delete operation in background.
|
||||||
|
timeout: 30s
|
||||||
|
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
|
||||||
|
queue: 10000
|
||||||
|
|
||||||
web:
|
web:
|
||||||
# ReadTimeout is the maximum duration for reading the entire
|
# ReadTimeout is the maximum duration for reading the entire
|
||||||
|
|
|
@ -728,13 +728,20 @@ features:
|
||||||
md5:
|
md5:
|
||||||
enabled: false
|
enabled: false
|
||||||
tree_pool_netmap_support: true
|
tree_pool_netmap_support: true
|
||||||
|
remove_on_replace:
|
||||||
|
enabled: false
|
||||||
|
timeout: 30s
|
||||||
|
queue: 10000
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|----------------------------|--------|---------------|---------------|---------------------------------------------------------------------------------------------------------|
|
|-----------------------------|-------------|---------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |
|
| `md5.enabled` | `bool` | yes | `false` | Flag to enable return MD5 checksum in ETag headers and fields. |
|
||||||
| `policy.deny_by_default` | `bool` | yes | false | Enable denying access for request that doesn't match any policy chain rules. |
|
| `policy.deny_by_default` | `bool` | yes | `false` | Enable denying access for request that doesn't match any policy chain rules. |
|
||||||
| `tree_pool_netmap_support` | `bool` | no | false | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
|
| `tree_pool_netmap_support` | `bool` | no | `false` | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
|
||||||
|
| `remove_on_replace.enabled` | `bool` | yes | `false` | Enable removing old object during PUT operation in unversioned/suspened bucket. |
|
||||||
|
| `remove_on_replace.timeout` | `durations` | yes | `30s` | Timeout to one delete operation in background. |
|
||||||
|
| `remove_on_replace.queue` | `int` | false | `10000` | Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that. |
|
||||||
|
|
||||||
# `web` section
|
# `web` section
|
||||||
Contains web server configuration parameters.
|
Contains web server configuration parameters.
|
||||||
|
|
|
@ -159,6 +159,7 @@ const (
|
||||||
ResolveBucket = "resolve bucket"
|
ResolveBucket = "resolve bucket"
|
||||||
FailedToResolveCID = "failed to resolve CID"
|
FailedToResolveCID = "failed to resolve CID"
|
||||||
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks"
|
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks"
|
||||||
|
FailedToQueueOldUnversionedObjectToDelete = "failed to queue old unversioned object to delete, removal will be performed in lifecycler"
|
||||||
)
|
)
|
||||||
|
|
||||||
// External storage.
|
// External storage.
|
||||||
|
@ -179,6 +180,7 @@ const (
|
||||||
PutObject = "put object"
|
PutObject = "put object"
|
||||||
CouldNotFetchObjectMeta = "could not fetch object meta"
|
CouldNotFetchObjectMeta = "could not fetch object meta"
|
||||||
FailedToDeleteObject = "failed to delete object"
|
FailedToDeleteObject = "failed to delete object"
|
||||||
|
FailedToRemoveOldUnversionedObject = "failed to remove old unversioned object"
|
||||||
CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object"
|
CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -201,6 +203,7 @@ const (
|
||||||
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
|
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
|
||||||
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
|
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
|
||||||
GetBucketCorsFromTree = "get bucket cors from tree"
|
GetBucketCorsFromTree = "get bucket cors from tree"
|
||||||
|
FailedToRemoveOldUnversionedNode = "failed to remove old unversioned node"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Authmate.
|
// Authmate.
|
||||||
|
|
|
@ -1318,10 +1318,18 @@ func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, fil
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned")
|
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
res, err := c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res[0], err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
|
// getUnversioned returns all unversioned nodes for the specified filepath.
|
||||||
|
// The list of nodes is never empty if the error is nil.
|
||||||
|
// The first item of the list is the latest (according to timestamp).
|
||||||
|
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) ([]*data.NodeVersion, error) {
|
||||||
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
|
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1342,10 +1350,10 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
|
||||||
return nodes[i].Timestamp > nodes[j].Timestamp
|
return nodes[i].Timestamp > nodes[j].Timestamp
|
||||||
})
|
})
|
||||||
|
|
||||||
return nodes[0], nil
|
return nodes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
|
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
|
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -1782,7 +1790,7 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket
|
||||||
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
|
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
|
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||||
path := pathFromName(version.FilePath)
|
path := pathFromName(version.FilePath)
|
||||||
meta := map[string]string{
|
meta := map[string]string{
|
||||||
oidKV: version.OID.EncodeToString(),
|
oidKV: version.OID.EncodeToString(),
|
||||||
|
@ -1813,25 +1821,40 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
||||||
if version.IsUnversioned {
|
if version.IsUnversioned {
|
||||||
meta[isUnversionedKV] = "true"
|
meta[isUnversionedKV] = "true"
|
||||||
|
|
||||||
node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
|
nodes, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
node := nodes[0]
|
||||||
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParentID, meta); err != nil {
|
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParentID, meta); err != nil {
|
||||||
return 0, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
|
oldOIDs := make([]oid.ID, len(nodes))
|
||||||
|
for i, oldNode := range nodes {
|
||||||
|
oldOIDs[i] = oldNode.OID
|
||||||
|
}
|
||||||
|
|
||||||
|
return node.ID, oldOIDs, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !errors.Is(err, tree.ErrNodeNotFound) {
|
if !errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
return 0, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
|
nodeID, err := c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
|
||||||
|
return nodeID, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodes []*data.NodeVersion) error {
|
||||||
taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
|
for _, node := range nodes[1:] {
|
||||||
|
if err := c.service.RemoveNode(ctx, bktInfo, treeID, node.ID); err != nil {
|
||||||
|
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldUnversionedNode, zap.Uint64("node_id", node.ID),
|
||||||
|
zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
latest := nodes[0]
|
||||||
|
taggingNode, err := c.getTreeNode(ctx, bktInfo, latest.ID, isTagKV)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,7 @@ func TestTreeServiceAddVersion(t *testing.T) {
|
||||||
IsUnversioned: true,
|
IsUnversioned: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeID, err := treeService.AddVersion(ctx, bktInfo, version)
|
nodeID, _, err := treeService.AddVersion(ctx, bktInfo, version)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
storedNode, err := treeService.GetUnversioned(ctx, bktInfo, "path/to/version")
|
storedNode, err := treeService.GetUnversioned(ctx, bktInfo, "path/to/version")
|
||||||
|
@ -404,7 +404,7 @@ func TestVersionsByPrefixStreamImpl_Next(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v := range versions {
|
for _, v := range versions {
|
||||||
_, err = treeService.AddVersion(ctx, bktInfo, v)
|
_, _, err = treeService.AddVersion(ctx, bktInfo, v)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue