diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index f522e6fb..d95098fb 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -30,10 +30,10 @@ var ( // EvacuateShardPrm represents parameters for the EvacuateShard operation. type EvacuateShardPrm struct { - shardID []*shard.ID - handler func(context.Context, oid.Address, *objectSDK.Object) error - ignoreErrors bool - async bool + ShardID []*shard.ID + Handler func(context.Context, oid.Address, *objectSDK.Object) error + IgnoreErrors bool + Async bool } // EvacuateShardRes represents result of the EvacuateShard operation. @@ -54,26 +54,6 @@ func NewEvacuateShardRes() *EvacuateShardRes { } } -// WithShardIDList sets shard ID. -func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) { - p.shardID = id -} - -// WithIgnoreErrors sets flag to ignore errors. -func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) { - p.ignoreErrors = ignore -} - -// WithFaultHandler sets handler to call for objects which cannot be saved on other shards. -func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address, *objectSDK.Object) error) { - p.handler = f -} - -// WithAsync sets flag to run evacuate async. -func (p *EvacuateShardPrm) WithAsync(async bool) { - p.async = async -} - // Evacuated returns amount of evacuated objects. // Objects for which handler returned no error are also assumed evacuated. func (p *EvacuateShardRes) Evacuated() uint64 { @@ -145,20 +125,20 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev default: } - shardIDs := make([]string, len(prm.shardID)) - for i := range prm.shardID { - shardIDs[i] = prm.shardID[i].String() + shardIDs := make([]string, len(prm.ShardID)) + for i := range prm.ShardID { + shardIDs[i] = prm.ShardID[i].String() } ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), - attribute.Bool("async", prm.async), - attribute.Bool("ignoreErrors", prm.ignoreErrors), + attribute.Bool("async", prm.Async), + attribute.Bool("ignoreErrors", prm.IgnoreErrors), )) defer span.End() - shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil) + shards, weights, err := e.getActualShards(shardIDs, prm.Handler != nil) if err != nil { return nil, err } @@ -173,7 +153,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev } res := NewEvacuateShardRes() - ctx = ctxOrBackground(ctx, prm.async) + ctx = ctxOrBackground(ctx, prm.Async) eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) if err != nil { return nil, err @@ -183,7 +163,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate) }) - if prm.async { + if prm.Async { return nil, nil } @@ -204,8 +184,8 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", trace.WithAttributes( attribute.StringSlice("shardIDs", shardIDs), - attribute.Bool("async", prm.async), - attribute.Bool("ignoreErrors", prm.ignoreErrors), + attribute.Bool("async", prm.Async), + attribute.Bool("ignoreErrors", prm.IgnoreErrors), )) defer func() { @@ -357,7 +337,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to getRes, err := sh.Get(ctx, getPrm) if err != nil { - if prm.ignoreErrors { + if prm.IgnoreErrors { res.failed.Add(1) continue } @@ -375,13 +355,13 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to continue } - if prm.handler == nil { + if prm.Handler == nil { // Do not check ignoreErrors flag here because // ignoring errors on put make this command kinda useless. return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) } - err = prm.handler(ctx, addr, getRes.Object()) + err = prm.Handler(ctx, addr, getRes.Object()) if err != nil { e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx))) diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index d874734d..2dc4a177 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -102,7 +102,7 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) var prm EvacuateShardPrm - prm.WithShardIDList(ids[2:3]) + prm.ShardID = ids[2:3] t.Run("must be read-only", func(t *testing.T) { res, err := e.Evacuate(context.Background(), prm) @@ -173,13 +173,13 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) var prm EvacuateShardPrm - prm.shardID = ids[0:1] + prm.ShardID = ids[0:1] res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errMustHaveTwoShards) require.Equal(t, uint64(0), res.Evacuated()) - prm.handler = acceptOneOf(objects, 2) + prm.Handler = acceptOneOf(objects, 2) res, err = e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) @@ -196,15 +196,15 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) var prm EvacuateShardPrm - prm.shardID = ids[1:2] - prm.handler = acceptOneOf(objects, 2) + prm.ShardID = ids[1:2] + prm.Handler = acceptOneOf(objects, 2) res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) require.Equal(t, uint64(2), res.Evacuated()) t.Run("no errors", func(t *testing.T) { - prm.handler = acceptOneOf(objects, 3) + prm.Handler = acceptOneOf(objects, 3) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) @@ -233,15 +233,15 @@ func TestEvacuateNetwork(t *testing.T) { } var prm EvacuateShardPrm - prm.shardID = evacuateIDs - prm.handler = acceptOneOf(objects, totalCount-1) + prm.ShardID = evacuateIDs + prm.Handler = acceptOneOf(objects, totalCount-1) res, err := e.Evacuate(context.Background(), prm) require.ErrorIs(t, err, errReplication) require.Equal(t, totalCount-1, res.Evacuated()) t.Run("no errors", func(t *testing.T) { - prm.handler = acceptOneOf(objects, totalCount) + prm.Handler = acceptOneOf(objects, totalCount) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) @@ -261,8 +261,8 @@ func TestEvacuateCancellation(t *testing.T) { require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) var prm EvacuateShardPrm - prm.shardID = ids[1:2] - prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ShardID = ids[1:2] + prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-ctx.Done(): return ctx.Err() @@ -292,8 +292,8 @@ func TestEvacuateSingleProcess(t *testing.T) { running := make(chan interface{}) var prm EvacuateShardPrm - prm.shardID = ids[1:2] - prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ShardID = ids[1:2] + prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-running: default: @@ -334,8 +334,8 @@ func TestEvacuateAsync(t *testing.T) { running := make(chan interface{}) var prm EvacuateShardPrm - prm.shardID = ids[1:2] - prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { + prm.ShardID = ids[1:2] + prm.Handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { select { case <-running: default: diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 8f62c348..52ef083c 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -25,10 +25,11 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe return nil, status.Error(codes.PermissionDenied, err.Error()) } - var prm engine.EvacuateShardPrm - prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID())) - prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) - prm.WithFaultHandler(s.replicate) + prm := engine.EvacuateShardPrm{ + ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + Handler: s.replicate, + } res, err := s.s.Evacuate(ctx, prm) if err != nil { diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index cdf3656e..112d4449 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -17,11 +17,12 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha return nil, status.Error(codes.PermissionDenied, err.Error()) } - var prm engine.EvacuateShardPrm - prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID())) - prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) - prm.WithFaultHandler(s.replicate) - prm.WithAsync(true) + prm := engine.EvacuateShardPrm{ + ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + Handler: s.replicate, + Async: true, + } _, err = s.s.Evacuate(ctx, prm) if err != nil {