diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 6fa5ed75c..04a67e5b5 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -21,6 +21,9 @@ const ( noProgressFlag = "no-progress" scopeFlag = "scope" + containerWorkerCountFlag = "container-worker-count" + objectWorkerCountFlag = "object-worker-count" + scopeAll = "all" scopeObjects = "objects" scopeTrees = "trees" @@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag) + objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ - Shard_ID: getShardIDList(cmd), - IgnoreErrors: ignoreErrors, - Scope: getEvacuationScope(cmd), + Shard_ID: getShardIDList(cmd), + IgnoreErrors: ignoreErrors, + Scope: getEvacuationScope(cmd), + ContainerWorkerCount: containerWorkerCount, + ObjectWorkerCount: objectWorkerCount, }, } @@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() { flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll)) flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) + flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers") + flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers") startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 7bef6edfb..3db556a8f 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -10,7 +10,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -24,6 +23,16 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + // containerWorkerCountDefault is a default value of the count of + // concurrent container evacuation workers. + containerWorkerCountDefault = 10 + // objectWorkerCountDefault is a default value of the count of + // concurrent object evacuation workers. + objectWorkerCountDefault = 10 ) var ( @@ -79,6 +88,9 @@ type EvacuateShardPrm struct { IgnoreErrors bool Async bool Scope EvacuateScope + + ContainerWorkerCount uint32 + ObjectWorkerCount uint32 } // EvacuateShardRes represents result of the EvacuateShard operation. @@ -189,8 +201,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { return res } -const defaultEvacuateBatchSize = 100 - type pooledShard struct { hashedShard pool util.WorkerPool @@ -242,8 +252,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev return nil, err } + var mtx sync.RWMutex + copyShards := func() []pooledShard { + mtx.RLock() + defer mtx.RUnlock() + t := make([]pooledShard, len(shards)) + copy(t, shards) + return t + } eg.Go(func() error { - return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate) + return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) }) if prm.Async { @@ -261,7 +279,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context { } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { var err error ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", @@ -287,13 +305,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p return err } - for _, shardID := range shardIDs { - if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) - return err + ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm) + continueLoop := true + for i := 0; continueLoop && i < len(shardIDs); i++ { + select { + case <-ctx.Done(): + continueLoop = false + default: + egShard.Go(func() error { + err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject) + if err != nil { + cancel(err) + } + return err + }) } } + err = egShard.Wait() + if err != nil { + err = fmt.Errorf("shard error: %w", err) + } + errContainer := egContainer.Wait() + errObject := egObject.Wait() + if errContainer != nil { + err = errors.Join(err, fmt.Errorf("container error: %w", errContainer)) + } + if errObject != nil { + err = errors.Join(err, fmt.Errorf("object error: %w", errObject)) + } + if err != nil { + e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) + return err + } e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs), @@ -309,6 +353,27 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p return nil } +func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) ( + context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group, +) { + operationCtx, cancel := context.WithCancelCause(ctx) + egObject, _ := errgroup.WithContext(operationCtx) + objectWorkerCount := prm.ObjectWorkerCount + if objectWorkerCount == 0 { + objectWorkerCount = objectWorkerCountDefault + } + egObject.SetLimit(int(objectWorkerCount)) + egContainer, _ := errgroup.WithContext(operationCtx) + containerWorkerCount := prm.ContainerWorkerCount + if containerWorkerCount == 0 { + containerWorkerCount = containerWorkerCountDefault + } + egContainer.SetLimit(int(containerWorkerCount)) + egShard, _ := errgroup.WithContext(operationCtx) + + return operationCtx, cancel, egShard, egContainer, egObject +} + func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals") defer span.End() @@ -335,8 +400,9 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha return nil } -func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", trace.WithAttributes( @@ -345,11 +411,10 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E defer span.End() if prm.Scope.WithObjects() { - if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { + if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil { return err } } - if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { return err @@ -359,44 +424,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E return nil } -func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { - var listPrm shard.ListWithCursorPrm - listPrm.WithCount(defaultEvacuateBatchSize) - sh := shardsToEvacuate[shardID] - sh.SetEvacuationInProgress(true) - - var c *meta.Cursor - for { - listPrm.WithCursor(c) - - // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes - // because ListWithCursor works only with the metabase. - listRes, err := sh.ListWithCursor(ctx, listPrm) - if err != nil { - if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { - break + var cntPrm shard.IterateOverContainersPrm + cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + egContainer.Go(func() error { + var objPrm shard.IterateOverObjectsInContainerPrm + objPrm.BucketName = name + objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + egObject.Go(func() error { + err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate) + if err != nil { + cancel(err) + } + return err + }) + return nil + } + err := sh.IterateOverObjectsInContainer(ctx, objPrm) + if err != nil { + cancel(err) } - e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return err - } - - if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil { - return err - } - - c = listRes.Cursor() + }) + return nil } - return nil + + sh.SetEvacuationInProgress(true) + err := sh.IterateOverContainers(ctx, cntPrm) + if err != nil { + cancel(err) + e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + return err } func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { sh := shardsToEvacuate[shardID] + shards := getShards() var listPrm pilorama.TreeListTreesPrm first := true @@ -637,68 +718,65 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) return shards, nil } -func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { - ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", - trace.WithAttributes( - attribute.Int("objects_count", len(toEvacuate)), - )) + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects") defer span.End() - for i := range toEvacuate { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - addr := toEvacuate[i].Address + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } - var getPrm shard.GetPrm - getPrm.SetAddress(addr) - getPrm.SkipEvacCheck(true) + shards := getShards() + addr := objInfo.Address - getRes, err := sh.Get(ctx, getPrm) - if err != nil { - if prm.IgnoreErrors { - res.objFailed.Add(1) - continue - } - e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } + var getPrm shard.GetPrm + getPrm.SetAddress(addr) + getPrm.SkipEvacCheck(true) - evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res) - if err != nil { - return err - } - - if evacuatedLocal { - continue - } - - if prm.ObjectsHandler == nil { - // Do not check ignoreErrors flag here because - // ignoring errors on put make this command kinda useless. - return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) - } - - moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) - if err != nil { - e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } - if moved { - res.objEvacuated.Add(1) - } else if prm.IgnoreErrors { + getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm) + if err != nil { + if prm.IgnoreErrors { res.objFailed.Add(1) - e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - } else { - return fmt.Errorf("object %s was not replicated", addr) + return nil } + e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + + evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res) + if err != nil { + return err + } + + if evacuatedLocal { + return nil + } + + if prm.ObjectsHandler == nil { + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. + return fmt.Errorf("%w: %s", errPutShard, objInfo) + } + + moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) + if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + if moved { + res.objEvacuated.Add(1) + } else if prm.IgnoreErrors { + res.objFailed.Add(1) + e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } else { + return fmt.Errorf("object %s was not replicated", addr) } return nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 28529fab9..f72333399 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -6,6 +6,8 @@ import ( "fmt" "path/filepath" "strconv" + "sync" + "sync/atomic" "testing" "time" @@ -174,13 +176,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) { errReplication := errors.New("handler error") acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { - var n uint64 + var n atomic.Uint64 return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { - if n == max { + if n.Load() == max { return false, errReplication } - n++ + n.Add(1) for i := range objects { if addr == objectCore.AddressOf(objects[i]) { require.Equal(t, objects[i], obj) @@ -314,6 +316,36 @@ func TestEvacuateCancellation(t *testing.T) { require.Equal(t, uint64(0), res.ObjectsEvacuated()) } +func TestEvacuateCancellationByError(t *testing.T) { + t.Parallel() + e, ids, _ := newEngineEvacuate(t, 2, 10) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + + var prm EvacuateShardPrm + prm.ShardID = ids[1:2] + var once atomic.Bool + prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) { + var err error + flag := true + if once.CompareAndSwap(false, true) { + err = errors.New("test error") + flag = false + } + return flag, err + } + prm.Scope = EvacuateScopeObjects + prm.ObjectWorkerCount = 2 + prm.ContainerWorkerCount = 2 + + _, err := e.Evacuate(context.Background(), prm) + require.ErrorContains(t, err, "test error") +} + func TestEvacuateSingleProcess(t *testing.T) { e, ids, _ := newEngineEvacuate(t, 2, 3) defer func() { @@ -531,6 +563,7 @@ func TestEvacuateTreesRemote(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + mutex := sync.Mutex{} evacuatedTreeOps := make(map[string][]*pilorama.Move) var prm EvacuateShardPrm prm.ShardID = ids @@ -545,7 +578,9 @@ func TestEvacuateTreesRemote(t *testing.T) { if op.Time == 0 { return true, "", nil } + mutex.Lock() evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) + mutex.Unlock() height = op.Time + 1 } } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index b4326a92c..5943be7f4 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -1,6 +1,7 @@ package meta import ( + "bytes" "context" "time" @@ -61,6 +62,20 @@ func (l ListRes) Cursor() *Cursor { return l.cursor } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name. + BucketName []byte + // Handler function executed upon objects in db. + Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. @@ -259,3 +274,155 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) return rawID, name[0] } + +// IterateOverContainers lists physical containers available in metabase starting from first. +func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverContainers(ctx, tx, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error { + var containerID cid.ID + for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} { + c := tx.Cursor() + for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() { + cidRaw, _ := parseContainerIDWithPrefix(&containerID, name) + if cidRaw == nil { + continue + } + + bktName := make([]byte, len(name)) + copy(bktName, name) + var cnt cid.ID + copy(cnt[:], containerID[:]) + err := prm.Handler(ctx, bktName, cnt) + if err != nil { + return err + } + } + } + + return nil +} + +// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first. +func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + var containerID cid.ID + cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName) + if cidRaw == nil { + return nil + } + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte, + containerID cid.ID, prm IterateOverObjectsInContainerPrm, +) error { + bkt := tx.Bucket(prm.BucketName) + if bkt == nil { + return nil + } + graveyardBkt := tx.Bucket(graveyardBucketName) + garbageBkt := tx.Bucket(garbageBucketName) + c := bkt.Cursor() + k, v := c.First() + + var objType objectSDK.Type + + switch prefix { + case primaryPrefix: + objType = objectSDK.TypeRegular + case lockersPrefix: + objType = objectSDK.TypeLock + case tombstonePrefix: + objType = objectSDK.TypeTombstone + default: + return nil + } + + for ; k != nil; k, v = c.Next() { + var obj oid.ID + if err := obj.Decode(k); err != nil { + break + } + + if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { + continue + } + + var isLinkingObj bool + var ecInfo *objectcore.ECInfo + if objType == objectSDK.TypeRegular { + var o objectSDK.Object + if err := o.Unmarshal(v); err != nil { + return err + } + isLinkingObj = isLinkObject(&o) + ecHeader := o.ECHeader() + if ecHeader != nil { + ecInfo = &objectcore.ECInfo{ + ParentID: ecHeader.Parent(), + Index: ecHeader.Index(), + Total: ecHeader.Total(), + } + } + } + + var a oid.Address + a.SetContainer(containerID) + a.SetObject(obj) + objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo} + err := prm.Handler(ctx, &objInfo) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 08ea81a0c..9f56ec750 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID { return r.containers } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name. + BucketName []byte + // Handler function executed upon containers in db. + Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 @@ -164,3 +178,54 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List cursor: res.Cursor(), }, nil } + +// IterateOverContainers lists physical containers presented in shard. +func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverContainersPrm + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverContainers(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over containers: %w", err) + } + + return nil +} + +// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name. +func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverObjectsInContainerPrm + metaPrm.BucketName = prm.BucketName + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over objects: %w", err) + } + + return nil +} diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index aacebe9e3..bdc6f7c38 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -23,12 +23,14 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha } prm := engine.EvacuateShardPrm{ - ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), - IgnoreErrors: req.GetBody().GetIgnoreErrors(), - ObjectsHandler: s.replicateObject, - TreeHandler: s.replicateTree, - Async: true, - Scope: engine.EvacuateScope(req.GetBody().GetScope()), + ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + ObjectsHandler: s.replicateObject, + TreeHandler: s.replicateTree, + Async: true, + Scope: engine.EvacuateScope(req.GetBody().GetScope()), + ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), + ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), } _, err = s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 04994328a..88a06de22 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -394,6 +394,10 @@ message StartShardEvacuationRequest { bool ignore_errors = 2; // Evacuation scope. uint32 scope = 3; + // Count of concurrent container evacuation workers. + uint32 container_worker_count = 4; + // Count of concurrent object evacuation workers. + uint32 object_worker_count = 5; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 019cac290..e92a8acd1 100644 Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ