forked from TrueCloudLab/frostfs-s3-gw
Compare commits
No commits in common. "feature/remove-old-objects-in-unversioned-bucket" and "master" have entirely different histories.
feature/re
...
master
18 changed files with 61 additions and 287 deletions
|
@ -8,7 +8,6 @@ import (
|
|||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
|
@ -472,27 +471,6 @@ func TestDeleteBucketByNotOwner(t *testing.T) {
|
|||
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
||||
}
|
||||
|
||||
func TestRemovalOnReplace(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "bucket", "object"
|
||||
bktInfo := createTestBucket(hc, bktName)
|
||||
|
||||
putObject(hc, bktName, objName)
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 1)
|
||||
|
||||
putObject(hc, bktName, objName)
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
||||
|
||||
hc.layerFeatures.SetRemoveOnReplace(true)
|
||||
|
||||
putObject(hc, bktName, objName)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
||||
}
|
||||
|
||||
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
||||
bktInfo := createTestBucket(tc, bktName)
|
||||
|
||||
|
|
|
@ -183,8 +183,6 @@ func prepareHandlerContextWithMinCache(t *testing.T) *handlerContext {
|
|||
}
|
||||
|
||||
func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*handlerContextBase, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
key, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -236,7 +234,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
|
|||
}
|
||||
h := &handler{
|
||||
log: log,
|
||||
obj: layer.NewLayer(ctx, log, tp, layerCfg),
|
||||
obj: layer.NewLayer(log, tp, layerCfg),
|
||||
cfg: cfg,
|
||||
ape: newAPEMock(),
|
||||
frostfsid: newFrostfsIDMock(),
|
||||
|
@ -252,7 +250,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig, log *zap.Logger) (*
|
|||
h: h,
|
||||
tp: tp,
|
||||
tree: treeMock,
|
||||
context: middleware.SetBox(ctx, &middleware.Box{AccessBox: accessBox}),
|
||||
context: middleware.SetBox(context.Background(), &middleware.Box{AccessBox: accessBox}),
|
||||
config: cfg,
|
||||
|
||||
layerFeatures: features,
|
||||
|
|
|
@ -32,9 +32,8 @@ import (
|
|||
)
|
||||
|
||||
type FeatureSettingsMock struct {
|
||||
clientCut bool
|
||||
md5Enabled bool
|
||||
removeOnReplace bool
|
||||
clientCut bool
|
||||
md5Enabled bool
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
|
||||
|
@ -73,22 +72,6 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
|||
return ns + ".ns"
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) SetRemoveOnReplace(removeOnReplace bool) {
|
||||
k.removeOnReplace = removeOnReplace
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) RemoveOnReplace() bool {
|
||||
return k.removeOnReplace
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) RemoveOnReplaceTimeout() time.Duration {
|
||||
return time.Minute
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) RemoveOnReplaceQueue() int {
|
||||
return 1
|
||||
}
|
||||
|
||||
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
||||
|
||||
type offsetError struct {
|
||||
|
|
|
@ -48,9 +48,6 @@ type (
|
|||
FormContainerZone(ns string) string
|
||||
TombstoneMembersSize() int
|
||||
TombstoneLifetime() uint64
|
||||
RemoveOnReplace() bool
|
||||
RemoveOnReplaceTimeout() time.Duration
|
||||
RemoveOnReplaceQueue() int
|
||||
}
|
||||
|
||||
Layer struct {
|
||||
|
@ -66,15 +63,6 @@ type (
|
|||
corsCnrInfo *data.BucketInfo
|
||||
lifecycleCnrInfo *data.BucketInfo
|
||||
workerPool *ants.Pool
|
||||
removalChan chan removalParams
|
||||
}
|
||||
|
||||
removalParams struct {
|
||||
Auth frostfs.PrmAuth
|
||||
BktInfo *data.BucketInfo
|
||||
OIDs []oid.ID
|
||||
RequestID string
|
||||
TraceID string
|
||||
}
|
||||
|
||||
Config struct {
|
||||
|
@ -268,8 +256,8 @@ func (p HeadObjectParams) Versioned() bool {
|
|||
|
||||
// NewLayer creates an instance of a Layer. It checks credentials
|
||||
// and establishes gRPC connection with the node.
|
||||
func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
||||
layer := &Layer{
|
||||
func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
||||
return &Layer{
|
||||
frostFS: frostFS,
|
||||
log: log,
|
||||
gateOwner: config.GateOwner,
|
||||
|
@ -282,13 +270,7 @@ func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, con
|
|||
corsCnrInfo: config.CORSCnrInfo,
|
||||
lifecycleCnrInfo: config.LifecycleCnrInfo,
|
||||
workerPool: config.WorkerPool,
|
||||
// TODO: consider closing channel
|
||||
removalChan: make(chan removalParams, config.Features.RemoveOnReplaceQueue()),
|
||||
}
|
||||
|
||||
go layer.removalRoutine(ctx)
|
||||
|
||||
return layer
|
||||
}
|
||||
|
||||
func (n *Layer) EphemeralKey() *keys.PublicKey {
|
||||
|
@ -713,7 +695,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
IsUnversioned: settings.VersioningSuspended(),
|
||||
}
|
||||
|
||||
if _, obj.Error = n.addVersion(ctx, bkt, newVersion); obj.Error != nil {
|
||||
if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
|
||||
return obj
|
||||
}
|
||||
|
||||
|
@ -722,67 +704,6 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
return obj
|
||||
}
|
||||
|
||||
func (n *Layer) removalRoutine(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case prm, ok := <-n.removalChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout())
|
||||
for _, objID := range prm.OIDs {
|
||||
if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil {
|
||||
n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID),
|
||||
zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) {
|
||||
if !n.features.RemoveOnReplace() {
|
||||
return
|
||||
}
|
||||
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
prm := removalParams{
|
||||
Auth: frostfs.PrmAuth{},
|
||||
BktInfo: bktInfo,
|
||||
OIDs: OIDs,
|
||||
RequestID: reqInfo.RequestID,
|
||||
TraceID: reqInfo.TraceID,
|
||||
}
|
||||
|
||||
n.prepareAuthParameters(ctx, &prm.Auth, bktInfo.Owner)
|
||||
|
||||
select {
|
||||
case n.removalChan <- prm:
|
||||
default:
|
||||
oidsStr := make([]string, len(OIDs))
|
||||
for i, d := range OIDs {
|
||||
oidsStr[i] = d.EncodeToString()
|
||||
}
|
||||
|
||||
n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete,
|
||||
zap.Strings("oids", oidsStr), logs.TagField(logs.TagDatapath))
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Layer) addVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
|
||||
nodeID, OIDs, err := n.treeService.AddVersion(ctx, bktInfo, version)
|
||||
n.tryRemove(ctx, bktInfo, OIDs)
|
||||
if err != nil {
|
||||
return nodeID, err
|
||||
}
|
||||
|
||||
return nodeID, nil
|
||||
}
|
||||
|
||||
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||
if isNotFoundError(obj.Error) {
|
||||
obj.Error = nil
|
||||
|
|
|
@ -354,7 +354,7 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
|||
newVersion.Size = createdObj.Size
|
||||
}
|
||||
|
||||
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.Ex
|
|||
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||
}
|
||||
|
||||
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||
}
|
||||
|
||||
|
@ -253,7 +253,7 @@ func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo
|
|||
IsCombined: p.Object.ObjectInfo.Headers[MultipartObjectSize] != "",
|
||||
}
|
||||
|
||||
if newVersion.ID, err = n.addVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||
return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -45,11 +45,7 @@ type Service interface {
|
|||
GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
|
||||
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
|
||||
|
||||
// AddVersion creates new version in tree.
|
||||
// Returns new node id and object ids of old versions (OIDS) that must be deleted.
|
||||
// OIDs can be returned even if error is not nil.
|
||||
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error)
|
||||
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
|
||||
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
|
||||
|
||||
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error
|
||||
|
|
|
@ -236,19 +236,19 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.Bucket
|
|||
return nil, tree.ErrNodeNotFound
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) {
|
||||
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
|
||||
if !ok {
|
||||
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
|
||||
newVersion.FilePath: {newVersion},
|
||||
}
|
||||
return newVersion.ID, nil, nil
|
||||
return newVersion.ID, nil
|
||||
}
|
||||
|
||||
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
||||
if !ok {
|
||||
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
||||
return newVersion.ID, nil, nil
|
||||
return newVersion.ID, nil
|
||||
}
|
||||
|
||||
sort.Slice(versions, func(i, j int) bool {
|
||||
|
@ -262,22 +262,18 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
|||
|
||||
result := versions
|
||||
|
||||
var oldUnversionedIDs []oid.ID
|
||||
|
||||
if newVersion.IsUnversioned {
|
||||
result = make([]*data.NodeVersion, 0, len(versions))
|
||||
for _, node := range versions {
|
||||
if !node.IsUnversioned {
|
||||
result = append(result, node)
|
||||
} else {
|
||||
oldUnversionedIDs = append(oldUnversionedIDs, node.OID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cnrVersionsMap[newVersion.FilePath] = append(result, newVersion)
|
||||
|
||||
return newVersion.ID, oldUnversionedIDs, nil
|
||||
return newVersion.ID, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error {
|
||||
|
|
|
@ -180,7 +180,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
|||
|
||||
return &testContext{
|
||||
ctx: ctx,
|
||||
layer: NewLayer(ctx, logger, tp, layerCfg),
|
||||
layer: NewLayer(logger, tp, layerCfg),
|
||||
bktInfo: &data.BucketInfo{
|
||||
Name: bktName,
|
||||
Owner: owner,
|
||||
|
|
|
@ -190,9 +190,7 @@ func Request(log *zap.Logger, settings RequestSettings) Func {
|
|||
fields := []zap.Field{zap.String("request_id", reqInfo.RequestID)}
|
||||
ctx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
||||
if traceID := span.SpanContext().TraceID(); traceID.IsValid() {
|
||||
traceIDStr := traceID.String()
|
||||
fields = append(fields, zap.String("trace_id", traceIDStr))
|
||||
reqInfo.TraceID = traceIDStr
|
||||
fields = append(fields, zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
lw := &traceResponseWriter{ResponseWriter: w, ctx: ctx, span: span}
|
||||
|
||||
|
|
|
@ -103,19 +103,18 @@ type (
|
|||
}
|
||||
|
||||
appSettings struct {
|
||||
logLevel zap.AtomicLevel
|
||||
httpLogging s3middleware.LogHTTPConfig
|
||||
tagsConfig *tagsConfig
|
||||
maxClient maxClientsConfig
|
||||
defaultMaxAge int
|
||||
reconnectInterval time.Duration
|
||||
resolveZoneList []string
|
||||
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
||||
frostfsidValidation bool
|
||||
accessbox *cid.ID
|
||||
dialerSource *internalnet.DialerSource
|
||||
workerPoolSize int
|
||||
removeOnReplaceQueue int
|
||||
logLevel zap.AtomicLevel
|
||||
httpLogging s3middleware.LogHTTPConfig
|
||||
tagsConfig *tagsConfig
|
||||
maxClient maxClientsConfig
|
||||
defaultMaxAge int
|
||||
reconnectInterval time.Duration
|
||||
resolveZoneList []string
|
||||
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
||||
frostfsidValidation bool
|
||||
accessbox *cid.ID
|
||||
dialerSource *internalnet.DialerSource
|
||||
workerPoolSize int
|
||||
|
||||
mu sync.RWMutex
|
||||
namespaces Namespaces
|
||||
|
@ -141,8 +140,6 @@ type (
|
|||
tombstoneLifetime uint64
|
||||
tlsTerminationHeader string
|
||||
listingKeepaliveThrottle time.Duration
|
||||
removeOnReplace bool
|
||||
removeOnReplaceTimeout time.Duration
|
||||
}
|
||||
|
||||
maxClientsConfig struct {
|
||||
|
@ -316,7 +313,7 @@ func (a *App) initLayer(ctx context.Context) {
|
|||
}
|
||||
|
||||
// prepare object layer
|
||||
a.obj = layer.NewLayer(ctx, a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
||||
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
||||
}
|
||||
|
||||
func (a *App) initWorkerPool() *ants.Pool {
|
||||
|
@ -329,16 +326,15 @@ func (a *App) initWorkerPool() *ants.Pool {
|
|||
|
||||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||
settings := &appSettings{
|
||||
logLevel: log.lvl,
|
||||
httpLogging: s3middleware.LogHTTPConfig{},
|
||||
tagsConfig: newTagsConfig(v),
|
||||
maxClient: newMaxClients(v),
|
||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||
reconnectInterval: fetchReconnectInterval(v),
|
||||
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
||||
dialerSource: getDialerSource(log.logger, v),
|
||||
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
|
||||
removeOnReplaceQueue: fetchRemoveOnReplaceQueue(v),
|
||||
logLevel: log.lvl,
|
||||
httpLogging: s3middleware.LogHTTPConfig{},
|
||||
tagsConfig: newTagsConfig(v),
|
||||
maxClient: newMaxClients(v),
|
||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||
reconnectInterval: fetchReconnectInterval(v),
|
||||
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
||||
dialerSource: getDialerSource(log.logger, v),
|
||||
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
|
||||
}
|
||||
|
||||
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
||||
|
@ -380,8 +376,6 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|||
tombstoneLifetime := fetchTombstoneLifetime(v)
|
||||
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
||||
listingKeepaliveThrottle := v.GetDuration(cfgKludgeListingKeepAliveThrottle)
|
||||
removeOnReplace := v.GetBool(cfgRemoveOnReplaceEnabled)
|
||||
removeOnReplaceTimeout := fetchRemoveOnReplaceTimeout(v)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -416,8 +410,6 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|||
s.tombstoneLifetime = tombstoneLifetime
|
||||
s.tlsTerminationHeader = tlsTerminationHeader
|
||||
s.listingKeepaliveThrottle = listingKeepaliveThrottle
|
||||
s.removeOnReplace = removeOnReplace
|
||||
s.removeOnReplaceTimeout = removeOnReplaceTimeout
|
||||
}
|
||||
|
||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
||||
|
@ -661,24 +653,6 @@ func (s *appSettings) ListingKeepaliveThrottle() time.Duration {
|
|||
return s.listingKeepaliveThrottle
|
||||
}
|
||||
|
||||
func (s *appSettings) RemoveOnReplace() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.removeOnReplace
|
||||
}
|
||||
|
||||
func (s *appSettings) RemoveOnReplaceTimeout() time.Duration {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.removeOnReplaceTimeout
|
||||
}
|
||||
|
||||
func (s *appSettings) RemoveOnReplaceQueue() int {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.removeOnReplaceQueue
|
||||
}
|
||||
|
||||
func (a *App) initAPI(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
a.initHandler()
|
||||
|
|
|
@ -75,9 +75,6 @@ const (
|
|||
|
||||
useDefaultXmlns = "use_default_xmlns"
|
||||
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
|
||||
|
||||
defaultRemoveOnReplaceTimeout = 30 * time.Second
|
||||
defaultRemoveOnReplaceQueue = 10000
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -274,12 +271,9 @@ const (
|
|||
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
||||
|
||||
// Enable return MD5 checksum in ETag.
|
||||
cfgMD5Enabled = "features.md5.enabled"
|
||||
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
|
||||
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
|
||||
cfgRemoveOnReplaceEnabled = "features.remove_on_replace.enabled"
|
||||
cfgRemoveOnReplaceTimeout = "features.remove_on_replace.timeout"
|
||||
cfgRemoveOnReplaceQueue = "features.remove_on_replace.queue"
|
||||
cfgMD5Enabled = "features.md5.enabled"
|
||||
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
|
||||
cfgTreePoolNetmapSupport = "features.tree_pool_netmap_support"
|
||||
|
||||
// FrostfsID.
|
||||
cfgFrostfsIDContract = "frostfsid.contract"
|
||||
|
@ -961,24 +955,6 @@ func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
|
|||
return tombstoneWorkerPoolSize
|
||||
}
|
||||
|
||||
func fetchRemoveOnReplaceTimeout(v *viper.Viper) time.Duration {
|
||||
val := v.GetDuration(cfgRemoveOnReplaceTimeout)
|
||||
if val <= 0 {
|
||||
val = defaultRemoveOnReplaceTimeout
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchRemoveOnReplaceQueue(v *viper.Viper) int {
|
||||
val := v.GetInt(cfgRemoveOnReplaceQueue)
|
||||
if val <= 0 {
|
||||
val = defaultRemoveOnReplaceQueue
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) {
|
||||
res := make(map[string]zapcore.Level)
|
||||
|
||||
|
|
|
@ -218,12 +218,6 @@ S3_GW_FEATURES_MD5_ENABLED=false
|
|||
S3_GW_FEATURES_POLICY_DENY_BY_DEFAULT=false
|
||||
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
||||
S3_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true
|
||||
# Enable removing old object during PUT operation in unversioned/suspened bucket.
|
||||
S3_GW_FEATURES_REMOVE_ON_REPLACE_ENABLED=false
|
||||
# Timeout to one delete operation in background.
|
||||
S3_GW_FEATURES_REMOVE_ON_REPLACE_TIMEOUT=30s
|
||||
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
|
||||
S3_GW_FEATURES_REMOVE_ON_REPLACE_QUEUE=10000
|
||||
|
||||
# ReadTimeout is the maximum duration for reading the entire
|
||||
# request, including the body. A zero or negative value means
|
||||
|
|
|
@ -257,13 +257,6 @@ features:
|
|||
enabled: false
|
||||
# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
|
||||
tree_pool_netmap_support: true
|
||||
remove_on_replace:
|
||||
# Enable removing old object during PUT operation in unversioned/suspened bucket.
|
||||
enabled: false
|
||||
# Timeout to one delete operation in background.
|
||||
timeout: 30s
|
||||
# Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that.
|
||||
queue: 10000
|
||||
|
||||
web:
|
||||
# ReadTimeout is the maximum duration for reading the entire
|
||||
|
|
|
@ -728,20 +728,13 @@ features:
|
|||
md5:
|
||||
enabled: false
|
||||
tree_pool_netmap_support: true
|
||||
remove_on_replace:
|
||||
enabled: false
|
||||
timeout: 30s
|
||||
queue: 10000
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|-----------------------------|-------------|---------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `md5.enabled` | `bool` | yes | `false` | Flag to enable return MD5 checksum in ETag headers and fields. |
|
||||
| `policy.deny_by_default` | `bool` | yes | `false` | Enable denying access for request that doesn't match any policy chain rules. |
|
||||
| `tree_pool_netmap_support` | `bool` | no | `false` | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
|
||||
| `remove_on_replace.enabled` | `bool` | yes | `false` | Enable removing old object during PUT operation in unversioned/suspened bucket. |
|
||||
| `remove_on_replace.timeout` | `durations` | yes | `30s` | Timeout to one delete operation in background. |
|
||||
| `remove_on_replace.queue` | `int` | false | `10000` | Buffer size for objects to delete. If buffer is full creation new unversioned object won't remove old one. Lifecycler will do that. |
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|----------------------------|--------|---------------|---------------|---------------------------------------------------------------------------------------------------------|
|
||||
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |
|
||||
| `policy.deny_by_default` | `bool` | yes | false | Enable denying access for request that doesn't match any policy chain rules. |
|
||||
| `tree_pool_netmap_support` | `bool` | no | false | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
|
||||
|
||||
# `web` section
|
||||
Contains web server configuration parameters.
|
||||
|
|
|
@ -159,7 +159,6 @@ const (
|
|||
ResolveBucket = "resolve bucket"
|
||||
FailedToResolveCID = "failed to resolve CID"
|
||||
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks"
|
||||
FailedToQueueOldUnversionedObjectToDelete = "failed to queue old unversioned object to delete, removal will be performed in lifecycler"
|
||||
)
|
||||
|
||||
// External storage.
|
||||
|
@ -180,7 +179,6 @@ const (
|
|||
PutObject = "put object"
|
||||
CouldNotFetchObjectMeta = "could not fetch object meta"
|
||||
FailedToDeleteObject = "failed to delete object"
|
||||
FailedToRemoveOldUnversionedObject = "failed to remove old unversioned object"
|
||||
CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object"
|
||||
)
|
||||
|
||||
|
@ -203,7 +201,6 @@ const (
|
|||
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
|
||||
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
|
||||
GetBucketCorsFromTree = "get bucket cors from tree"
|
||||
FailedToRemoveOldUnversionedNode = "failed to remove old unversioned node"
|
||||
)
|
||||
|
||||
// Authmate.
|
||||
|
|
|
@ -1318,18 +1318,10 @@ func (c *Tree) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, fil
|
|||
ctx, span := tracing.StartSpanFromContext(ctx, "tree.GetUnversioned")
|
||||
defer span.End()
|
||||
|
||||
res, err := c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res[0], err
|
||||
return c.getUnversioned(ctx, bktInfo, versionTree, filepath)
|
||||
}
|
||||
|
||||
// getUnversioned returns all unversioned nodes for specified filepath.
|
||||
// List of node is always not empty if error is nil.
|
||||
// First item of list is the latest (according timestamp).
|
||||
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) ([]*data.NodeVersion, error) {
|
||||
func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string) (*data.NodeVersion, error) {
|
||||
nodes, err := c.getVersions(ctx, bktInfo, treeID, filepath, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1350,10 +1342,10 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
|
|||
return nodes[i].Timestamp > nodes[j].Timestamp
|
||||
})
|
||||
|
||||
return nodes, nil
|
||||
return nodes[0], nil
|
||||
}
|
||||
|
||||
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
|
||||
defer span.End()
|
||||
|
||||
|
@ -1790,7 +1782,7 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket
|
|||
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
|
||||
}
|
||||
|
||||
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) {
|
||||
path := pathFromName(version.FilePath)
|
||||
meta := map[string]string{
|
||||
oidKV: version.OID.EncodeToString(),
|
||||
|
@ -1821,40 +1813,25 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
|||
if version.IsUnversioned {
|
||||
meta[isUnversionedKV] = "true"
|
||||
|
||||
nodes, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
|
||||
node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
|
||||
if err == nil {
|
||||
node := nodes[0]
|
||||
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParentID, meta); err != nil {
|
||||
return 0, nil, err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
oldOIDs := make([]oid.ID, len(nodes))
|
||||
for i, oldNode := range nodes {
|
||||
oldOIDs[i] = oldNode.OID
|
||||
}
|
||||
|
||||
return node.ID, oldOIDs, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, nodes)
|
||||
return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
|
||||
}
|
||||
|
||||
if !errors.Is(err, tree.ErrNodeNotFound) {
|
||||
return 0, nil, err
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
nodeID, err := c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
|
||||
return nodeID, nil, err
|
||||
return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
|
||||
}
|
||||
|
||||
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodes []*data.NodeVersion) error {
|
||||
for _, node := range nodes[1:] {
|
||||
if err := c.service.RemoveNode(ctx, bktInfo, treeID, node.ID); err != nil {
|
||||
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldUnversionedNode, zap.Uint64("node_id", node.ID),
|
||||
zap.Error(err), logs.TagField(logs.TagExternalStorageTree))
|
||||
}
|
||||
}
|
||||
|
||||
latest := nodes[0]
|
||||
taggingNode, err := c.getTreeNode(ctx, bktInfo, latest.ID, isTagKV)
|
||||
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
||||
taggingNode, err := c.getTreeNode(ctx, bktInfo, nodeID, isTagKV)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -166,7 +166,7 @@ func TestTreeServiceAddVersion(t *testing.T) {
|
|||
IsUnversioned: true,
|
||||
}
|
||||
|
||||
nodeID, _, err := treeService.AddVersion(ctx, bktInfo, version)
|
||||
nodeID, err := treeService.AddVersion(ctx, bktInfo, version)
|
||||
require.NoError(t, err)
|
||||
|
||||
storedNode, err := treeService.GetUnversioned(ctx, bktInfo, "path/to/version")
|
||||
|
@ -404,7 +404,7 @@ func TestVersionsByPrefixStreamImpl_Next(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, v := range versions {
|
||||
_, _, err = treeService.AddVersion(ctx, bktInfo, v)
|
||||
_, err = treeService.AddVersion(ctx, bktInfo, v)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue