[#1437] shard: Fix contextcheck linter

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-21 11:56:38 +03:00
parent 22b26ea14e
commit cdb3cea4e7
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
38 changed files with 165 additions and 160 deletions

View file

@ -8,7 +8,7 @@ import (
) )
// SetMode sets the blobstor mode of operation. // SetMode sets the blobstor mode of operation.
func (b *BlobStor) SetMode(m mode.Mode) error { func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
b.modeMtx.Lock() b.modeMtx.Lock()
defer b.modeMtx.Unlock() defer b.modeMtx.Unlock()
@ -22,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
err := b.Close() err := b.Close()
if err == nil { if err == nil {
if err = b.openBlobStor(context.TODO(), m); err == nil { if err = b.openBlobStor(ctx, m); err == nil {
err = b.Init() err = b.Init()
} }
} }

View file

@ -56,7 +56,7 @@ func (e *StorageEngine) open(ctx context.Context) error {
sh := e.shards[res.id] sh := e.shards[res.id]
delete(e.shards, res.id) delete(e.shards, res.id)
err := sh.Close() err := sh.Close(ctx)
if err != nil { if err != nil {
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard, e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
zap.String("id", res.id), zap.String("id", res.id),
@ -108,7 +108,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
sh := e.shards[res.id] sh := e.shards[res.id]
delete(e.shards, res.id) delete(e.shards, res.id)
err := sh.Close() err := sh.Close(ctx)
if err != nil { if err != nil {
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard, e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
zap.String("id", res.id), zap.String("id", res.id),
@ -126,7 +126,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
} }
e.wg.Add(1) e.wg.Add(1)
go e.setModeLoop() go e.setModeLoop(ctx)
return nil return nil
} }
@ -153,7 +153,7 @@ func (e *StorageEngine) Close(ctx context.Context) error {
} }
// closes all shards. Never returns an error, shard errors are logged. // closes all shards. Never returns an error, shard errors are logged.
func (e *StorageEngine) close(releasePools bool) error { func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
@ -164,8 +164,8 @@ func (e *StorageEngine) close(releasePools bool) error {
} }
for id, sh := range e.shards { for id, sh := range e.shards {
if err := sh.Close(); err != nil { if err := sh.Close(ctx); err != nil {
e.log.Debug(context.Background(), logs.EngineCouldNotCloseShard, e.log.Debug(ctx, logs.EngineCouldNotCloseShard,
zap.String("id", id), zap.String("id", id),
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
@ -213,7 +213,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
return e.open(ctx) return e.open(ctx)
} }
} else if prevErr == nil { // ok -> block } else if prevErr == nil { // ok -> block
return e.close(errors.Is(err, errClosed)) return e.close(ctx, errors.Is(err, errClosed))
} }
// otherwise do nothing // otherwise do nothing
@ -306,7 +306,7 @@ loop:
e.mtx.RUnlock() e.mtx.RUnlock()
e.removeShards(shardsToRemove...) e.removeShards(ctx, shardsToRemove...)
for _, p := range shardsToReload { for _, p := range shardsToReload {
err := p.sh.Reload(ctx, p.opts...) err := p.sh.Reload(ctx, p.opts...)
@ -330,13 +330,13 @@ loop:
err = sh.Init(ctx) err = sh.Init(ctx)
} }
if err != nil { if err != nil {
_ = sh.Close() _ = sh.Close(ctx)
return fmt.Errorf("could not init %s shard: %w", idStr, err) return fmt.Errorf("could not init %s shard: %w", idStr, err)
} }
err = e.addShard(sh) err = e.addShard(sh)
if err != nil { if err != nil {
_ = sh.Close() _ = sh.Close(ctx)
return fmt.Errorf("could not add %s shard: %w", idStr, err) return fmt.Errorf("could not add %s shard: %w", idStr, err)
} }

View file

@ -55,7 +55,7 @@ type setModeRequest struct {
// setModeLoop listens setModeCh to perform degraded mode transition of a single shard. // setModeLoop listens setModeCh to perform degraded mode transition of a single shard.
// Instead of creating a worker per single shard we use a single goroutine. // Instead of creating a worker per single shard we use a single goroutine.
func (e *StorageEngine) setModeLoop() { func (e *StorageEngine) setModeLoop(ctx context.Context) {
defer e.wg.Done() defer e.wg.Done()
var ( var (
@ -75,7 +75,7 @@ func (e *StorageEngine) setModeLoop() {
if !ok { if !ok {
inProgress[sid] = struct{}{} inProgress[sid] = struct{}{}
go func() { go func() {
e.moveToDegraded(r.sh, r.errorCount, r.isMeta) e.moveToDegraded(ctx, r.sh, r.errorCount, r.isMeta)
mtx.Lock() mtx.Lock()
delete(inProgress, sid) delete(inProgress, sid)
@ -87,7 +87,7 @@ func (e *StorageEngine) setModeLoop() {
} }
} }
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) { func (e *StorageEngine) moveToDegraded(ctx context.Context, sh *shard.Shard, errCount uint32, isMeta bool) {
sid := sh.ID() sid := sh.ID()
log := e.log.With( log := e.log.With(
zap.Stringer("shard_id", sid), zap.Stringer("shard_id", sid),
@ -97,7 +97,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
if isMeta { if isMeta {
err := sh.SetMode(mode.DegradedReadOnly) err := sh.SetMode(ctx, mode.DegradedReadOnly)
if err == nil { if err == nil {
log.Info(context.Background(), logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold) log.Info(context.Background(), logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
return return
@ -106,7 +106,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
zap.Error(err)) zap.Error(err))
} }
err := sh.SetMode(mode.ReadOnly) err := sh.SetMode(ctx, mode.ReadOnly)
if err != nil { if err != nil {
log.Error(context.Background(), logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err)) log.Error(context.Background(), logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
return return

View file

@ -158,10 +158,10 @@ func TestErrorReporting(t *testing.T) {
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
} }
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false)) require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, false))
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true)) require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, true))
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close(context.Background())) require.NoError(t, te.ng.Close(context.Background()))
}) })

View file

@ -146,7 +146,7 @@ func TestEvacuateShardObjects(t *testing.T) {
require.Equal(t, uint64(0), res.ObjectsEvacuated()) require.Equal(t, uint64(0), res.ObjectsEvacuated())
}) })
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
@ -237,7 +237,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
evacuateShardID := ids[0].String() evacuateShardID := ids[0].String()
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[0:1] prm.ShardID = ids[0:1]
@ -260,8 +260,8 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[1:2] prm.ShardID = ids[1:2]
@ -298,7 +298,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
} }
for i := range ids { for i := range ids {
require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[i].String()].SetMode(context.Background(), mode.ReadOnly))
} }
var prm EvacuateShardPrm var prm EvacuateShardPrm
@ -327,8 +327,8 @@ func TestEvacuateCancellation(t *testing.T) {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[1:2] prm.ShardID = ids[1:2]
@ -357,8 +357,8 @@ func TestEvacuateCancellationByError(t *testing.T) {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[1:2] prm.ShardID = ids[1:2]
@ -386,8 +386,8 @@ func TestEvacuateSingleProcess(t *testing.T) {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
blocker := make(chan interface{}) blocker := make(chan interface{})
running := make(chan interface{}) running := make(chan interface{})
@ -429,8 +429,8 @@ func TestEvacuateObjectsAsync(t *testing.T) {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
blocker := make(chan interface{}) blocker := make(chan interface{})
running := make(chan interface{}) running := make(chan interface{})
@ -515,7 +515,7 @@ func TestEvacuateTreesLocal(t *testing.T) {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[0:1] prm.ShardID = ids[0:1]
@ -594,8 +594,8 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
}() }()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
mutex := sync.Mutex{} mutex := sync.Mutex{}
evacuatedTreeOps := make(map[string][]*pilorama.Move) evacuatedTreeOps := make(map[string][]*pilorama.Move)
@ -753,7 +753,7 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
prm.Scope = EvacuateScopeObjects prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true prm.RepOneOnly = true
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
@ -810,7 +810,7 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
prm.RepOneOnly = true prm.RepOneOnly = true
prm.ContainerWorkerCount = 10 prm.ContainerWorkerCount = 10
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
start := time.Now() start := time.Now()
_, err := e.Evacuate(context.Background(), prm) _, err := e.Evacuate(context.Background(), prm)

View file

@ -121,7 +121,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
return sh.ID(), nil return sh.ID(), nil
} }
func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*shard.Shard, error) { func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
id, err := generateShardID() id, err := generateShardID()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err) return nil, fmt.Errorf("could not generate shard ID: %w", err)
@ -139,8 +139,8 @@ func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*sh
shard.WithZeroCountCallback(e.processZeroCountContainers), shard.WithZeroCountCallback(e.processZeroCountContainers),
)...) )...)
if err := sh.UpdateID(); err != nil { if err := sh.UpdateID(ctx); err != nil {
e.log.Warn(context.Background(), logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err)) e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
} }
return sh, nil return sh, nil
@ -203,7 +203,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
// removeShards removes specified shards. Skips non-existent shards. // removeShards removes specified shards. Skips non-existent shards.
// Logs errors about shards that it could not Close after the removal. // Logs errors about shards that it could not Close after the removal.
func (e *StorageEngine) removeShards(ids ...string) { func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
if len(ids) == 0 { if len(ids) == 0 {
return return
} }
@ -228,22 +228,22 @@ func (e *StorageEngine) removeShards(ids ...string) {
delete(e.shardPools, id) delete(e.shardPools, id)
} }
e.log.Info(context.Background(), logs.EngineShardHasBeenRemoved, e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
zap.String("id", id)) zap.String("id", id))
} }
e.mtx.Unlock() e.mtx.Unlock()
for _, sh := range ss { for _, sh := range ss {
err := sh.SetMode(mode.Disabled) err := sh.SetMode(ctx, mode.Disabled)
if err != nil { if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled, e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
zap.Stringer("id", sh.ID()), zap.Stringer("id", sh.ID()),
zap.Error(err), zap.Error(err),
) )
} }
err = sh.Close() err = sh.Close(ctx)
if err != nil { if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard, e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
zap.Stringer("id", sh.ID()), zap.Stringer("id", sh.ID()),
zap.Error(err), zap.Error(err),
) )
@ -310,7 +310,7 @@ func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (sto
// SetShardMode sets mode of the shard with provided identifier. // SetShardMode sets mode of the shard with provided identifier.
// //
// Returns an error if shard mode was not set, or shard was not found in storage engine. // Returns an error if shard mode was not set, or shard was not found in storage engine.
func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounter bool) error { func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
@ -320,7 +320,7 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
sh.errorCount.Store(0) sh.errorCount.Store(0)
e.metrics.ClearErrorCounter(shID) e.metrics.ClearErrorCounter(shID)
} }
return sh.SetMode(m) return sh.SetMode(ctx, m)
} }
} }
@ -346,7 +346,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
} }
} }
func (e *StorageEngine) DetachShards(ids []*shard.ID) error { func (e *StorageEngine) DetachShards(ctx context.Context, ids []*shard.ID) error {
if len(ids) == 0 { if len(ids) == 0 {
return logicerr.New("ids must be non-empty") return logicerr.New("ids must be non-empty")
} }
@ -356,20 +356,20 @@ func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
return err return err
} }
return e.closeShards(deletedShards) return e.closeShards(ctx, deletedShards)
} }
// closeShards closes deleted shards. Tries to close all shards. // closeShards closes deleted shards. Tries to close all shards.
// Returns single error with joined shard errors. // Returns single error with joined shard errors.
func (e *StorageEngine) closeShards(deletedShards []hashedShard) error { func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedShard) error {
var multiErr error var multiErr error
var multiErrGuard sync.Mutex var multiErrGuard sync.Mutex
var eg errgroup.Group var eg errgroup.Group
for _, sh := range deletedShards { for _, sh := range deletedShards {
eg.Go(func() error { eg.Go(func() error {
err := sh.SetMode(mode.Disabled) err := sh.SetMode(ctx, mode.Disabled)
if err != nil { if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled, e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
zap.Stringer("id", sh.ID()), zap.Stringer("id", sh.ID()),
zap.Error(err), zap.Error(err),
) )
@ -378,9 +378,9 @@ func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
multiErrGuard.Unlock() multiErrGuard.Unlock()
} }
err = sh.Close() err = sh.Close(ctx)
if err != nil { if err != nil {
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard, e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
zap.Stringer("id", sh.ID()), zap.Stringer("id", sh.ID()),
zap.Error(err), zap.Error(err),
) )

View file

@ -33,7 +33,7 @@ func TestRemoveShard(t *testing.T) {
for id, remove := range mSh { for id, remove := range mSh {
if remove { if remove {
e.removeShards(id) e.removeShards(context.Background(), id)
} }
} }
@ -55,11 +55,11 @@ func TestDisableShards(t *testing.T) {
e, ids := te.engine, te.shardIDs e, ids := te.engine, te.shardIDs
defer func() { require.NoError(t, e.Close(context.Background())) }() defer func() { require.NoError(t, e.Close(context.Background())) }()
require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical)) require.ErrorAs(t, e.DetachShards(context.Background(), ids), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical)) require.ErrorAs(t, e.DetachShards(context.Background(), nil), new(logicerr.Logical))
require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical)) require.ErrorAs(t, e.DetachShards(context.Background(), []*shard.ID{}), new(logicerr.Logical))
require.NoError(t, e.DetachShards([]*shard.ID{ids[0]})) require.NoError(t, e.DetachShards(context.Background(), []*shard.ID{ids[0]}))
require.Equal(t, 1, len(e.shards)) require.Equal(t, 1, len(e.shards))
} }

View file

@ -11,7 +11,7 @@ import (
// Component represents single storage component. // Component represents single storage component.
type Component interface { type Component interface {
Open(context.Context, mode.Mode) error Open(context.Context, mode.Mode) error
SetMode(mode.Mode) error SetMode(context.Context, mode.Mode) error
Init() error Init() error
Close() error Close() error
} }
@ -91,12 +91,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
// call `SetMode` on all not-yet-initialized components. // call `SetMode` on all not-yet-initialized components.
s := cons(t) s := cons(t)
require.NoError(t, s.Open(context.Background(), mode.ReadWrite)) require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
require.NoError(t, s.SetMode(m)) require.NoError(t, s.SetMode(context.Background(), m))
t.Run("after open in RO", func(t *testing.T) { t.Run("after open in RO", func(t *testing.T) {
require.NoError(t, s.Close()) require.NoError(t, s.Close())
require.NoError(t, s.Open(context.Background(), mode.ReadOnly)) require.NoError(t, s.Open(context.Background(), mode.ReadOnly))
require.NoError(t, s.SetMode(m)) require.NoError(t, s.SetMode(context.Background(), m))
}) })
require.NoError(t, s.Close()) require.NoError(t, s.Close())
@ -106,7 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
// Use-case: notmal node operation. // Use-case: notmal node operation.
require.NoError(t, s.Open(context.Background(), mode.ReadWrite)) require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
require.NoError(t, s.Init()) require.NoError(t, s.Init())
require.NoError(t, s.SetMode(m)) require.NoError(t, s.SetMode(context.Background(), m))
require.NoError(t, s.Close()) require.NoError(t, s.Close())
}) })
} }
@ -116,7 +116,7 @@ func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
s := cons(t) s := cons(t)
require.NoError(t, s.Open(context.Background(), mode.ReadWrite)) require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
require.NoError(t, s.Init()) require.NoError(t, s.Init())
require.NoError(t, s.SetMode(from)) require.NoError(t, s.SetMode(context.Background(), from))
require.NoError(t, s.SetMode(to)) require.NoError(t, s.SetMode(context.Background(), to))
require.NoError(t, s.Close()) require.NoError(t, s.Close())
} }

View file

@ -39,7 +39,7 @@ var (
) )
// Open boltDB instance for metabase. // Open boltDB instance for metabase.
func (db *DB) Open(_ context.Context, m mode.Mode) error { func (db *DB) Open(ctx context.Context, m mode.Mode) error {
db.modeMtx.Lock() db.modeMtx.Lock()
defer db.modeMtx.Unlock() defer db.modeMtx.Unlock()
db.mode = m db.mode = m
@ -48,10 +48,10 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error {
if m.NoMetabase() { if m.NoMetabase() {
return nil return nil
} }
return db.openDB(m) return db.openDB(ctx, m)
} }
func (db *DB) openDB(mode mode.Mode) error { func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
if err != nil { if err != nil {
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err) return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
@ -65,10 +65,10 @@ func (db *DB) openDB(mode mode.Mode) error {
} }
db.boltOptions.ReadOnly = mode.ReadOnly() db.boltOptions.ReadOnly = mode.ReadOnly()
return metaerr.Wrap(db.openBolt()) return metaerr.Wrap(db.openBolt(ctx))
} }
func (db *DB) openBolt() error { func (db *DB) openBolt(ctx context.Context) error {
var err error var err error
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
@ -226,7 +226,7 @@ func (db *DB) close() error {
// If there was a problem with applying new configuration, an error is returned. // If there was a problem with applying new configuration, an error is returned.
// //
// If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned. // If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned.
func (db *DB) Reload(opts ...Option) (bool, error) { func (db *DB) Reload(ctx context.Context, opts ...Option) (bool, error) {
var c cfg var c cfg
for i := range opts { for i := range opts {
opts[i](&c) opts[i](&c)
@ -243,7 +243,7 @@ func (db *DB) Reload(opts ...Option) (bool, error) {
db.mode = mode.Disabled db.mode = mode.Disabled
db.metrics.SetMode(mode.ComponentDisabled) db.metrics.SetMode(mode.ComponentDisabled)
db.info.Path = c.info.Path db.info.Path = c.info.Path
if err := db.openBolt(); err != nil { if err := db.openBolt(ctx); err != nil {
return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err)) return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err))
} }

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"context"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -8,7 +9,7 @@ import (
// SetMode sets the metabase mode of operation. // SetMode sets the metabase mode of operation.
// If the mode assumes no operation metabase, the database is closed. // If the mode assumes no operation metabase, the database is closed.
func (db *DB) SetMode(m mode.Mode) error { func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
db.modeMtx.Lock() db.modeMtx.Lock()
defer db.modeMtx.Unlock() defer db.modeMtx.Unlock()
@ -25,7 +26,7 @@ func (db *DB) SetMode(m mode.Mode) error {
if m.NoMetabase() { if m.NoMetabase() {
db.boltDB = nil db.boltDB = nil
} else { } else {
err := db.openDB(m) err := db.openDB(ctx, m)
if err == nil && !m.ReadOnly() { if err == nil && !m.ReadOnly() {
err = db.Init() err = db.Init()
} }

View file

@ -2,6 +2,7 @@ package meta
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"fmt" "fmt"
"os" "os"
@ -21,7 +22,7 @@ var (
// If id is missing, returns nil, nil. // If id is missing, returns nil, nil.
// //
// GetShardID does not report any metrics. // GetShardID does not report any metrics.
func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) { func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error) {
db.modeMtx.Lock() db.modeMtx.Lock()
defer db.modeMtx.Unlock() defer db.modeMtx.Unlock()
db.mode = mode db.mode = mode
@ -30,7 +31,7 @@ func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
return nil, nil return nil, nil
} }
if err := db.openDB(mode); err != nil { if err := db.openDB(ctx, mode); err != nil {
return nil, fmt.Errorf("failed to open metabase: %w", err) return nil, fmt.Errorf("failed to open metabase: %w", err)
} }
@ -59,7 +60,7 @@ func (db *DB) readShardID() ([]byte, error) {
// SetShardID sets metabase operation mode // SetShardID sets metabase operation mode
// and writes shard id to db. // and writes shard id to db.
func (db *DB) SetShardID(id []byte, mode metamode.Mode) error { func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) error {
db.modeMtx.Lock() db.modeMtx.Lock()
defer db.modeMtx.Unlock() defer db.modeMtx.Unlock()
db.mode = mode db.mode = mode
@ -68,7 +69,7 @@ func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
return ErrReadOnlyMode return ErrReadOnlyMode
} }
if err := db.openDB(mode); err != nil { if err := db.openDB(ctx, mode); err != nil {
return fmt.Errorf("failed to open metabase: %w", err) return fmt.Errorf("failed to open metabase: %w", err)
} }

View file

@ -58,7 +58,7 @@ func TestVersion(t *testing.T) {
}) })
t.Run("old data", func(t *testing.T) { t.Run("old data", func(t *testing.T) {
db := newDB(t) db := newDB(t)
require.NoError(t, db.SetShardID([]byte{1, 2, 3, 4}, mode.ReadWrite)) require.NoError(t, db.SetShardID(context.Background(), []byte{1, 2, 3, 4}, mode.ReadWrite))
require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init()) require.NoError(t, db.Init())

View file

@ -91,7 +91,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
return &b return &b
} }
func (t *boltForest) SetMode(m mode.Mode) error { func (t *boltForest) SetMode(_ context.Context, m mode.Mode) error {
t.modeMtx.Lock() t.modeMtx.Lock()
defer t.modeMtx.Unlock() defer t.modeMtx.Unlock()

View file

@ -128,7 +128,7 @@ func (f *memoryForest) Open(context.Context, mode.Mode) error {
return nil return nil
} }
func (f *memoryForest) SetMode(mode.Mode) error { func (f *memoryForest) SetMode(context.Context, mode.Mode) error {
return nil return nil
} }

View file

@ -65,7 +65,7 @@ type ForestStorage interface {
Init() error Init() error
Open(context.Context, mode.Mode) error Open(context.Context, mode.Mode) error
Close() error Close() error
SetMode(m mode.Mode) error SetMode(context.Context, mode.Mode) error
SetParentID(id string) SetParentID(id string)
Forest Forest

View file

@ -20,23 +20,23 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
func (s *Shard) handleMetabaseFailure(stage string, err error) error { func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err error) error {
s.log.Error(context.Background(), logs.ShardMetabaseFailureSwitchingMode, s.log.Error(ctx, logs.ShardMetabaseFailureSwitchingMode,
zap.String("stage", stage), zap.String("stage", stage),
zap.Stringer("mode", mode.ReadOnly), zap.Stringer("mode", mode.ReadOnly),
zap.Error(err)) zap.Error(err))
err = s.SetMode(mode.ReadOnly) err = s.SetMode(ctx, mode.ReadOnly)
if err == nil { if err == nil {
return nil return nil
} }
s.log.Error(context.Background(), logs.ShardCantMoveShardToReadonlySwitchMode, s.log.Error(ctx, logs.ShardCantMoveShardToReadonlySwitchMode,
zap.String("stage", stage), zap.String("stage", stage),
zap.Stringer("mode", mode.DegradedReadOnly), zap.Stringer("mode", mode.DegradedReadOnly),
zap.Error(err)) zap.Error(err))
err = s.SetMode(mode.DegradedReadOnly) err = s.SetMode(ctx, mode.DegradedReadOnly)
if err != nil { if err != nil {
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly)) return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
} }
@ -75,7 +75,7 @@ func (s *Shard) Open(ctx context.Context) error {
return fmt.Errorf("could not open %T: %w", components[j], err) return fmt.Errorf("could not open %T: %w", components[j], err)
} }
} }
err = s.handleMetabaseFailure("open", err) err = s.handleMetabaseFailure(ctx, "open", err)
if err != nil { if err != nil {
return err return err
} }
@ -101,7 +101,7 @@ func (x *metabaseSynchronizer) Init() error {
// Init initializes all Shard's components. // Init initializes all Shard's components.
func (s *Shard) Init(ctx context.Context) error { func (s *Shard) Init(ctx context.Context) error {
m := s.GetMode() m := s.GetMode()
if err := s.initializeComponents(m); err != nil { if err := s.initializeComponents(ctx, m); err != nil {
return err return err
} }
@ -138,7 +138,7 @@ func (s *Shard) Init(ctx context.Context) error {
return nil return nil
} }
func (s *Shard) initializeComponents(m mode.Mode) error { func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
type initializer interface { type initializer interface {
Init() error Init() error
} }
@ -176,7 +176,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
return fmt.Errorf("metabase initialization: %w", err) return fmt.Errorf("metabase initialization: %w", err)
} }
err = s.handleMetabaseFailure("init", err) err = s.handleMetabaseFailure(ctx, "init", err)
if err != nil { if err != nil {
return err return err
} }
@ -364,9 +364,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
} }
// Close releases all Shard's components. // Close releases all Shard's components.
func (s *Shard) Close() error { func (s *Shard) Close(ctx context.Context) error {
if s.rb != nil { if s.rb != nil {
s.rb.Stop(s.log) s.rb.Stop(ctx, s.log)
} }
var components []interface{ Close() error } var components []interface{ Close() error }
@ -386,7 +386,7 @@ func (s *Shard) Close() error {
for _, component := range components { for _, component := range components {
if err := component.Close(); err != nil { if err := component.Close(); err != nil {
lastErr = err lastErr = err
s.log.Error(context.Background(), logs.ShardCouldNotCloseShardComponent, zap.Error(err)) s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err))
} }
} }
@ -414,18 +414,18 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
unlock := s.lockExclusive() unlock := s.lockExclusive()
defer unlock() defer unlock()
s.rb.Stop(s.log) s.rb.Stop(ctx, s.log)
if !s.info.Mode.NoMetabase() { if !s.info.Mode.NoMetabase() {
defer func() { defer func() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}() }()
} }
ok, err := s.metaBase.Reload(c.metaOpts...) ok, err := s.metaBase.Reload(ctx, c.metaOpts...)
if err != nil { if err != nil {
if errors.Is(err, meta.ErrDegradedMode) { if errors.Is(err, meta.ErrDegradedMode) {
s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err)) s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
_ = s.setMode(mode.DegradedReadOnly) _ = s.setMode(ctx, mode.DegradedReadOnly)
} }
return err return err
} }
@ -441,11 +441,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
} }
if err != nil { if err != nil {
s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err)) s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
_ = s.setMode(mode.DegradedReadOnly) _ = s.setMode(ctx, mode.DegradedReadOnly)
return err return err
} }
} }
return s.setMode(c.info.Mode) return s.setMode(ctx, c.info.Mode)
} }
func (s *Shard) lockExclusive() func() { func (s *Shard) lockExclusive() func() {

View file

@ -86,7 +86,7 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadWrite, sh.GetMode()) require.Equal(t, mode.ReadWrite, sh.GetMode())
require.NoError(t, sh.Close()) require.NoError(t, sh.Close(context.Background()))
// Metabase can be opened in read-only => start in ReadOnly mode. // Metabase can be opened in read-only => start in ReadOnly mode.
allowedMode.Store(int64(os.O_RDONLY)) allowedMode.Store(int64(os.O_RDONLY))
@ -95,9 +95,9 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.ReadOnly, sh.GetMode()) require.Equal(t, mode.ReadOnly, sh.GetMode())
require.Error(t, sh.SetMode(mode.ReadWrite)) require.Error(t, sh.SetMode(context.Background(), mode.ReadWrite))
require.Equal(t, mode.ReadOnly, sh.GetMode()) require.Equal(t, mode.ReadOnly, sh.GetMode())
require.NoError(t, sh.Close()) require.NoError(t, sh.Close(context.Background()))
// Metabase is corrupted => start in DegradedReadOnly mode. // Metabase is corrupted => start in DegradedReadOnly mode.
allowedMode.Store(math.MaxInt64) allowedMode.Store(math.MaxInt64)
@ -106,7 +106,7 @@ func TestShardOpen(t *testing.T) {
require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
require.Equal(t, mode.DegradedReadOnly, sh.GetMode()) require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
require.NoError(t, sh.Close()) require.NoError(t, sh.Close(context.Background()))
} }
func TestRefillMetabaseCorrupted(t *testing.T) { func TestRefillMetabaseCorrupted(t *testing.T) {
@ -146,7 +146,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm) _, err := sh.Put(context.Background(), putPrm)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, sh.Close()) require.NoError(t, sh.Close(context.Background()))
addr := object.AddressOf(obj) addr := object.AddressOf(obj)
// This is copied from `fstree.treePath()` to avoid exporting function just for tests. // This is copied from `fstree.treePath()` to avoid exporting function just for tests.
@ -170,7 +170,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
getPrm.SetAddress(addr) getPrm.SetAddress(addr)
_, err = sh.Get(context.Background(), getPrm) _, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err)) require.True(t, client.IsErrObjectNotFound(err))
require.NoError(t, sh.Close()) require.NoError(t, sh.Close(context.Background()))
} }
func TestRefillMetabase(t *testing.T) { func TestRefillMetabase(t *testing.T) {
@ -358,7 +358,7 @@ func TestRefillMetabase(t *testing.T) {
phyBefore := c.Phy phyBefore := c.Phy
logicalBefore := c.Logic logicalBefore := c.Logic
err = sh.Close() err = sh.Close(context.Background())
require.NoError(t, err) require.NoError(t, err)
sh = New( sh = New(
@ -379,7 +379,7 @@ func TestRefillMetabase(t *testing.T) {
// initialize Blobstor // initialize Blobstor
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
defer sh.Close() defer sh.Close(context.Background())
checkAllObjs(false) checkAllObjs(false)
checkObj(object.AddressOf(tombObj), nil) checkObj(object.AddressOf(tombObj), nil)

View file

@ -37,7 +37,7 @@ func TestShard_Delete_BigObject(t *testing.T) {
func testShard(t *testing.T, hasWriteCache bool, payloadSize int) { func testShard(t *testing.T, hasWriteCache bool, payloadSize int) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID() cnr := cidtest.ID()

View file

@ -79,7 +79,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
sh = New(opts...) sh = New(opts...)
require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID() cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr) obj := testutil.GenerateObjectWithCID(cnr)

View file

@ -34,7 +34,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
return util.NewPseudoWorkerPool() // synchronous event processing return util.NewPseudoWorkerPool() // synchronous event processing
})}, })},
}) })
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID() cnr := cidtest.ID()
@ -131,7 +131,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
return util.NewPseudoWorkerPool() // synchronous event processing return util.NewPseudoWorkerPool() // synchronous event processing
})}, })},
}) })
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
lock := testutil.GenerateObjectWithCID(cnr) lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock) lock.SetType(objectSDK.TypeLock)
@ -190,7 +190,7 @@ func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool
additionalShardOptions: []Option{WithDisabledGC()}, additionalShardOptions: []Option{WithDisabledGC()},
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()}, wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
}) })
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
obj := testutil.GenerateObjectWithSize(1024) obj := testutil.GenerateObjectWithSize(1024)
@ -254,7 +254,7 @@ func TestGCDontDeleteObjectFromWritecache(t *testing.T) {
additionalShardOptions: []Option{WithDisabledGC()}, additionalShardOptions: []Option{WithDisabledGC()},
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()}, wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
}) })
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
obj := testutil.GenerateObjectWithSize(1024) obj := testutil.GenerateObjectWithSize(1024)

View file

@ -30,7 +30,7 @@ func TestShard_Get(t *testing.T) {
func testShardGet(t *testing.T, hasWriteCache bool) { func testShardGet(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
var putPrm PutPrm var putPrm PutPrm
var getPrm GetPrm var getPrm GetPrm

View file

@ -28,7 +28,7 @@ func TestShard_Head(t *testing.T) {
func testShardHead(t *testing.T, hasWriteCache bool) { func testShardHead(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
var putPrm PutPrm var putPrm PutPrm
var headPrm HeadPrm var headPrm HeadPrm

View file

@ -1,6 +1,7 @@
package shard package shard
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
@ -30,11 +31,11 @@ func (s *Shard) ID() *ID {
} }
// UpdateID reads shard ID saved in the metabase and updates it if it is missing. // UpdateID reads shard ID saved in the metabase and updates it if it is missing.
func (s *Shard) UpdateID() (err error) { func (s *Shard) UpdateID(ctx context.Context) (err error) {
var idFromMetabase []byte var idFromMetabase []byte
modeDegraded := s.GetMode().NoMetabase() modeDegraded := s.GetMode().NoMetabase()
if !modeDegraded { if !modeDegraded {
if idFromMetabase, err = s.metaBase.GetShardID(mode.ReadOnly); err != nil { if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
err = fmt.Errorf("failed to read shard id from metabase: %w", err) err = fmt.Errorf("failed to read shard id from metabase: %w", err)
} }
} }
@ -62,7 +63,7 @@ func (s *Shard) UpdateID() (err error) {
} }
if len(idFromMetabase) == 0 && !modeDegraded { if len(idFromMetabase) == 0 && !modeDegraded {
if setErr := s.metaBase.SetShardID(*s.info.ID, s.GetMode()); setErr != nil { if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr)) err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
} }
} }

View file

@ -27,7 +27,7 @@ func TestShard_Inhume(t *testing.T) {
func testShardInhume(t *testing.T, hasWriteCache bool) { func testShardInhume(t *testing.T, hasWriteCache bool) {
sh := newShard(t, hasWriteCache) sh := newShard(t, hasWriteCache)
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID() cnr := cidtest.ID()

View file

@ -18,14 +18,14 @@ func TestShard_List(t *testing.T) {
t.Run("without write cache", func(t *testing.T) { t.Run("without write cache", func(t *testing.T) {
t.Parallel() t.Parallel()
sh := newShard(t, false) sh := newShard(t, false)
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
testShardList(t, sh) testShardList(t, sh)
}) })
t.Run("with write cache", func(t *testing.T) { t.Run("with write cache", func(t *testing.T) {
t.Parallel() t.Parallel()
shWC := newShard(t, true) shWC := newShard(t, true)
defer func() { require.NoError(t, shWC.Close()) }() defer func() { require.NoError(t, shWC.Close(context.Background())) }()
testShardList(t, shWC) testShardList(t, shWC)
}) })
} }

View file

@ -62,7 +62,7 @@ func TestShard_Lock(t *testing.T) {
require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID() cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr) obj := testutil.GenerateObjectWithCID(cnr)
@ -148,7 +148,7 @@ func TestShard_Lock(t *testing.T) {
func TestShard_IsLocked(t *testing.T) { func TestShard_IsLocked(t *testing.T) {
sh := newShard(t, false) sh := newShard(t, false)
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
cnr := cidtest.ID() cnr := cidtest.ID()
obj := testutil.GenerateObjectWithCID(cnr) obj := testutil.GenerateObjectWithCID(cnr)

View file

@ -201,11 +201,11 @@ func TestCounters(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
sh, mm := shardWithMetrics(t, dir) sh, mm := shardWithMetrics(t, dir)
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
sh.SetMode(mode.ReadOnly) sh.SetMode(context.Background(), mode.ReadOnly)
require.Equal(t, mode.ReadOnly, mm.mode) require.Equal(t, mode.ReadOnly, mm.mode)
sh.SetMode(mode.ReadWrite) sh.SetMode(context.Background(), mode.ReadWrite)
require.Equal(t, mode.ReadWrite, mm.mode) require.Equal(t, mode.ReadWrite, mm.mode)
const objNumber = 10 const objNumber = 10

View file

@ -20,19 +20,21 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
// //
// Returns any error encountered that did not allow // Returns any error encountered that did not allow
// setting shard mode. // setting shard mode.
func (s *Shard) SetMode(m mode.Mode) error { func (s *Shard) SetMode(ctx context.Context, m mode.Mode) error {
unlock := s.lockExclusive() unlock := s.lockExclusive()
defer unlock() defer unlock()
return s.setMode(m) return s.setMode(ctx, m)
} }
func (s *Shard) setMode(m mode.Mode) error { func (s *Shard) setMode(ctx context.Context, m mode.Mode) error {
s.log.Info(context.Background(), logs.ShardSettingShardMode, s.log.Info(ctx, logs.ShardSettingShardMode,
zap.Stringer("old_mode", s.info.Mode), zap.Stringer("old_mode", s.info.Mode),
zap.Stringer("new_mode", m)) zap.Stringer("new_mode", m))
components := []interface{ SetMode(mode.Mode) error }{ components := []interface {
SetMode(context.Context, mode.Mode) error
}{
s.metaBase, s.blobStor, s.metaBase, s.blobStor,
} }
@ -60,7 +62,7 @@ func (s *Shard) setMode(m mode.Mode) error {
if !m.Disabled() { if !m.Disabled() {
for i := range components { for i := range components {
if err := components[i].SetMode(m); err != nil { if err := components[i].SetMode(ctx, m); err != nil {
return err return err
} }
} }
@ -69,7 +71,7 @@ func (s *Shard) setMode(m mode.Mode) error {
s.info.Mode = m s.info.Mode = m
s.metricsWriter.SetMode(s.info.Mode) s.metricsWriter.SetMode(s.info.Mode)
s.log.Info(context.Background(), logs.ShardShardModeSetSuccessfully, s.log.Info(ctx, logs.ShardShardModeSetSuccessfully,
zap.Stringer("mode", s.info.Mode)) zap.Stringer("mode", s.info.Mode))
return nil return nil
} }

View file

@ -94,7 +94,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
}), }),
}, },
}) })
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {

View file

@ -125,7 +125,7 @@ func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLi
} }
} }
func (r *rebuilder) Stop(log *logger.Logger) { func (r *rebuilder) Stop(ctx context.Context, log *logger.Logger) {
r.mtx.Lock() r.mtx.Lock()
defer r.mtx.Unlock() defer r.mtx.Unlock()
@ -138,7 +138,7 @@ func (r *rebuilder) Stop(log *logger.Logger) {
r.wg.Wait() r.wg.Wait()
r.cancel = nil r.cancel = nil
r.done = nil r.done = nil
log.Info(context.Background(), logs.BlobstoreRebuildStopped) log.Info(ctx, logs.BlobstoreRebuildStopped)
} }
var errMBIsNotAvailable = errors.New("metabase is not available") var errMBIsNotAvailable = errors.New("metabase is not available")

View file

@ -34,7 +34,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)}, additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
}) })
defer func() { require.NoError(b, sh.Close()) }() defer func() { require.NoError(b, sh.Close(context.Background())) }()
var putPrm PutPrm var putPrm PutPrm
@ -61,7 +61,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
require.NoError(b, err) require.NoError(b, err)
} }
require.NoError(b, sh.Close()) require.NoError(b, sh.Close(context.Background()))
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path)) require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
require.NoError(b, sh.Open(context.Background())) require.NoError(b, sh.Open(context.Background()))
@ -72,5 +72,5 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
require.NoError(b, sh.Init(context.Background())) require.NoError(b, sh.Init(context.Background()))
require.NoError(b, sh.Close()) require.NoError(b, sh.Close(context.Background()))
} }

View file

@ -59,7 +59,7 @@ func TestShardReload(t *testing.T) {
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
defer func() { defer func() {
require.NoError(t, sh.Close()) require.NoError(t, sh.Close(context.Background()))
}() }()
objects := make([]objAddr, 5) objects := make([]objAddr, 5)

View file

@ -52,10 +52,10 @@ func TestWriteCacheObjectLoss(t *testing.T) {
}) })
} }
require.NoError(t, errG.Wait()) require.NoError(t, errG.Wait())
require.NoError(t, sh.Close()) require.NoError(t, sh.Close(context.Background()))
sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts}) sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts})
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close(context.Background())) }()
var getPrm GetPrm var getPrm GetPrm

View file

@ -117,8 +117,8 @@ func runFlushTest[Option any](
defer func() { require.NoError(t, wc.Close()) }() defer func() { require.NoError(t, wc.Close()) }()
objects := putObjects(t, wc) objects := putObjects(t, wc)
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, wc.Flush(context.Background(), false, false)) require.NoError(t, wc.Flush(context.Background(), false, false))
@ -131,11 +131,11 @@ func runFlushTest[Option any](
objects := putObjects(t, wc) objects := putObjects(t, wc)
// Blobstor is read-only, so we expect en error from `flush` here. // Blobstor is read-only, so we expect en error from `flush` here.
require.Error(t, wc.SetMode(mode.Degraded)) require.Error(t, wc.SetMode(context.Background(), mode.Degraded))
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, wc.SetMode(mode.Degraded)) require.NoError(t, wc.SetMode(context.Background(), mode.Degraded))
check(t, mb, bs, objects) check(t, mb, bs, objects)
}) })
@ -149,8 +149,8 @@ func runFlushTest[Option any](
objects := putObjects(t, wc) objects := putObjects(t, wc)
f.InjectFn(t, wc) f.InjectFn(t, wc)
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
require.Equal(t, uint32(0), errCount.Load()) require.Equal(t, uint32(0), errCount.Load())
require.Error(t, wc.Flush(context.Background(), false, false)) require.Error(t, wc.Flush(context.Background(), false, false))
@ -191,8 +191,8 @@ func newCache[Option any](
require.NoError(t, wc.Init()) require.NoError(t, wc.Init())
// First set mode for metabase and blobstor to prevent background flushes. // First set mode for metabase and blobstor to prevent background flushes.
require.NoError(t, mb.SetMode(mode.ReadOnly)) require.NoError(t, mb.SetMode(context.Background(), mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(context.Background(), mode.ReadOnly))
return wc, bs, mb return wc, bs, mb
} }

View file

@ -23,8 +23,8 @@ type setModePrm struct {
// SetMode sets write-cache mode of operation. // SetMode sets write-cache mode of operation.
// When shard is put in read-only mode all objects in memory are flushed to disk // When shard is put in read-only mode all objects in memory are flushed to disk
// and all background jobs are suspended. // and all background jobs are suspended.
func (c *cache) SetMode(m mode.Mode) error { func (c *cache) SetMode(ctx context.Context, m mode.Mode) error {
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode", ctx, span := tracing.StartSpanFromContext(ctx, "writecache.SetMode",
trace.WithAttributes( trace.WithAttributes(
attribute.String("mode", m.String()), attribute.String("mode", m.String()),
)) ))

View file

@ -38,7 +38,7 @@ type Cache interface {
// Returns ErrReadOnly if the Cache is currently in the read-only mode. // Returns ErrReadOnly if the Cache is currently in the read-only mode.
Delete(context.Context, oid.Address) error Delete(context.Context, oid.Address) error
Put(context.Context, common.PutPrm) (common.PutRes, error) Put(context.Context, common.PutPrm) (common.PutRes, error)
SetMode(mode.Mode) error SetMode(context.Context, mode.Mode) error
SetLogger(*logger.Logger) SetLogger(*logger.Logger)
DumpInfo() Info DumpInfo() Info
Flush(context.Context, bool, bool) error Flush(context.Context, bool, bool) error

View file

@ -11,7 +11,7 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) { func (s *Server) DetachShards(ctx context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
err := s.isValidRequest(req) err := s.isValidRequest(req)
if err != nil { if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error()) return nil, status.Error(codes.PermissionDenied, err.Error())
@ -19,7 +19,7 @@ func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsReques
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID()) shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
if err := s.s.DetachShards(shardIDs); err != nil { if err := s.s.DetachShards(ctx, shardIDs); err != nil {
if errors.As(err, new(logicerr.Logical)) { if errors.As(err, new(logicerr.Logical)) {
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }

View file

@ -11,7 +11,7 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) { func (s *Server) SetShardMode(ctx context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) {
// verify request // verify request
err := s.isValidRequest(req) err := s.isValidRequest(req)
if err != nil { if err != nil {
@ -38,7 +38,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
} }
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
err = s.s.SetShardMode(shardID, m, req.GetBody().GetResetErrorCounter()) err = s.s.SetShardMode(ctx, shardID, m, req.GetBody().GetResetErrorCounter())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }