[#1437] shard: Fix contextcheck linter

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-21 11:56:38 +03:00
parent efa22b6ded
commit 4440eeb0e1
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
38 changed files with 165 additions and 160 deletions

View file

@ -8,7 +8,7 @@ import (
)
// SetMode sets the blobstor mode of operation.
func (b *BlobStor) SetMode(m mode.Mode) error {
func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
b.modeMtx.Lock()
defer b.modeMtx.Unlock()
@ -22,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
err := b.Close()
if err == nil {
if err = b.openBlobStor(context.TODO(), m); err == nil {
if err = b.openBlobStor(ctx, m); err == nil {
err = b.Init()
}
}

View file

@ -56,7 +56,7 @@ func (e *StorageEngine) open(ctx context.Context) error {
sh := e.shards[res.id]
delete(e.shards, res.id)
err := sh.Close()
err := sh.Close(ctx)
if err != nil {
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
zap.String("id", res.id),
@ -108,7 +108,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
sh := e.shards[res.id]
delete(e.shards, res.id)
err := sh.Close()
err := sh.Close(ctx)
if err != nil {
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
zap.String("id", res.id),
@ -126,7 +126,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
}
e.wg.Add(1)
go e.setModeLoop()
go e.setModeLoop(ctx)
return nil
}
@ -153,7 +153,7 @@ func (e *StorageEngine) Close(ctx context.Context) error {
}
// closes all shards. Never returns an error, shard errors are logged.
func (e *StorageEngine) close(releasePools bool) error {
func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
e.mtx.RLock()
defer e.mtx.RUnlock()
@ -164,8 +164,8 @@ func (e *StorageEngine) close(releasePools bool) error {
}
for id, sh := range e.shards {
if err := sh.Close(); err != nil {
e.log.Debug(context.Background(), logs.EngineCouldNotCloseShard,
if err := sh.Close(ctx); err != nil {
e.log.Debug(ctx, logs.EngineCouldNotCloseShard,
zap.String("id", id),
zap.String("error", err.Error()),
)
@ -213,7 +213,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
return e.open(ctx)
}
} else if prevErr == nil { // ok -> block
return e.close(errors.Is(err, errClosed))
return e.close(ctx, errors.Is(err, errClosed))
}
// otherwise do nothing
@ -306,7 +306,7 @@ loop:
e.mtx.RUnlock()
e.removeShards(shardsToRemove...)
e.removeShards(ctx, shardsToRemove...)
for _, p := range shardsToReload {
err := p.sh.Reload(ctx, p.opts...)
@ -330,13 +330,13 @@ loop:
err = sh.Init(ctx)
}
if err != nil {
_ = sh.Close()
_ = sh.Close(ctx)
return fmt.Errorf("could not init %s shard: %w", idStr, err)
}
err = e.addShard(sh)
if err != nil {
_ = sh.Close()
_ = sh.Close(ctx)
return fmt.Errorf("could not add %s shard: %w", idStr, err)
}

View file

@ -55,7 +55,7 @@ type setModeRequest struct {
// setModeLoop listens to setModeCh to perform degraded mode transition of a single shard.
// Instead of creating a worker per single shard we use a single goroutine.
func (e *StorageEngine) setModeLoop() {
func (e *StorageEngine) setModeLoop(ctx context.Context) {
defer e.wg.Done()
var (
@ -75,7 +75,7 @@ func (e *StorageEngine) setModeLoop() {
if !ok {
inProgress[sid] = struct{}{}
go func() {
e.moveToDegraded(r.sh, r.errorCount, r.isMeta)
e.moveToDegraded(ctx, r.sh, r.errorCount, r.isMeta)
mtx.Lock()
delete(inProgress, sid)
@ -87,7 +87,7 @@ func (e *StorageEngine) setModeLoop() {
}
}
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) {
func (e *StorageEngine) moveToDegraded(ctx context.Context, sh *shard.Shard, errCount uint32, isMeta bool) {
sid := sh.ID()
log := e.log.With(
zap.Stringer("shard_id", sid),
@ -97,7 +97,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
defer e.mtx.RUnlock()
if isMeta {
err := sh.SetMode(mode.DegradedReadOnly)
err := sh.SetMode(ctx, mode.DegradedReadOnly)
if err == nil {
log.Info(context.Background(), logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
return
@ -106,7 +106,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
zap.Error(err))
}
err := sh.SetMode(mode.ReadOnly)
err := sh.SetMode(ctx, mode.ReadOnly)
if err != nil {
log.Error(context.Background(), logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
return

View file

@ -158,10 +158,10 @@ func TestErrorReporting(t *testing.T) {
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
}
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, false))
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, true))
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close(context.Background()))
})

View file

@ -146,7 +146,7 @@ func TestEvacuateShardObjects(t *testing.T) {
require.Equal(t, uint64(0), res.ObjectsEvacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
@ -237,7 +237,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
evacuateShardID := ids[0].String()
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
@ -260,8 +260,8 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
@ -298,7 +298,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
}
for i := range ids {
require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[i].String()].SetMode(context.Background(), mode.ReadOnly))
}
var prm EvacuateShardPrm
@ -327,8 +327,8 @@ func TestEvacuateCancellation(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
@ -357,8 +357,8 @@ func TestEvacuateCancellationByError(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[1:2]
@ -386,8 +386,8 @@ func TestEvacuateSingleProcess(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
@ -429,8 +429,8 @@ func TestEvacuateObjectsAsync(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
@ -515,7 +515,7 @@ func TestEvacuateTreesLocal(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
@ -594,8 +594,8 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
mutex := sync.Mutex{}
evacuatedTreeOps := make(map[string][]*pilorama.Move)
@ -753,7 +753,7 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
@ -810,7 +810,7 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
prm.RepOneOnly = true
prm.ContainerWorkerCount = 10
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
start := time.Now()
_, err := e.Evacuate(context.Background(), prm)

View file

@ -121,7 +121,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
return sh.ID(), nil
}
func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*shard.Shard, error) {
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
id, err := generateShardID()
if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err)
@ -139,8 +139,8 @@ func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*sh
shard.WithZeroCountCallback(e.processZeroCountContainers),
)...)
if err := sh.UpdateID(); err != nil {
e.log.Warn(context.Background(), logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
if err := sh.UpdateID(ctx); err != nil {
e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
}
return sh, nil
@ -203,7 +203,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
// removeShards removes specified shards. Skips non-existent shards.
// Logs errors about shards that it could not Close after the removal.
func (e *StorageEngine) removeShards(ids ...string) {
func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
if len(ids) == 0 {
return
}
@ -228,22 +228,22 @@ func (e *StorageEngine) removeShards(ids ...string) {
delete(e.shardPools, id)
}
e.log.Info(context.Background(), logs.EngineShardHasBeenRemoved,
e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
zap.String("id", id))
}
e.mtx.Unlock()
for _, sh := range ss {
err := sh.SetMode(mode.Disabled)
err := sh.SetMode(ctx, mode.Disabled)
if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled,
e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
zap.Stringer("id", sh.ID()),
zap.Error(err),
)
}
err = sh.Close()
err = sh.Close(ctx)
if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard,
e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
zap.Stringer("id", sh.ID()),
zap.Error(err),
)
@ -310,7 +310,7 @@ func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (sto
// SetShardMode sets mode of the shard with provided identifier.
//
// Returns an error if shard mode was not set, or shard was not found in storage engine.
func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
e.mtx.RLock()
defer e.mtx.RUnlock()
@ -320,7 +320,7 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
sh.errorCount.Store(0)
e.metrics.ClearErrorCounter(shID)
}
return sh.SetMode(m)
return sh.SetMode(ctx, m)
}
}
@ -346,7 +346,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
}
}
func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
func (e *StorageEngine) DetachShards(ctx context.Context, ids []*shard.ID) error {
if len(ids) == 0 {
return logicerr.New("ids must be non-empty")
}
@ -356,20 +356,20 @@ func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
return err
}
return e.closeShards(deletedShards)
return e.closeShards(ctx, deletedShards)
}
// closeShards closes deleted shards. Tries to close all shards.
// Returns single error with joined shard errors.
func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedShard) error {
var multiErr error
var multiErrGuard sync.Mutex
var eg errgroup.Group
for _, sh := range deletedShards {
eg.Go(func() error {
err := sh.SetMode(mode.Disabled)
err := sh.SetMode(ctx, mode.Disabled)
if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled,
e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
zap.Stringer("id", sh.ID()),
zap.Error(err),
)
@ -378,9 +378,9 @@ func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
multiErrGuard.Unlock()
}
err = sh.Close()
err = sh.Close(ctx)
if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard,
e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
zap.Stringer("id", sh.ID()),
zap.Error(err),
)

View file

@ -33,7 +33,7 @@ func TestRemoveShard(t *testing.T) {
for id, remove := range mSh {
if remove {
e.removeShards(id)
e.removeShards(context.Background(), id)
}
}
@ -55,11 +55,11 @@ func TestDisableShards(t *testing.T) {
e, ids := te.engine, te.shardIDs
defer func() { require.NoError(t, e.Close(context.Background())) }()
require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards(context.Background(), ids), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards(context.Background(), nil), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards(context.Background(), []*shard.ID{}), new(logicerr.Logical))
require.NoError(t, e.DetachShards([]*shard.ID{ids[0]}))
require.NoError(t, e.DetachShards(context.Background(), []*shard.ID{ids[0]}))
require.Equal(t, 1, len(e.shards))
}

View file

@ -11,7 +11,7 @@ import (
// Component represents single storage component.
type Component interface {
Open(context.Context, mode.Mode) error
SetMode(mode.Mode) error
SetMode(context.Context, mode.Mode) error
Init() error
Close() error
}
@ -91,12 +91,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
// call `SetMode` on all not-yet-initialized components.
s := cons(t)
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
require.NoError(t, s.SetMode(m))
require.NoError(t, s.SetMode(context.Background(), m))
t.Run("after open in RO", func(t *testing.T) {
require.NoError(t, s.Close())
require.NoError(t, s.Open(context.Background(), mode.ReadOnly))
require.NoError(t, s.SetMode(m))
require.NoError(t, s.SetMode(context.Background(), m))
})
require.NoError(t, s.Close())
@ -106,7 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
// Use-case: normal node operation.
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
require.NoError(t, s.Init())
require.NoError(t, s.SetMode(m))
require.NoError(t, s.SetMode(context.Background(), m))
require.NoError(t, s.Close())
})
}
@ -116,7 +116,7 @@ func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
s := cons(t)
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
require.NoError(t, s.Init())
require.NoError(t, s.SetMode(from))
require.NoError(t, s.SetMode(to))
require.NoError(t, s.SetMode(context.Background(), from))
require.NoError(t, s.SetMode(context.Background(), to))
require.NoError(t, s.Close())
}

View file

@ -39,7 +39,7 @@ var (
)
// Open boltDB instance for metabase.
func (db *DB) Open(_ context.Context, m mode.Mode) error {
func (db *DB) Open(ctx context.Context, m mode.Mode) error {
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
db.mode = m
@ -48,10 +48,10 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error {
if m.NoMetabase() {
return nil
}
return db.openDB(m)
return db.openDB(ctx, m)
}
func (db *DB) openDB(mode mode.Mode) error {
func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
if err != nil {
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
@ -65,10 +65,10 @@ func (db *DB) openDB(mode mode.Mode) error {
}
db.boltOptions.ReadOnly = mode.ReadOnly()
return metaerr.Wrap(db.openBolt())
return metaerr.Wrap(db.openBolt(ctx))
}
func (db *DB) openBolt() error {
func (db *DB) openBolt(ctx context.Context) error {
var err error
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
@ -226,7 +226,7 @@ func (db *DB) close() error {
// If there was a problem with applying new configuration, an error is returned.
//
// If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned.
func (db *DB) Reload(opts ...Option) (bool, error) {
func (db *DB) Reload(ctx context.Context, opts ...Option) (bool, error) {
var c cfg
for i := range opts {
opts[i](&c)
@ -243,7 +243,7 @@ func (db *DB) Reload(opts ...Option) (bool, error) {
db.mode = mode.Disabled
db.metrics.SetMode(mode.ComponentDisabled)
db.info.Path = c.info.Path
if err := db.openBolt(); err != nil {
if err := db.openBolt(ctx); err != nil {
return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err))
}

View file

@ -1,6 +1,7 @@
package meta
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -8,7 +9,7 @@ import (
// SetMode sets the metabase mode of operation.
// If the mode assumes no operation metabase, the database is closed.
func (db *DB) SetMode(m mode.Mode) error {
func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
@ -25,7 +26,7 @@ func (db *DB) SetMode(m mode.Mode) error {
if m.NoMetabase() {
db.boltDB = nil
} else {
err := db.openDB(m)
err := db.openDB(ctx, m)
if err == nil && !m.ReadOnly() {
err = db.Init()
}

View file

@ -2,6 +2,7 @@ package meta
import (
"bytes"
"context"
"errors"
"fmt"
"os"
@ -21,7 +22,7 @@ var (
// If id is missing, returns nil, nil.
//
// GetShardID does not report any metrics.
func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error) {
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
db.mode = mode
@ -30,7 +31,7 @@ func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
return nil, nil
}
if err := db.openDB(mode); err != nil {
if err := db.openDB(ctx, mode); err != nil {
return nil, fmt.Errorf("failed to open metabase: %w", err)
}
@ -59,7 +60,7 @@ func (db *DB) readShardID() ([]byte, error) {
// SetShardID sets metabase operation mode
// and writes shard id to db.
func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) error {
db.modeMtx.Lock()
defer db.modeMtx.Unlock()
db.mode = mode
@ -68,7 +69,7 @@ func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
return ErrReadOnlyMode
}
if err := db.openDB(mode); err != nil {
if err := db.openDB(ctx, mode); err != nil {
return fmt.Errorf("failed to open metabase: %w", err)
}

View file

@ -58,7 +58,7 @@ func TestVersion(t *testing.T) {
})
t.Run("old data", func(t *testing.T) {
db := newDB(t)
require.NoError(t, db.SetShardID([]byte{1, 2, 3, 4}, mode.ReadWrite))
require.NoError(t, db.SetShardID(context.Background(), []byte{1, 2, 3, 4}, mode.ReadWrite))
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())

View file

@ -91,7 +91,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
return &b
}
func (t *boltForest) SetMode(m mode.Mode) error {
func (t *boltForest) SetMode(_ context.Context, m mode.Mode) error {
t.modeMtx.Lock()
defer t.modeMtx.Unlock()

View file

@ -119,7 +119,7 @@ func (f *memoryForest) Open(context.Context, mode.Mode) error {
return nil
}
func (f *memoryForest) SetMode(mode.Mode) error {
func (f *memoryForest) SetMode(context.Context, mode.Mode) error {
return nil
}

View file

@ -63,7 +63,7 @@ type ForestStorage interface {
Init() error
Open(context.Context, mode.Mode) error
Close() error
SetMode(m mode.Mode) error
SetMode(context.Context, mode.Mode) error
SetParentID(id string)
Forest

View file

@ -20,23 +20,23 @@ import (
"golang.org/x/sync/errgroup"
)
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
s.log.Error(context.Background(), logs.ShardMetabaseFailureSwitchingMode,
func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err error) error {
s.log.Error(ctx, logs.ShardMetabaseFailureSwitchingMode,
zap.String("stage", stage),
zap.Stringer("mode", mode.ReadOnly),
zap.Error(err))
err = s.SetMode(mode.ReadOnly)
err = s.SetMode(ctx, mode.ReadOnly)
if err == nil {
return nil
}
s.log.Error(context.Background(), logs.ShardCantMoveShardToReadonlySwitchMode,
s.log.Error(ctx, logs.ShardCantMoveShardToReadonlySwitchMode,
zap.String("stage", stage),
zap.Stringer("mode", mode.DegradedReadOnly),
zap.Error(err))
err = s.SetMode(mode.DegradedReadOnly)
err = s.SetMode(ctx, mode.DegradedReadOnly)
if err != nil {
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
}
@ -75,7 +75,7 @@ func (s *Shard) Open(ctx context.Context) error {
return fmt.Errorf("could not open %T: %w", components[j], err)
}
}
err = s.handleMetabaseFailure("open", err)
err = s.handleMetabaseFailure(ctx, "open", err)
if err != nil {
return err
}
@ -101,7 +101,7 @@ func (x *metabaseSynchronizer) Init() error {
// Init initializes all Shard's components.
func (s *Shard) Init(ctx context.Context) error {
m := s.GetMode()
if err := s.initializeComponents(m); err != nil {
if err := s.initializeComponents(ctx, m); err != nil {
return err
}
@ -138,7 +138,7 @@ func (s *Shard) Init(ctx context.Context) error {
return nil
}
func (s *Shard) initializeComponents(m mode.Mode) error {
func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
type initializer interface {
Init() error
}
@ -176,7 +176,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
return fmt.Errorf("metabase initialization: %w", err)
}
err = s.handleMetabaseFailure("init", err)
err = s.handleMetabaseFailure(ctx, "init", err)
if err != nil {
return err
}
@ -364,9 +364,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
}
// Close releases all Shard's components.
func (s *Shard) Close() error {
func (s *Shard) Close(ctx context.Context) error {
if s.rb != nil {
s.rb.Stop(s.log)
s.rb.Stop(ctx, s.log)
}
var components []interface{ Close() error }
@ -386,7 +386,7 @@ func (s *Shard) Close() error {
for _, component := range components {
if err := component.Close(); err != nil {
lastErr = err
s.log.Error(context.Background(), logs.ShardCouldNotCloseShardComponent, zap.Error(err))
s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err))
}
}
@ -414,18 +414,18 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
unlock := s.lockExclusive()
defer unlock()
s.rb.Stop(s.log)
s.rb.Stop(ctx, s.log)
if !s.info.Mode.NoMetabase() {
defer func() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}()
}
ok, err := s.metaBase.Reload(c.metaOpts...)
ok, err := s.metaBase.Reload(ctx, c.metaOpts...)
if err != nil {
if errors.Is(err, meta.ErrDegradedMode) {
s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
_ = s.setMode(mode.DegradedReadOnly)
_ = s.setMode(ctx, mode.DegradedReadOnly)
}
return err
}
@ -441,11 +441,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
}
if err != nil {
s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
_ = s.setMode(mode.DegradedReadOnly)
_ = s.setMode(ctx, mode.DegradedReadOnly)
return err
}
}
return s.setMode(c.info.Mode)
return s.setMode(ctx, c.info.Mode)
}
func (s *Shard) lockExclusive() func() {

View file

@ -86,7 +86,7 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadWrite, sh.GetMode())
require.NoError(t, sh.Close())
require.NoError(t, sh.Close(context.Background()))
// Metabase can be opened in read-only => start in ReadOnly mode.
allowedMode.Store(int64(os.O_RDONLY))
@ -95,9 +95,9 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadOnly, sh.GetMode())
require.Error(t, sh.SetMode(mode.ReadWrite))
require.Error(t, sh.SetMode(context.Background(), mode.ReadWrite))
require.Equal(t, mode.ReadOnly, sh.GetMode())
require.NoError(t, sh.Close())
require.NoError(t, sh.Close(context.Background()))
// Metabase is corrupted => start in DegradedReadOnly mode.
allowedMode.Store(math.MaxInt64)
@ -106,7 +106,7 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
require.NoError(t, sh.Close())
require.NoError(t, sh.Close(context.Background()))
}
func TestRefillMetabaseCorrupted(t *testing.T) {
@ -146,7 +146,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err)
require.NoError(t, sh.Close())
require.NoError(t, sh.Close(context.Background()))
addr := object.AddressOf(obj)
// This is copied from `fstree.treePath()` to avoid exporting function just for tests.
@ -170,7 +170,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
getPrm.SetAddress(addr)
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
require.NoError(t, sh.Close())
require.NoError(t, sh.Close(context.Background()))
}
func TestRefillMetabase(t *testing.T) {
@ -358,7 +358,7 @@ func TestRefillMetabase(t *testing.T) {
phyBefore := c.Phy
logicalBefore := c.Logic
err = sh.Close()
err = sh.Close(context.Background())
require.NoError(t, err)
sh = New(
@ -379,7 +379,7 @@ func TestRefillMetabase(t *testing.T) {
// initialize Blobstor
require.NoError(t, sh.Init(context.Background()))
defer sh.Close()
defer sh.Close(context.Background())
checkAllObjs(false)
checkObj(object.AddressOf(tombObj), nil)

View file

@ -37,7 +37,7 @@ func TestShard_Delete_BigObject(t *testing.T) {
func testShard(t *testing.T, hasWriteCache bool, payloadSize int) {
sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()

View file

@ -79,7 +79,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
sh = New(opts...)
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr)

View file

@ -34,7 +34,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
return util.NewPseudoWorkerPool() // synchronous event processing
})},
})
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()
@ -131,7 +131,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
return util.NewPseudoWorkerPool() // synchronous event processing
})},
})
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
@ -190,7 +190,7 @@ func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool
additionalShardOptions: []Option{WithDisabledGC()},
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
})
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
obj := testutil.GenerateObjectWithSize(1024)
@ -254,7 +254,7 @@ func TestGCDontDeleteObjectFromWritecache(t *testing.T) {
additionalShardOptions: []Option{WithDisabledGC()},
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
})
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
obj := testutil.GenerateObjectWithSize(1024)

View file

@ -30,7 +30,7 @@ func TestShard_Get(t *testing.T) {
func testShardGet(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
var putPrm PutPrm
var getPrm GetPrm

View file

@ -28,7 +28,7 @@ func TestShard_Head(t *testing.T) {
func testShardHead(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
var putPrm PutPrm
var headPrm HeadPrm

View file

@ -1,6 +1,7 @@
package shard
import (
"context"
"errors"
"fmt"
@ -30,11 +31,11 @@ func (s *Shard) ID() *ID {
}
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
func (s *Shard) UpdateID() (err error) {
func (s *Shard) UpdateID(ctx context.Context) (err error) {
var idFromMetabase []byte
modeDegraded := s.GetMode().NoMetabase()
if !modeDegraded {
if idFromMetabase, err = s.metaBase.GetShardID(mode.ReadOnly); err != nil {
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
}
}
@ -62,7 +63,7 @@ func (s *Shard) UpdateID() (err error) {
}
if len(idFromMetabase) == 0 && !modeDegraded {
if setErr := s.metaBase.SetShardID(*s.info.ID, s.GetMode()); setErr != nil {
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
}
}

View file

@ -27,7 +27,7 @@ func TestShard_Inhume(t *testing.T) {
func testShardInhume(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()

View file

@ -18,14 +18,14 @@ func TestShard_List(t *testing.T) {
t.Run("without write cache", func(t *testing.T) {
t.Parallel()
sh := newShard(t, false)
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
testShardList(t, sh)
})
t.Run("with write cache", func(t *testing.T) {
t.Parallel()
shWC := newShard(t, true)
defer func() { require.NoError(t, shWC.Close()) }()
defer func() { require.NoError(t, shWC.Close(context.Background())) }()
testShardList(t, shWC)
})
}

View file

@ -62,7 +62,7 @@ func TestShard_Lock(t *testing.T) {
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr)
@ -148,7 +148,7 @@ func TestShard_Lock(t *testing.T) {
func TestShard_IsLocked(t *testing.T) {
sh := newShard(t, false)
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr)

View file

@ -201,11 +201,11 @@ func TestCounters(t *testing.T) {
dir := t.TempDir()
sh, mm := shardWithMetrics(t, dir)
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
sh.SetMode(mode.ReadOnly)
sh.SetMode(context.Background(), mode.ReadOnly)
require.Equal(t, mode.ReadOnly, mm.mode)
sh.SetMode(mode.ReadWrite)
sh.SetMode(context.Background(), mode.ReadWrite)
require.Equal(t, mode.ReadWrite, mm.mode)
const objNumber = 10

View file

@ -20,19 +20,21 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
//
// Returns any error encountered that did not allow
// setting shard mode.
func (s *Shard) SetMode(m mode.Mode) error {
func (s *Shard) SetMode(ctx context.Context, m mode.Mode) error {
unlock := s.lockExclusive()
defer unlock()
return s.setMode(m)
return s.setMode(ctx, m)
}
func (s *Shard) setMode(m mode.Mode) error {
s.log.Info(context.Background(), logs.ShardSettingShardMode,
func (s *Shard) setMode(ctx context.Context, m mode.Mode) error {
s.log.Info(ctx, logs.ShardSettingShardMode,
zap.Stringer("old_mode", s.info.Mode),
zap.Stringer("new_mode", m))
components := []interface{ SetMode(mode.Mode) error }{
components := []interface {
SetMode(context.Context, mode.Mode) error
}{
s.metaBase, s.blobStor,
}
@ -60,7 +62,7 @@ func (s *Shard) setMode(m mode.Mode) error {
if !m.Disabled() {
for i := range components {
if err := components[i].SetMode(m); err != nil {
if err := components[i].SetMode(ctx, m); err != nil {
return err
}
}
@ -69,7 +71,7 @@ func (s *Shard) setMode(m mode.Mode) error {
s.info.Mode = m
s.metricsWriter.SetMode(s.info.Mode)
s.log.Info(context.Background(), logs.ShardShardModeSetSuccessfully,
s.log.Info(ctx, logs.ShardShardModeSetSuccessfully,
zap.Stringer("mode", s.info.Mode))
return nil
}

View file

@ -94,7 +94,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
}),
},
})
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

View file

@ -125,7 +125,7 @@ func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLi
}
}
func (r *rebuilder) Stop(log *logger.Logger) {
func (r *rebuilder) Stop(ctx context.Context, log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
@ -138,7 +138,7 @@ func (r *rebuilder) Stop(log *logger.Logger) {
r.wg.Wait()
r.cancel = nil
r.done = nil
log.Info(context.Background(), logs.BlobstoreRebuildStopped)
log.Info(ctx, logs.BlobstoreRebuildStopped)
}
var errMBIsNotAvailable = errors.New("metabase is not available")

View file

@ -34,7 +34,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
})
defer func() { require.NoError(b, sh.Close()) }()
defer func() { require.NoError(b, sh.Close(context.Background())) }()
var putPrm PutPrm
@ -61,7 +61,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
require.NoError(b, err)
}
require.NoError(b, sh.Close())
require.NoError(b, sh.Close(context.Background()))
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
require.NoError(b, sh.Open(context.Background()))
@ -72,5 +72,5 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
require.NoError(b, sh.Init(context.Background()))
require.NoError(b, sh.Close())
require.NoError(b, sh.Close(context.Background()))
}

View file

@ -59,7 +59,7 @@ func TestShardReload(t *testing.T) {
require.NoError(t, sh.Init(context.Background()))
defer func() {
require.NoError(t, sh.Close())
require.NoError(t, sh.Close(context.Background()))
}()
objects := make([]objAddr, 5)

View file

@ -52,10 +52,10 @@ func TestWriteCacheObjectLoss(t *testing.T) {
})
}
require.NoError(t, errG.Wait())
require.NoError(t, sh.Close())
require.NoError(t, sh.Close(context.Background()))
sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts})
defer func() { require.NoError(t, sh.Close()) }()
defer func() { require.NoError(t, sh.Close(context.Background())) }()
var getPrm GetPrm

View file

@ -117,8 +117,8 @@ func runFlushTest[Option any](
defer func() { require.NoError(t, wc.Close()) }()
objects := putObjects(t, wc)
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, wc.Flush(context.Background(), false, false))
@ -131,11 +131,11 @@ func runFlushTest[Option any](
objects := putObjects(t, wc)
// Blobstor is read-only, so we expect en error from `flush` here.
require.Error(t, wc.SetMode(mode.Degraded))
require.Error(t, wc.SetMode(context.Background(), mode.Degraded))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.SetMode(mode.Degraded))
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, wc.SetMode(context.Background(), mode.Degraded))
check(t, mb, bs, objects)
})
@ -149,8 +149,8 @@ func runFlushTest[Option any](
objects := putObjects(t, wc)
f.InjectFn(t, wc)
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
require.Equal(t, uint32(0), errCount.Load())
require.Error(t, wc.Flush(context.Background(), false, false))
@ -191,8 +191,8 @@ func newCache[Option any](
require.NoError(t, wc.Init())
// First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly))
require.NoError(t, mb.SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, bs.SetMode(context.Background(), mode.ReadOnly))
return wc, bs, mb
}

View file

@ -23,8 +23,8 @@ type setModePrm struct {
// SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended.
func (c *cache) SetMode(m mode.Mode) error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
func (c *cache) SetMode(ctx context.Context, m mode.Mode) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.SetMode",
trace.WithAttributes(
attribute.String("mode", m.String()),
))

View file

@ -38,7 +38,7 @@ type Cache interface {
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
Delete(context.Context, oid.Address) error
Put(context.Context, common.PutPrm) (common.PutRes, error)
SetMode(mode.Mode) error
SetMode(context.Context, mode.Mode) error
SetLogger(*logger.Logger)
DumpInfo() Info
Flush(context.Context, bool, bool) error

View file

@ -11,7 +11,7 @@ import (
"google.golang.org/grpc/status"
)
func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
func (s *Server) DetachShards(ctx context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
@ -19,7 +19,7 @@ func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsReques
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
if err := s.s.DetachShards(shardIDs); err != nil {
if err := s.s.DetachShards(ctx, shardIDs); err != nil {
if errors.As(err, new(logicerr.Logical)) {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

View file

@ -11,7 +11,7 @@ import (
"google.golang.org/grpc/status"
)
func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) {
func (s *Server) SetShardMode(ctx context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) {
// verify request
err := s.isValidRequest(req)
if err != nil {
@ -38,7 +38,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
}
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
err = s.s.SetShardMode(shardID, m, req.GetBody().GetResetErrorCounter())
err = s.s.SetShardMode(ctx, shardID, m, req.GetBody().GetResetErrorCounter())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}