forked from TrueCloudLab/frostfs-s3-gw
Compare commits
No commits in common. "feature/remove-old-objects-in-unversioned-bucket" and "master" have entirely different histories.
feature/re
...
master
18 changed files with 61 additions and 287 deletions
|
@ -8,7 +8,6 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
|
@ -472,27 +471,6 @@ func TestDeleteBucketByNotOwner(t *testing.T) {
|
||||||
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemovalOnReplace(t *testing.T) {
|
|
||||||
hc := prepareHandlerContext(t)
|
|
||||||
|
|
||||||
bktName, objName := "bucket", "object"
|
|
||||||
bktInfo := createTestBucket(hc, bktName)
|
|
||||||
|
|
||||||
putObject(hc, bktName, objName)
|
|
||||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 1)
|
|
||||||
|
|
||||||
putObject(hc, bktName, objName)
|
|
||||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
|
||||||
|
|
||||||
hc.layerFeatures.SetRemoveOnReplace(true)
|
|
||||||
|
|
||||||
putObject(hc, bktName, objName)
|
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
|
|
||||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
||||||
bktInfo := createTestBucket(tc, bktName)
|
bktInfo := createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
|
|
@ -183,8 +183,6 @@ func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*handlerContextBase, error) {
|
func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*handlerContextBase, error) {
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
key, err := keys.NewPrivateKey()
|
key, err := keys.NewPrivateKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -236,7 +234,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
|
||||||
}
|
}
|
||||||
h := &handler{
|
h := &handler{
|
||||||
log: log,
|
log: log,
|
||||||
obj: layer.NewLayer(ctx, log, tp, layerCfg),
|
obj: layer.NewLayer(log, tp, layerCfg),
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
ape: newAPEMock(),
|
ape: newAPEMock(),
|
||||||
frostfsid: newFrostfsIDMock(),
|
frostfsid: newFrostfsIDMock(),
|
||||||
|
@ -252,7 +250,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
|
||||||
h: h,
|
h: h,
|
||||||
tp: tp,
|
tp: tp,
|
||||||
tree: treeMock,
|
tree: treeMock,
|
||||||
context: middleware.SetBox(ctx, &middleware.Box{AccessBox: accessBox}),
|
context: middleware.SetBox(context.Background(), &middleware.Box{AccessBox: accessBox}),
|
||||||
config: cfg,
|
config: cfg,
|
||||||
|
|
||||||
layerFeatures: features,
|
layerFeatures: features,
|
||||||
|
|
|
@ -34,7 +34,6 @@ import (
|
||||||
type FeatureSettingsMock struct {
|
type FeatureSettingsMock struct {
|
||||||
clientCut bool
|
clientCut bool
|
||||||
md5Enabled bool
|
md5Enabled bool
|
||||||
removeOnReplace bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
|
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
|
||||||
|
@ -73,22 +72,6 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
||||||
return ns + ".ns"
|
return ns + ".ns"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) SetRemoveOnReplace(removeOnReplace bool) {
|
|
||||||
k.removeOnReplace = removeOnReplace
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) RemoveOnReplace() bool {
|
|
||||||
return k.removeOnReplace
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) RemoveOnReplaceTimeout() time.Duration {
|
|
||||||
return time.Minute
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) RemoveOnReplaceQueue() int {
|
|
||||||
return 1
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
||||||
|
|
||||||
type offsetError struct {
|
type offsetError struct {
|
||||||
|
|
|
@ -48,9 +48,6 @@ type (
|
||||||
FormContainerZone(ns string) string
|
FormContainerZone(ns string) string
|
||||||
TombstoneMembersSize() int
|
TombstoneMembersSize() int
|
||||||
TombstoneLifetime() uint64
|
TombstoneLifetime() uint64
|
||||||
RemoveOnReplace() bool
|
|
||||||
RemoveOnReplaceTimeout() time.Duration
|
|
||||||
RemoveOnReplaceQueue() int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Layer struct {
|
Layer struct {
|
||||||
|
@ -66,15 +63,6 @@ type (
|
||||||
corsCnrInfo *data.BucketInfo
|
corsCnrInfo *data.BucketInfo
|
||||||
lifecycleCnrInfo *data.BucketInfo
|
lifecycleCnrInfo *data.BucketInfo
|
||||||
workerPool *ants.Pool
|
workerPool *ants.Pool
|
||||||
removalChan chan removalParams
|
|
||||||
}
|
|
||||||
|
|
||||||
removalParams struct {
|
|
||||||
Auth frostfs.PrmAuth
|
|
||||||
BktInfo *data.BucketInfo
|
|
||||||
OIDs []oid.ID
|
|
||||||
RequestID string
|
|
||||||
TraceID string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Config struct {
|
Config struct {
|
||||||
|
@ -268,8 +256,8 @@ func (p HeadObjectParams) Versioned() bool {
|
||||||
|
|
||||||
// NewLayer creates an instance of a Layer. It checks credentials
|
// NewLayer creates an instance of a Layer. It checks credentials
|
||||||
// and establishes gRPC connection with the node.
|
// and establishes gRPC connection with the node.
|
||||||
func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
||||||
layer := &Layer{
|
return &Layer{
|
||||||
frostFS: frostFS,
|
frostFS: frostFS,
|
||||||
log: log,
|
log: log,
|
||||||
gateOwner: config.GateOwner,
|
gateOwner: config.GateOwner,
|
||||||
|
@ -282,13 +270,7 @@ func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, con
|
||||||
corsCnrInfo: config.CORSCnrInfo,
|
corsCnrInfo: config.CORSCnrInfo,
|
||||||
lifecycleCnrInfo: config.LifecycleCnrInfo,
|
lifecycleCnrInfo: config.LifecycleCnrInfo,
|
||||||
workerPool: config.WorkerPool,
|
workerPool: config.WorkerPool,
|
||||||
// TODO: consider closing channel
|
|
||||||
removalChan: make(chan removalParams, config.Features.RemoveOnReplaceQueue()),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go layer.removalRoutine(ctx)
|
|
||||||
|
|
||||||
return layer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) EphemeralKey() *keys.PublicKey {
|
func (n *Layer) EphemeralKey() *keys.PublicKey {
|
||||||
|
@ -713,7 +695,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
IsUnversioned: settings.VersioningSuspended(),
|
IsUnversioned: settings.VersioningSuspended(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, obj.Error = n.addVersion(ctx, bkt, newVersion); obj.Error != nil {
|
if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -722,67 +704,6 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) removalRoutine(ctx context.Context) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case prm, ok := <-n.removalChan:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout())
|
|
||||||
for _, objID := range prm.OIDs {
|
|
||||||
if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil {
|
|
||||||
n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID),
|
|
||||||
zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) {
|
|
||||||
if !n.features.RemoveOnReplace() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
reqInfo := middleware.GetReqInfo(ctx)
|
|
||||||
prm := removalParams{
|
|
||||||
Auth: frostfs.PrmAuth{},
|
|
||||||
BktInfo: bktInfo,
|
|
||||||
OIDs: OIDs,
|
|
||||||
RequestID: reqInfo.RequestID,
|
|
||||||
TraceID: reqInfo.TraceID,
|
|
||||||
}
|
|
||||||
|
|
||||||
n.prepareAuthParameters(ctx, &prm.Auth, bktInfo.Owner)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case n.removalChan <- prm:
|
|
||||||
default:
|
|
||||||
oidsStr := make([]string, len(OIDs))
|
|
||||||
for i, d := range OIDs {
|
|
||||||
oidsStr[i] = d.EncodeToString()
|
|
||||||
}
|
|
||||||
|
|
||||||
n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete,
|
|
||||||
zap.Strings("oids", oidsStr), logs.TagField(logs.TagDatapath))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Layer) addVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
|
|
||||||
nodeID, OIDs, err := n.treeService.AddVersion(ctx, bktInfo, version)
|
|
||||||
n.tryRemove(ctx, bktInfo, OIDs)
|
|
||||||
if err != nil {
|
|
||||||
return nodeID, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodeID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||||
if isNotFoundError(obj.Error) {
|
if isNotFoundError(obj.Error) {
|
||||||
obj.Error = nil
|
obj.Error = nil
|
||||||
|
|
|
@ -354,7 +354,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
newVersion.Size = createdObj.Size
|
newVersion.Size = createdObj.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,7 @@ func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.Ex
|
||||||
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,7 +253,7 @@ func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo
|
||||||
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,11 +45,7 @@ type Service interface {
|
||||||
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||||
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
|
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
|
||||||
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||||
|
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
|
||||||
// AddVersion creates new version in tree.
|
|
||||||
// Returns new node id and object ids of old versions (OIDS) that must be deleted.
|
|
||||||
// OIDs can be returned even if error is not nil.
|
|
||||||
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error)
|
|
||||||
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
|
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
|
||||||
|
|
||||||
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error
|
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error
|
||||||
|
|
|
@ -236,19 +236,19 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.Bucket
|
||||||
return nil, tree.ErrNodeNotFound
|
return nil, tree.ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) {
|
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) {
|
||||||
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
|
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
|
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
|
||||||
newVersion.FilePath: {newVersion},
|
newVersion.FilePath: {newVersion},
|
||||||
}
|
}
|
||||||
return newVersion.ID, nil, nil
|
return newVersion.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
||||||
if !ok {
|
if !ok {
|
||||||
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
||||||
return newVersion.ID, nil, nil
|
return newVersion.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(versions, func(i, j int) bool {
|
sort.Slice(versions, func(i, j int) bool {
|
||||||
|
@ -262,22 +262,18 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
||||||
|
|
||||||
result := versions
|
result := versions
|
||||||
|
|
||||||
var oldUnversionedIDs []oid.ID
|
|
||||||
|
|
||||||
if newVersion.IsUnversioned {
|
if newVersion.IsUnversioned {
|
||||||
result = make([]*data.NodeVersion, 0, len(versions))
|
result = make([]*data.NodeVersion, 0, len(versions))
|
||||||
for _, node := range versions {
|
for _, node := range versions {
|
||||||
if !node.IsUnversioned {
|
if !node.IsUnversioned {
|
||||||
result = append(result, node)
|
result = append(result, node)
|
||||||
} else {
|
|
||||||
oldUnversionedIDs = append(oldUnversionedIDs, node.OID)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cnrVersionsMap[newVersion.FilePath] = append(result, newVersion)
|
cnrVersionsMap[newVersion.FilePath] = append(result, newVersion)
|
||||||
|
|
||||||
return newVersion.ID, oldUnversionedIDs, nil
|
return newVersion.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error {
|
func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error {
|
||||||
|
|
|
@ -180,7 +180,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
||||||
|
|
||||||
return &testContext{
|
return &testContext{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
layer: NewLayer(ctx, logger, tp, layerCfg),
|
layer: NewLayer(logger, tp, layerCfg),
|
||||||
bktInfo: &data.BucketInfo{
|
bktInfo: &data.BucketInfo{
|
||||||
Name: bktName,
|
Name: bktName,
|
||||||
Owner: owner,
|
Owner: owner,
|
||||||
|
|
|
@ -190,9 +190,7 @@ func Request(log *zap.Logger, settings RequestSettings) Func {
|
||||||
fields := []zap.Field{zap.String("request_id", reqInfo.RequestID)}
|
fields := []zap.Field{zap.String("request_id", reqInfo.RequestID)}
|
||||||
ctx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
ctx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
||||||
if traceID := span.SpanContext().TraceID(); traceID.IsValid() {
|
if traceID := span.SpanContext().TraceID(); traceID.IsValid() {
|
||||||
traceIDStr := traceID.String()
|
fields = append(fields, zap.String("trace_id", traceID.String()))
|
||||||
fields = append(fields, zap.String("trace_id", traceIDStr))
|
|
||||||
reqInfo.TraceID = traceIDStr
|
|
||||||
}
|
}
|
||||||
lw := &traceResponseWriter{ResponseWriter: w, ctx: ctx, span: span}
|
lw := &traceResponseWriter{ResponseWriter: w, ctx: ctx, span: span}
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,6 @@ type (
|
||||||
accessbox *cid.ID
|
accessbox *cid.ID
|
||||||
dialerSource *internalnet.DialerSource
|
dialerSource *internalnet.DialerSource
|
||||||
workerPoolSize int
|
workerPoolSize int
|
||||||
removeOnReplaceQueue int
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
namespaces Namespaces
|
namespaces Namespaces
|
||||||
|
@ -141,8 +140,6 @@ type (
|
||||||
tombstoneLifetime uint64
|
tombstoneLifetime uint64
|
||||||
tlsTerminationHeader string
|
tlsTerminationHeader string
|
||||||
listingKeepaliveThrottle time.Duration
|
listingKeepaliveThrottle time.Duration
|
||||||
removeOnReplace bool
|
|
||||||
removeOnReplaceTimeout time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
maxClientsConfig struct {
|
maxClientsConfig struct {
|
||||||
|
@ -316,7 +313,7 @@ func (a *App) initLayer(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepare object layer
|
// prepare object layer
|
||||||
a.obj = layer.NewLayer(ctx, a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initWorkerPool() *ants.Pool {
|
func (a *App) initWorkerPool() *ants.Pool {
|
||||||
|
@ -338,7 +335,6 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
||||||
dialerSource: getDialerSource(log.logger, v),
|
dialerSource: getDialerSource(log.logger, v),
|
||||||
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
|
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
|
||||||
removeOnReplaceQueue: fetchRemoveOnReplaceQueue(v),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
||||||
|
@ -380,8 +376,6 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
tombstoneLifetime := fetchTombstoneLifetime(v)
|
tombstoneLifetime := fetchTombstoneLifetime(v)
|
||||||
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
||||||
listingKeepaliveThrottle := v.GetDuration(cfgKludgeListingKeepAliveThrottle)
|
listingKeepaliveThrottle := v.GetDuration(cfgKludgeListingKeepAliveThrottle)
|
||||||
removeOnReplace := v.GetBool(cfgRemoveOnReplaceEnabled)
|
|
||||||
removeOnReplaceTimeout := fetchRemoveOnReplaceTimeout(v)
|
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
@ -416,8 +410,6 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
s.tombstoneLifetime = tombstoneLifetime
|
s.tombstoneLifetime = tombstoneLifetime
|
||||||
s.tlsTerminationHeader = tlsTerminationHeader
|
s.tlsTerminationHeader = tlsTerminationHeader
|
||||||
s.listingKeepaliveThrottle = listingKeepaliveThrottle
|
s.listingKeepaliveThrottle = listingKeepaliveThrottle
|
||||||
s.removeOnReplace = removeOnReplace
|
|
||||||
s.removeOnReplaceTimeout = removeOnReplaceTimeout
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
||||||
|
@ -661,24 +653,6 @@ func (s *appSettings) ListingKeepaliveThrottle() time.Duration {
|
||||||
return s.listingKeepaliveThrottle
|
return s.listingKeepaliveThrottle
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) RemoveOnReplace() bool {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
return s.removeOnReplace
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *appSettings) RemoveOnReplaceTimeout() time.Duration {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
return s.removeOnReplaceTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *appSettings) RemoveOnReplaceQueue() int {
|
|
||||||
s.mu.RLock()
|
|
||||||
defer s.mu.RUnlock()
|
|
||||||
return s.removeOnReplaceQueue
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
|
|
@ -75,9 +75,6 @@ const (
|
||||||
|
|
||||||
useDefaultXmlns = "use_default_xmlns"
|
useDefaultXmlns = "use_default_xmlns"
|
||||||
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
|
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
|
||||||
|
|
||||||
defaultRemoveOnReplaceTimeout = 30 * time.Second
|
|
||||||
defaultRemoveOnReplaceQueue = 10000
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -277,9 +274,6 @@ const (
|
||||||
cfgMD5Enabled = "features.md5.enabled"
|
cfgMD5Enabled = "features.md5.enabled"
|
||||||
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
|
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
|
||||||
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
|
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
|
||||||
cfgRemoveOnReplaceEnabled = "features.remove_on_replace.enabled"
|
|
||||||
cfgRemoveOnReplaceTimeout = "features.remove_on_replace.timeout"
|
|
||||||
cfgRemoveOnReplaceQueue = "features.remove_on_replace.queue"
|
|
||||||
|
|
||||||
// FrostfsID.
|
// FrostfsID.
|
||||||
cfgFrostfsIDContract = "frostfsid.contract"
|
cfgFrostfsIDContract = "frostfsid.contract"
|
||||||
|
@ -961,24 +955,6 @@ func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
|
||||||
return tombstoneWorkerPoolSize
|
return tombstoneWorkerPoolSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchRemoveOnReplaceTimeout(v *viper.Viper) time.Duration {
|
|
||||||
val := v.GetDuration(cfgRemoveOnReplaceTimeout)
|
|
||||||
if val <= 0 {
|
|
||||||
val = defaultRemoveOnReplaceTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
return val
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchRemoveOnReplaceQueue(v *viper.Viper) int {
|
|
||||||
val := v.GetInt(cfgRemoveOnReplaceQueue)
|
|
||||||
if val <= 0 {
|
|
||||||
val = defaultRemoveOnReplaceQueue
|
|
||||||
}
|
|
||||||
|
|
||||||
return val
|
|
||||||
}
|
|
||||||
|
|
||||||
func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) {
|
func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) {
|
||||||
res := make(map[string]zapcore.Level)
|
res := make(map[string]zapcore.Level)
|
||||||
|
|
||||||
|
|
|
@ -218,12 +218,6 @@ S3_GW_FEATURES_MD5_ENABLED=false
|
||||||
S3_GW_FEATURES_POLICY_DENY_BY_DEFAULT=false
|
S3_GW_FEATURES_POLICY_DENY_BY_DEFAULT=false
|
||||||
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
||||||
S3_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true
|
S3_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true
|
||||||
# Enable removing old object during PUT operation in unversioned/suspened bucket.
|
|
||||||
S3_GW_FEATURES_REMOVE_ON_REPLACE_ENABLED=false
|
|
||||||
# Timeout to one delete operation in background.
|
|
||||||
S3_GW_FEATURES_REMOVE_ON_REPLACE_TIMEOUT=30s
|
|
||||||
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
|
|
||||||
S3_GW_FEATURES_REMOVE_ON_REPLACE_QUEUE=10000
|
|
||||||
|
|
||||||
# ReadTimeout is the maximum duration for reading the entire
|
# ReadTimeout is the maximum duration for reading the entire
|
||||||
# request, including the body. A zero or negative value means
|
# request, including the body. A zero or negative value means
|
||||||
|
|
|
@ -257,13 +257,6 @@ features:
|
||||||
enabled: false
|
enabled: false
|
||||||
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
||||||
tree_pool_netmap_support: true
|
tree_pool_netmap_support: true
|
||||||
remove_on_replace:
|
|
||||||
# Enable removing old object during PUT operation in unversioned/suspened bucket.
|
|
||||||
enabled: false
|
|
||||||
# Timeout to one delete operation in background.
|
|
||||||
timeout: 30s
|
|
||||||
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
|
|
||||||
queue: 10000
|
|
||||||
|
|
||||||
web:
|
web:
|
||||||
# ReadTimeout is the maximum duration for reading the entire
|
# ReadTimeout is the maximum duration for reading the entire
|
||||||
|
|
|
@ -728,20 +728,13 @@ features:
|
||||||
md5:
|
md5:
|
||||||
enabled: false
|
enabled: false
|
||||||
tree_pool_netmap_support: true
|
tree_pool_netmap_support: true
|
||||||
remove_on_replace:
|
|
||||||
enabled: false
|
|
||||||
timeout: 30s
|
|
||||||
queue: 10000
|
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|-----------------------------|-------------|---------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------|
|
|----------------------------|--------|---------------|---------------|---------------------------------------------------------------------------------------------------------|
|
||||||
| `md5.enabled` | `bool` | yes | `false` | Flag to enable return MD5 checksum in ETag headers and fields. |
|
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |
|
||||||
| `policy.deny_by_default` | `bool` | yes | `false` | Enable denying access for request that doesn't match any policy chain rules. |
|
| `policy.deny_by_default` | `bool` | yes | false | Enable denying access for request that doesn't match any policy chain rules. |
|
||||||
| `tree_pool_netmap_support` | `bool` | no | `false` | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
|
| `tree_pool_netmap_support` | `bool` | no | false | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
|
||||||
| `remove_on_replace.enabled` | `bool` | yes | `false` | Enable removing old object during PUT operation in unversioned/suspened bucket. |
|
|
||||||
| `remove_on_replace.timeout` | `durations` | yes | `30s` | Timeout to one delete operation in background. |
|
|
||||||
| `remove_on_replace.queue` | `int` | false | `10000` | Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that. |
|
|
||||||
|
|
||||||
# `web` section
|
# `web` section
|
||||||
Contains web server configuration parameters.
|
Contains web server configuration parameters.
|
||||||
|
|
|
@ -159,7 +159,6 @@ const (
|
||||||
ResolveBucket = "resolve bucket"
|
ResolveBucket = "resolve bucket"
|
||||||
FailedToResolveCID = "failed to resolve CID"
|
FailedToResolveCID = "failed to resolve CID"
|
||||||
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks"
|
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks"
|
||||||
FailedToQueueOldUnversionedObjectToDelete = "failed to queue old unversioned object to delete, removal will be performed in lifecycler"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// External storage.
|
// External storage.
|
||||||
|
@ -180,7 +179,6 @@ const (
|
||||||
PutObject = "put object"
|
PutObject = "put object"
|
||||||
CouldNotFetchObjectMeta = "could not fetch object meta"
|
CouldNotFetchObjectMeta = "could not fetch object meta"
|
||||||
FailedToDeleteObject = "failed to delete object"
|
FailedToDeleteObject = "failed to delete object"
|
||||||
FailedToRemoveOldUnversionedObject = "failed to remove old unversioned object"
|
|
||||||
CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object"
|
CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -203,7 +201,6 @@ const (
|
||||||
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
|
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
|
||||||
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
|
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
|
||||||
GetBucketCorsFromTree = "get bucket cors from tree"
|
GetBucketCorsFromTree = "get bucket cors from tree"
|
||||||
FailedToRemoveOldUnversionedNode = "failed to remove old unversioned node"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Authmate.
|
// Authmate.
|
||||||
|
|
|
@ -1318,18 +1318,10 @@ func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, fil
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned")
|
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
res, err := c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return res[0], err
|
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
|
||||||
}
|
|
||||||
|
|
||||||
// getUnversioned returns all unversioned nodes for specified filepath.
|
|
||||||
// List of node is always not empty if error is nil.
|
|
||||||
// First item of list is the latest (according timestamp).
|
|
||||||
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) ([]*data.NodeVersion, error) {
|
|
||||||
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
|
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1350,10 +1342,10 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
|
||||||
return nodes[i].Timestamp > nodes[j].Timestamp
|
return nodes[i].Timestamp > nodes[j].Timestamp
|
||||||
})
|
})
|
||||||
|
|
||||||
return nodes, nil
|
return nodes[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
|
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -1790,7 +1782,7 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket
|
||||||
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
|
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
|
||||||
path := pathFromName(version.FilePath)
|
path := pathFromName(version.FilePath)
|
||||||
meta := map[string]string{
|
meta := map[string]string{
|
||||||
oidKV: version.OID.EncodeToString(),
|
oidKV: version.OID.EncodeToString(),
|
||||||
|
@ -1821,40 +1813,25 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
||||||
if version.IsUnversioned {
|
if version.IsUnversioned {
|
||||||
meta[isUnversionedKV] = "true"
|
meta[isUnversionedKV] = "true"
|
||||||
|
|
||||||
nodes, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
|
node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
node := nodes[0]
|
|
||||||
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParentID, meta); err != nil {
|
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParentID, meta); err != nil {
|
||||||
return 0, nil, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
oldOIDs := make([]oid.ID, len(nodes))
|
return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
|
||||||
for i, oldNode := range nodes {
|
|
||||||
oldOIDs[i] = oldNode.OID
|
|
||||||
}
|
|
||||||
|
|
||||||
return node.ID, oldOIDs, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, nodes)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !errors.Is(err, tree.ErrNodeNotFound) {
|
if !errors.Is(err, tree.ErrNodeNotFound) {
|
||||||
return 0, nil, err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeID, err := c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
|
return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
|
||||||
return nodeID, nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodes []*data.NodeVersion) error {
|
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
||||||
for _, node := range nodes[1:] {
|
taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
|
||||||
if err := c.service.RemoveNode(ctx, bktInfo, treeID, node.ID); err != nil {
|
|
||||||
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldUnversionedNode, zap.Uint64("node_id", node.ID),
|
|
||||||
zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
latest := nodes[0]
|
|
||||||
taggingNode, err := c.getTreeNode(ctx, bktInfo, latest.ID, isTagKV)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,7 +166,7 @@ func TestTreeServiceAddVersion(t *testing.T) {
|
||||||
IsUnversioned: true,
|
IsUnversioned: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeID, _, err := treeService.AddVersion(ctx, bktInfo, version)
|
nodeID, err := treeService.AddVersion(ctx, bktInfo, version)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
storedNode, err := treeService.GetUnversioned(ctx, bktInfo, "path/to/version")
|
storedNode, err := treeService.GetUnversioned(ctx, bktInfo, "path/to/version")
|
||||||
|
@ -404,7 +404,7 @@ func TestVersionsByPrefixStreamImpl_Next(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v := range versions {
|
for _, v := range versions {
|
||||||
_, _, err = treeService.AddVersion(ctx, bktInfo, v)
|
_, err = treeService.AddVersion(ctx, bktInfo, v)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue