[#1356] engine: Evacuate object from shards concurrently

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-18 12:15:32 +03:00
parent bdf386366c
commit 34e6a309c6
8 changed files with 468 additions and 108 deletions

View file

@ -21,6 +21,9 @@ const (
noProgressFlag = "no-progress" noProgressFlag = "no-progress"
scopeFlag = "scope" scopeFlag = "scope"
containerWorkerCountFlag = "container-worker-count"
objectWorkerCountFlag = "object-worker-count"
scopeAll = "all" scopeAll = "all"
scopeObjects = "objects" scopeObjects = "objects"
scopeTrees = "trees" scopeTrees = "trees"
@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd) pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
req := &control.StartShardEvacuationRequest{ req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{ Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd), Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors, IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd), Scope: getEvacuationScope(cmd),
ContainerWorkerCount: containerWorkerCount,
ObjectWorkerCount: objectWorkerCount,
}, },
} }
@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() {
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll)) flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
} }

View file

@ -10,7 +10,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -24,6 +23,16 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
const (
// containerWorkerCountDefault is a default value of the count of
// concurrent container evacuation workers.
containerWorkerCountDefault = 10
// objectWorkerCountDefault is a default value of the count of
// concurrent object evacuation workers.
objectWorkerCountDefault = 10
) )
var ( var (
@ -79,6 +88,9 @@ type EvacuateShardPrm struct {
IgnoreErrors bool IgnoreErrors bool
Async bool Async bool
Scope EvacuateScope Scope EvacuateScope
ContainerWorkerCount uint32
ObjectWorkerCount uint32
} }
// EvacuateShardRes represents result of the EvacuateShard operation. // EvacuateShardRes represents result of the EvacuateShard operation.
@ -189,8 +201,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res return res
} }
const defaultEvacuateBatchSize = 100
type pooledShard struct { type pooledShard struct {
hashedShard hashedShard
pool util.WorkerPool pool util.WorkerPool
@ -242,8 +252,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return nil, err return nil, err
} }
var mtx sync.RWMutex
copyShards := func() []pooledShard {
mtx.RLock()
defer mtx.RUnlock()
t := make([]pooledShard, len(shards))
copy(t, shards)
return t
}
eg.Go(func() error { eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate) return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
}) })
if prm.Async { if prm.Async {
@ -261,7 +279,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
} }
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error { ) error {
var err error var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
@ -287,13 +305,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err return err
} }
for _, shardID := range shardIDs { ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm)
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { continueLoop := true
for i := 0; continueLoop && i < len(shardIDs); i++ {
select {
case <-ctx.Done():
continueLoop = false
default:
egShard.Go(func() error {
err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject)
if err != nil {
cancel(err)
}
return err
})
}
}
err = egShard.Wait()
if err != nil {
err = fmt.Errorf("shard error: %w", err)
}
errContainer := egContainer.Wait()
errObject := egObject.Wait()
if errContainer != nil {
err = errors.Join(err, fmt.Errorf("container error: %w", errContainer))
}
if errObject != nil {
err = errors.Join(err, fmt.Errorf("object error: %w", errObject))
}
if err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err return err
} }
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
zap.Strings("shard_ids", shardIDs), zap.Strings("shard_ids", shardIDs),
@ -309,6 +353,27 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return nil return nil
} }
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
) {
operationCtx, cancel := context.WithCancelCause(ctx)
egObject, _ := errgroup.WithContext(operationCtx)
objectWorkerCount := prm.ObjectWorkerCount
if objectWorkerCount == 0 {
objectWorkerCount = objectWorkerCountDefault
}
egObject.SetLimit(int(objectWorkerCount))
egContainer, _ := errgroup.WithContext(operationCtx)
containerWorkerCount := prm.ContainerWorkerCount
if containerWorkerCount == 0 {
containerWorkerCount = containerWorkerCountDefault
}
egContainer.SetLimit(int(containerWorkerCount))
egShard, _ := errgroup.WithContext(operationCtx)
return operationCtx, cancel, egShard, egContainer, egObject
}
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals") ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
defer span.End() defer span.End()
@ -335,8 +400,9 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
return nil return nil
} }
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error { ) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
trace.WithAttributes( trace.WithAttributes(
@ -345,11 +411,10 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
defer span.End() defer span.End()
if prm.Scope.WithObjects() { if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
return err return err
} }
} }
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err return err
@ -359,44 +424,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
return nil return nil
} }
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error { ) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID] sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true) var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
var c *meta.Cursor select {
for { case <-ctx.Done():
listPrm.WithCursor(c) return context.Cause(ctx)
default:
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(ctx, listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
} }
egContainer.Go(func() error {
var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
egObject.Go(func() error {
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
if err != nil {
cancel(err)
}
return err
})
return nil
}
err := sh.IterateOverObjectsInContainer(ctx, objPrm)
if err != nil {
cancel(err)
}
return err
})
return nil
}
sh.SetEvacuationInProgress(true)
err := sh.IterateOverContainers(ctx, cntPrm)
if err != nil {
cancel(err)
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err return err
}
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
return err
}
c = listRes.Cursor()
}
return nil
} }
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error { ) error {
sh := shardsToEvacuate[shardID] sh := shardsToEvacuate[shardID]
shards := getShards()
var listPrm pilorama.TreeListTreesPrm var listPrm pilorama.TreeListTreesPrm
first := true first := true
@ -637,51 +718,49 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
return shards, nil return shards, nil
} }
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error { ) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
trace.WithAttributes(
attribute.Int("objects_count", len(toEvacuate)),
))
defer span.End() defer span.End()
for i := range toEvacuate {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return context.Cause(ctx)
default: default:
} }
addr := toEvacuate[i].Address
shards := getShards()
addr := objInfo.Address
var getPrm shard.GetPrm var getPrm shard.GetPrm
getPrm.SetAddress(addr) getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true) getPrm.SkipEvacCheck(true)
getRes, err := sh.Get(ctx, getPrm) getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
if err != nil { if err != nil {
if prm.IgnoreErrors { if prm.IgnoreErrors {
res.objFailed.Add(1) res.objFailed.Add(1)
continue return nil
} }
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err return err
} }
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res) evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
if err != nil { if err != nil {
return err return err
} }
if evacuatedLocal { if evacuatedLocal {
continue return nil
} }
if prm.ObjectsHandler == nil { if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because // Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless. // ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) return fmt.Errorf("%w: %s", errPutShard, objInfo)
} }
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
@ -699,7 +778,6 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
} else { } else {
return fmt.Errorf("object %s was not replicated", addr) return fmt.Errorf("object %s was not replicated", addr)
} }
}
return nil return nil
} }

View file

@ -6,6 +6,8 @@ import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -174,13 +176,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
errReplication := errors.New("handler error") errReplication := errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
var n uint64 var n atomic.Uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
if n == max { if n.Load() == max {
return false, errReplication return false, errReplication
} }
n++ n.Add(1)
for i := range objects { for i := range objects {
if addr == objectCore.AddressOf(objects[i]) { if addr == objectCore.AddressOf(objects[i]) {
require.Equal(t, objects[i], obj) require.Equal(t, objects[i], obj)
@ -314,6 +316,36 @@ func TestEvacuateCancellation(t *testing.T) {
require.Equal(t, uint64(0), res.ObjectsEvacuated()) require.Equal(t, uint64(0), res.ObjectsEvacuated())
} }
func TestEvacuateCancellationByError(t *testing.T) {
t.Parallel()
e, ids, _ := newEngineEvacuate(t, 2, 10)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
var once atomic.Bool
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
var err error
flag := true
if once.CompareAndSwap(false, true) {
err = errors.New("test error")
flag = false
}
return flag, err
}
prm.Scope = EvacuateScopeObjects
prm.ObjectWorkerCount = 2
prm.ContainerWorkerCount = 2
_, err := e.Evacuate(context.Background(), prm)
require.ErrorContains(t, err, "test error")
}
func TestEvacuateSingleProcess(t *testing.T) { func TestEvacuateSingleProcess(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3) e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() { defer func() {
@ -531,6 +563,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
mutex := sync.Mutex{}
evacuatedTreeOps := make(map[string][]*pilorama.Move) evacuatedTreeOps := make(map[string][]*pilorama.Move)
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids prm.ShardID = ids
@ -545,7 +578,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
if op.Time == 0 { if op.Time == 0 {
return true, "", nil return true, "", nil
} }
mutex.Lock()
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
mutex.Unlock()
height = op.Time + 1 height = op.Time + 1
} }
} }

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"bytes"
"context" "context"
"time" "time"
@ -61,6 +62,20 @@ func (l ListRes) Cursor() *Cursor {
return l.cursor return l.cursor
} }
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name.
BucketName []byte
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// ListWithCursor lists physical objects available in metabase starting from // ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects. // cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests. // Use cursor value from response for consecutive requests.
@ -259,3 +274,155 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
return rawID, name[0] return rawID, name[0]
} }
// IterateOverContainers lists physical containers available in metabase starting from first.
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverContainers(ctx, tx, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
var containerID cid.ID
for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} {
c := tx.Cursor()
for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
cidRaw, _ := parseContainerIDWithPrefix(&containerID, name)
if cidRaw == nil {
continue
}
bktName := make([]byte, len(name))
copy(bktName, name)
var cnt cid.ID
copy(cnt[:], containerID[:])
err := prm.Handler(ctx, bktName, cnt)
if err != nil {
return err
}
}
}
return nil
}
// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first.
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
var containerID cid.ID
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
if cidRaw == nil {
return nil
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
) error {
bkt := tx.Bucket(prm.BucketName)
if bkt == nil {
return nil
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, v := c.First()
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
return nil
}
for ; k != nil; k, v = c.Next() {
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(containerID)
a.SetObject(obj)
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
err := prm.Handler(ctx, &objInfo)
if err != nil {
return err
}
}
return nil
}

View file

@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID {
return r.containers return r.containers
} }
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name.
BucketName []byte
// Handler function executed upon containers in db.
Handler func(context.Context, *objectcore.Info) error
}
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct { type ListWithCursorPrm struct {
count uint32 count uint32
@ -164,3 +178,54 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
cursor: res.Cursor(), cursor: res.Cursor(),
}, nil }, nil
} }
// IterateOverContainers lists physical containers presented in shard.
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not iterate over containers: %w", err)
}
return nil
}
// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name.
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.BucketName = prm.BucketName
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not iterate over objects: %w", err)
}
return nil
}

View file

@ -29,6 +29,8 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
TreeHandler: s.replicateTree, TreeHandler: s.replicateTree,
Async: true, Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()), Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
} }
_, err = s.s.Evacuate(ctx, prm) _, err = s.s.Evacuate(ctx, prm)

View file

@ -394,6 +394,10 @@ message StartShardEvacuationRequest {
bool ignore_errors = 2; bool ignore_errors = 2;
// Evacuation scope. // Evacuation scope.
uint32 scope = 3; uint32 scope = 3;
// Count of concurrent container evacuation workers.
uint32 container_worker_count = 4;
// Count of concurrent object evacuation workers.
uint32 object_worker_count = 5;
} }
Body body = 1; Body body = 1;

Binary file not shown.