[#1437] shard: Fix contextcheck linter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
c139892117
commit
16598553d9
38 changed files with 165 additions and 160 deletions
|
@ -8,7 +8,7 @@ import (
|
|||
)
|
||||
|
||||
// SetMode sets the blobstor mode of operation.
|
||||
func (b *BlobStor) SetMode(m mode.Mode) error {
|
||||
func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
|
||||
b.modeMtx.Lock()
|
||||
defer b.modeMtx.Unlock()
|
||||
|
||||
|
@ -22,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
|
|||
|
||||
err := b.Close()
|
||||
if err == nil {
|
||||
if err = b.openBlobStor(context.TODO(), m); err == nil {
|
||||
if err = b.openBlobStor(ctx, m); err == nil {
|
||||
err = b.Init()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ func (e *StorageEngine) open(ctx context.Context) error {
|
|||
sh := e.shards[res.id]
|
||||
delete(e.shards, res.id)
|
||||
|
||||
err := sh.Close()
|
||||
err := sh.Close(ctx)
|
||||
if err != nil {
|
||||
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
|
||||
zap.String("id", res.id),
|
||||
|
@ -108,7 +108,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
|||
sh := e.shards[res.id]
|
||||
delete(e.shards, res.id)
|
||||
|
||||
err := sh.Close()
|
||||
err := sh.Close(ctx)
|
||||
if err != nil {
|
||||
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
|
||||
zap.String("id", res.id),
|
||||
|
@ -126,7 +126,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
|||
}
|
||||
|
||||
e.wg.Add(1)
|
||||
go e.setModeLoop()
|
||||
go e.setModeLoop(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -153,7 +153,7 @@ func (e *StorageEngine) Close(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// closes all shards. Never returns an error, shard errors are logged.
|
||||
func (e *StorageEngine) close(releasePools bool) error {
|
||||
func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
|
@ -164,8 +164,8 @@ func (e *StorageEngine) close(releasePools bool) error {
|
|||
}
|
||||
|
||||
for id, sh := range e.shards {
|
||||
if err := sh.Close(); err != nil {
|
||||
e.log.Debug(context.Background(), logs.EngineCouldNotCloseShard,
|
||||
if err := sh.Close(ctx); err != nil {
|
||||
e.log.Debug(ctx, logs.EngineCouldNotCloseShard,
|
||||
zap.String("id", id),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
@ -213,7 +213,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
|
|||
return e.open(ctx)
|
||||
}
|
||||
} else if prevErr == nil { // ok -> block
|
||||
return e.close(errors.Is(err, errClosed))
|
||||
return e.close(ctx, errors.Is(err, errClosed))
|
||||
}
|
||||
|
||||
// otherwise do nothing
|
||||
|
@ -306,7 +306,7 @@ loop:
|
|||
|
||||
e.mtx.RUnlock()
|
||||
|
||||
e.removeShards(shardsToRemove...)
|
||||
e.removeShards(ctx, shardsToRemove...)
|
||||
|
||||
for _, p := range shardsToReload {
|
||||
err := p.sh.Reload(ctx, p.opts...)
|
||||
|
@ -330,13 +330,13 @@ loop:
|
|||
err = sh.Init(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
_ = sh.Close()
|
||||
_ = sh.Close(ctx)
|
||||
return fmt.Errorf("could not init %s shard: %w", idStr, err)
|
||||
}
|
||||
|
||||
err = e.addShard(sh)
|
||||
if err != nil {
|
||||
_ = sh.Close()
|
||||
_ = sh.Close(ctx)
|
||||
return fmt.Errorf("could not add %s shard: %w", idStr, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ type setModeRequest struct {
|
|||
|
||||
// setModeLoop listens setModeCh to perform degraded mode transition of a single shard.
|
||||
// Instead of creating a worker per single shard we use a single goroutine.
|
||||
func (e *StorageEngine) setModeLoop() {
|
||||
func (e *StorageEngine) setModeLoop(ctx context.Context) {
|
||||
defer e.wg.Done()
|
||||
|
||||
var (
|
||||
|
@ -75,7 +75,7 @@ func (e *StorageEngine) setModeLoop() {
|
|||
if !ok {
|
||||
inProgress[sid] = struct{}{}
|
||||
go func() {
|
||||
e.moveToDegraded(r.sh, r.errorCount, r.isMeta)
|
||||
e.moveToDegraded(ctx, r.sh, r.errorCount, r.isMeta)
|
||||
|
||||
mtx.Lock()
|
||||
delete(inProgress, sid)
|
||||
|
@ -87,7 +87,7 @@ func (e *StorageEngine) setModeLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) {
|
||||
func (e *StorageEngine) moveToDegraded(ctx context.Context, sh *shard.Shard, errCount uint32, isMeta bool) {
|
||||
sid := sh.ID()
|
||||
log := e.log.With(
|
||||
zap.Stringer("shard_id", sid),
|
||||
|
@ -97,7 +97,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
|
|||
defer e.mtx.RUnlock()
|
||||
|
||||
if isMeta {
|
||||
err := sh.SetMode(mode.DegradedReadOnly)
|
||||
err := sh.SetMode(ctx, mode.DegradedReadOnly)
|
||||
if err == nil {
|
||||
log.Info(context.Background(), logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
|
||||
return
|
||||
|
@ -106,7 +106,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
|
|||
zap.Error(err))
|
||||
}
|
||||
|
||||
err := sh.SetMode(mode.ReadOnly)
|
||||
err := sh.SetMode(ctx, mode.ReadOnly)
|
||||
if err != nil {
|
||||
log.Error(context.Background(), logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
|
||||
return
|
||||
|
|
|
@ -158,10 +158,10 @@ func TestErrorReporting(t *testing.T) {
|
|||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
|
||||
require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, false))
|
||||
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
|
||||
|
||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
|
||||
require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, true))
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close(context.Background()))
|
||||
})
|
||||
|
|
|
@ -146,7 +146,7 @@ func TestEvacuateShardObjects(t *testing.T) {
|
|||
require.Equal(t, uint64(0), res.ObjectsEvacuated())
|
||||
})
|
||||
|
||||
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
@ -237,7 +237,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
|||
|
||||
evacuateShardID := ids[0].String()
|
||||
|
||||
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids[0:1]
|
||||
|
@ -260,8 +260,8 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids[1:2]
|
||||
|
@ -298,7 +298,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
|||
}
|
||||
|
||||
for i := range ids {
|
||||
require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[i].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
}
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
|
@ -327,8 +327,8 @@ func TestEvacuateCancellation(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids[1:2]
|
||||
|
@ -357,8 +357,8 @@ func TestEvacuateCancellationByError(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids[1:2]
|
||||
|
@ -386,8 +386,8 @@ func TestEvacuateSingleProcess(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
blocker := make(chan interface{})
|
||||
running := make(chan interface{})
|
||||
|
@ -429,8 +429,8 @@ func TestEvacuateObjectsAsync(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
blocker := make(chan interface{})
|
||||
running := make(chan interface{})
|
||||
|
@ -515,7 +515,7 @@ func TestEvacuateTreesLocal(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids[0:1]
|
||||
|
@ -594,8 +594,8 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
|||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
mutex := sync.Mutex{}
|
||||
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
||||
|
@ -753,7 +753,7 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
|
|||
prm.Scope = EvacuateScopeObjects
|
||||
prm.RepOneOnly = true
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
@ -810,7 +810,7 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
|
|||
prm.RepOneOnly = true
|
||||
prm.ContainerWorkerCount = 10
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
start := time.Now()
|
||||
_, err := e.Evacuate(context.Background(), prm)
|
||||
|
|
|
@ -121,7 +121,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
|
|||
return sh.ID(), nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||
id, err := generateShardID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||
|
@ -139,8 +139,8 @@ func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*sh
|
|||
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(); err != nil {
|
||||
e.log.Warn(context.Background(), logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
|
||||
if err := sh.UpdateID(ctx); err != nil {
|
||||
e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
|
||||
}
|
||||
|
||||
return sh, nil
|
||||
|
@ -203,7 +203,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
|||
|
||||
// removeShards removes specified shards. Skips non-existent shards.
|
||||
// Logs errors about shards that it could not Close after the removal.
|
||||
func (e *StorageEngine) removeShards(ids ...string) {
|
||||
func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
|
||||
if len(ids) == 0 {
|
||||
return
|
||||
}
|
||||
|
@ -228,22 +228,22 @@ func (e *StorageEngine) removeShards(ids ...string) {
|
|||
delete(e.shardPools, id)
|
||||
}
|
||||
|
||||
e.log.Info(context.Background(), logs.EngineShardHasBeenRemoved,
|
||||
e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
|
||||
zap.String("id", id))
|
||||
}
|
||||
e.mtx.Unlock()
|
||||
|
||||
for _, sh := range ss {
|
||||
err := sh.SetMode(mode.Disabled)
|
||||
err := sh.SetMode(ctx, mode.Disabled)
|
||||
if err != nil {
|
||||
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled,
|
||||
e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
|
||||
zap.Stringer("id", sh.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
err = sh.Close()
|
||||
err = sh.Close(ctx)
|
||||
if err != nil {
|
||||
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard,
|
||||
e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
|
||||
zap.Stringer("id", sh.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
@ -310,7 +310,7 @@ func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (sto
|
|||
// SetShardMode sets mode of the shard with provided identifier.
|
||||
//
|
||||
// Returns an error if shard mode was not set, or shard was not found in storage engine.
|
||||
func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
|
||||
func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
|
@ -320,7 +320,7 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
|
|||
sh.errorCount.Store(0)
|
||||
e.metrics.ClearErrorCounter(shID)
|
||||
}
|
||||
return sh.SetMode(m)
|
||||
return sh.SetMode(ctx, m)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -346,7 +346,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
|
||||
func (e *StorageEngine) DetachShards(ctx context.Context, ids []*shard.ID) error {
|
||||
if len(ids) == 0 {
|
||||
return logicerr.New("ids must be non-empty")
|
||||
}
|
||||
|
@ -356,20 +356,20 @@ func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return e.closeShards(deletedShards)
|
||||
return e.closeShards(ctx, deletedShards)
|
||||
}
|
||||
|
||||
// closeShards closes deleted shards. Tries to close all shards.
|
||||
// Returns single error with joined shard errors.
|
||||
func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
|
||||
func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedShard) error {
|
||||
var multiErr error
|
||||
var multiErrGuard sync.Mutex
|
||||
var eg errgroup.Group
|
||||
for _, sh := range deletedShards {
|
||||
eg.Go(func() error {
|
||||
err := sh.SetMode(mode.Disabled)
|
||||
err := sh.SetMode(ctx, mode.Disabled)
|
||||
if err != nil {
|
||||
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled,
|
||||
e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
|
||||
zap.Stringer("id", sh.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
@ -378,9 +378,9 @@ func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
|
|||
multiErrGuard.Unlock()
|
||||
}
|
||||
|
||||
err = sh.Close()
|
||||
err = sh.Close(ctx)
|
||||
if err != nil {
|
||||
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard,
|
||||
e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
|
||||
zap.Stringer("id", sh.ID()),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
|
|
@ -33,7 +33,7 @@ func TestRemoveShard(t *testing.T) {
|
|||
|
||||
for id, remove := range mSh {
|
||||
if remove {
|
||||
e.removeShards(id)
|
||||
e.removeShards(context.Background(), id)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -55,11 +55,11 @@ func TestDisableShards(t *testing.T) {
|
|||
e, ids := te.engine, te.shardIDs
|
||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||
|
||||
require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical))
|
||||
require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical))
|
||||
require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical))
|
||||
require.ErrorAs(t, e.DetachShards(context.Background(), ids), new(logicerr.Logical))
|
||||
require.ErrorAs(t, e.DetachShards(context.Background(), nil), new(logicerr.Logical))
|
||||
require.ErrorAs(t, e.DetachShards(context.Background(), []*shard.ID{}), new(logicerr.Logical))
|
||||
|
||||
require.NoError(t, e.DetachShards([]*shard.ID{ids[0]}))
|
||||
require.NoError(t, e.DetachShards(context.Background(), []*shard.ID{ids[0]}))
|
||||
|
||||
require.Equal(t, 1, len(e.shards))
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
// Component represents single storage component.
|
||||
type Component interface {
|
||||
Open(context.Context, mode.Mode) error
|
||||
SetMode(mode.Mode) error
|
||||
SetMode(context.Context, mode.Mode) error
|
||||
Init() error
|
||||
Close() error
|
||||
}
|
||||
|
@ -91,12 +91,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
// call `SetMode` on all not-yet-initialized components.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, s.SetMode(m))
|
||||
require.NoError(t, s.SetMode(context.Background(), m))
|
||||
|
||||
t.Run("after open in RO", func(t *testing.T) {
|
||||
require.NoError(t, s.Close())
|
||||
require.NoError(t, s.Open(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, s.SetMode(m))
|
||||
require.NoError(t, s.SetMode(context.Background(), m))
|
||||
})
|
||||
|
||||
require.NoError(t, s.Close())
|
||||
|
@ -106,7 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
// Use-case: notmal node operation.
|
||||
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.SetMode(m))
|
||||
require.NoError(t, s.SetMode(context.Background(), m))
|
||||
require.NoError(t, s.Close())
|
||||
})
|
||||
}
|
||||
|
@ -116,7 +116,7 @@ func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
|
|||
s := cons(t)
|
||||
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.SetMode(from))
|
||||
require.NoError(t, s.SetMode(to))
|
||||
require.NoError(t, s.SetMode(context.Background(), from))
|
||||
require.NoError(t, s.SetMode(context.Background(), to))
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ var (
|
|||
)
|
||||
|
||||
// Open boltDB instance for metabase.
|
||||
func (db *DB) Open(_ context.Context, m mode.Mode) error {
|
||||
func (db *DB) Open(ctx context.Context, m mode.Mode) error {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
db.mode = m
|
||||
|
@ -48,10 +48,10 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error {
|
|||
if m.NoMetabase() {
|
||||
return nil
|
||||
}
|
||||
return db.openDB(m)
|
||||
return db.openDB(ctx, m)
|
||||
}
|
||||
|
||||
func (db *DB) openDB(mode mode.Mode) error {
|
||||
func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
|
||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
||||
|
@ -65,10 +65,10 @@ func (db *DB) openDB(mode mode.Mode) error {
|
|||
}
|
||||
db.boltOptions.ReadOnly = mode.ReadOnly()
|
||||
|
||||
return metaerr.Wrap(db.openBolt())
|
||||
return metaerr.Wrap(db.openBolt(ctx))
|
||||
}
|
||||
|
||||
func (db *DB) openBolt() error {
|
||||
func (db *DB) openBolt(ctx context.Context) error {
|
||||
var err error
|
||||
|
||||
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||
|
@ -226,7 +226,7 @@ func (db *DB) close() error {
|
|||
// If there was a problem with applying new configuration, an error is returned.
|
||||
//
|
||||
// If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned.
|
||||
func (db *DB) Reload(opts ...Option) (bool, error) {
|
||||
func (db *DB) Reload(ctx context.Context, opts ...Option) (bool, error) {
|
||||
var c cfg
|
||||
for i := range opts {
|
||||
opts[i](&c)
|
||||
|
@ -243,7 +243,7 @@ func (db *DB) Reload(opts ...Option) (bool, error) {
|
|||
db.mode = mode.Disabled
|
||||
db.metrics.SetMode(mode.ComponentDisabled)
|
||||
db.info.Path = c.info.Path
|
||||
if err := db.openBolt(); err != nil {
|
||||
if err := db.openBolt(ctx); err != nil {
|
||||
return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err))
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -8,7 +9,7 @@ import (
|
|||
|
||||
// SetMode sets the metabase mode of operation.
|
||||
// If the mode assumes no operation metabase, the database is closed.
|
||||
func (db *DB) SetMode(m mode.Mode) error {
|
||||
func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
|
@ -25,7 +26,7 @@ func (db *DB) SetMode(m mode.Mode) error {
|
|||
if m.NoMetabase() {
|
||||
db.boltDB = nil
|
||||
} else {
|
||||
err := db.openDB(m)
|
||||
err := db.openDB(ctx, m)
|
||||
if err == nil && !m.ReadOnly() {
|
||||
err = db.Init()
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package meta
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
@ -21,7 +22,7 @@ var (
|
|||
// If id is missing, returns nil, nil.
|
||||
//
|
||||
// GetShardID does not report any metrics.
|
||||
func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
|
||||
func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error) {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
db.mode = mode
|
||||
|
@ -30,7 +31,7 @@ func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
if err := db.openDB(mode); err != nil {
|
||||
if err := db.openDB(ctx, mode); err != nil {
|
||||
return nil, fmt.Errorf("failed to open metabase: %w", err)
|
||||
}
|
||||
|
||||
|
@ -59,7 +60,7 @@ func (db *DB) readShardID() ([]byte, error) {
|
|||
|
||||
// SetShardID sets metabase operation mode
|
||||
// and writes shard id to db.
|
||||
func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
|
||||
func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) error {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
db.mode = mode
|
||||
|
@ -68,7 +69,7 @@ func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
|
|||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
if err := db.openDB(mode); err != nil {
|
||||
if err := db.openDB(ctx, mode); err != nil {
|
||||
return fmt.Errorf("failed to open metabase: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestVersion(t *testing.T) {
|
|||
})
|
||||
t.Run("old data", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.SetShardID([]byte{1, 2, 3, 4}, mode.ReadWrite))
|
||||
require.NoError(t, db.SetShardID(context.Background(), []byte{1, 2, 3, 4}, mode.ReadWrite))
|
||||
|
||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, db.Init())
|
||||
|
|
|
@ -91,7 +91,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
|
|||
return &b
|
||||
}
|
||||
|
||||
func (t *boltForest) SetMode(m mode.Mode) error {
|
||||
func (t *boltForest) SetMode(_ context.Context, m mode.Mode) error {
|
||||
t.modeMtx.Lock()
|
||||
defer t.modeMtx.Unlock()
|
||||
|
||||
|
|
|
@ -128,7 +128,7 @@ func (f *memoryForest) Open(context.Context, mode.Mode) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) SetMode(mode.Mode) error {
|
||||
func (f *memoryForest) SetMode(context.Context, mode.Mode) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -65,7 +65,7 @@ type ForestStorage interface {
|
|||
Init() error
|
||||
Open(context.Context, mode.Mode) error
|
||||
Close() error
|
||||
SetMode(m mode.Mode) error
|
||||
SetMode(context.Context, mode.Mode) error
|
||||
SetParentID(id string)
|
||||
Forest
|
||||
|
||||
|
|
|
@ -20,23 +20,23 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
||||
s.log.Error(context.Background(), logs.ShardMetabaseFailureSwitchingMode,
|
||||
func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err error) error {
|
||||
s.log.Error(ctx, logs.ShardMetabaseFailureSwitchingMode,
|
||||
zap.String("stage", stage),
|
||||
zap.Stringer("mode", mode.ReadOnly),
|
||||
zap.Error(err))
|
||||
|
||||
err = s.SetMode(mode.ReadOnly)
|
||||
err = s.SetMode(ctx, mode.ReadOnly)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.log.Error(context.Background(), logs.ShardCantMoveShardToReadonlySwitchMode,
|
||||
s.log.Error(ctx, logs.ShardCantMoveShardToReadonlySwitchMode,
|
||||
zap.String("stage", stage),
|
||||
zap.Stringer("mode", mode.DegradedReadOnly),
|
||||
zap.Error(err))
|
||||
|
||||
err = s.SetMode(mode.DegradedReadOnly)
|
||||
err = s.SetMode(ctx, mode.DegradedReadOnly)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
|
||||
}
|
||||
|
@ -75,7 +75,7 @@ func (s *Shard) Open(ctx context.Context) error {
|
|||
return fmt.Errorf("could not open %T: %w", components[j], err)
|
||||
}
|
||||
}
|
||||
err = s.handleMetabaseFailure("open", err)
|
||||
err = s.handleMetabaseFailure(ctx, "open", err)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ func (x *metabaseSynchronizer) Init() error {
|
|||
// Init initializes all Shard's components.
|
||||
func (s *Shard) Init(ctx context.Context) error {
|
||||
m := s.GetMode()
|
||||
if err := s.initializeComponents(m); err != nil {
|
||||
if err := s.initializeComponents(ctx, m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -138,7 +138,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) initializeComponents(m mode.Mode) error {
|
||||
func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
|
||||
type initializer interface {
|
||||
Init() error
|
||||
}
|
||||
|
@ -176,7 +176,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
|
|||
return fmt.Errorf("metabase initialization: %w", err)
|
||||
}
|
||||
|
||||
err = s.handleMetabaseFailure("init", err)
|
||||
err = s.handleMetabaseFailure(ctx, "init", err)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -364,9 +364,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
|||
}
|
||||
|
||||
// Close releases all Shard's components.
|
||||
func (s *Shard) Close() error {
|
||||
func (s *Shard) Close(ctx context.Context) error {
|
||||
if s.rb != nil {
|
||||
s.rb.Stop(s.log)
|
||||
s.rb.Stop(ctx, s.log)
|
||||
}
|
||||
var components []interface{ Close() error }
|
||||
|
||||
|
@ -386,7 +386,7 @@ func (s *Shard) Close() error {
|
|||
for _, component := range components {
|
||||
if err := component.Close(); err != nil {
|
||||
lastErr = err
|
||||
s.log.Error(context.Background(), logs.ShardCouldNotCloseShardComponent, zap.Error(err))
|
||||
s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -414,18 +414,18 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
|
|||
unlock := s.lockExclusive()
|
||||
defer unlock()
|
||||
|
||||
s.rb.Stop(s.log)
|
||||
s.rb.Stop(ctx, s.log)
|
||||
if !s.info.Mode.NoMetabase() {
|
||||
defer func() {
|
||||
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||
}()
|
||||
}
|
||||
|
||||
ok, err := s.metaBase.Reload(c.metaOpts...)
|
||||
ok, err := s.metaBase.Reload(ctx, c.metaOpts...)
|
||||
if err != nil {
|
||||
if errors.Is(err, meta.ErrDegradedMode) {
|
||||
s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
|
||||
_ = s.setMode(mode.DegradedReadOnly)
|
||||
_ = s.setMode(ctx, mode.DegradedReadOnly)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -441,11 +441,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
|
|||
}
|
||||
if err != nil {
|
||||
s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
|
||||
_ = s.setMode(mode.DegradedReadOnly)
|
||||
_ = s.setMode(ctx, mode.DegradedReadOnly)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return s.setMode(c.info.Mode)
|
||||
return s.setMode(ctx, c.info.Mode)
|
||||
}
|
||||
|
||||
func (s *Shard) lockExclusive() func() {
|
||||
|
|
|
@ -86,7 +86,7 @@ func TestShardOpen(t *testing.T) {
|
|||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.ReadWrite, sh.GetMode())
|
||||
require.NoError(t, sh.Close())
|
||||
require.NoError(t, sh.Close(context.Background()))
|
||||
|
||||
// Metabase can be opened in read-only => start in ReadOnly mode.
|
||||
allowedMode.Store(int64(os.O_RDONLY))
|
||||
|
@ -95,9 +95,9 @@ func TestShardOpen(t *testing.T) {
|
|||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
||||
require.Error(t, sh.SetMode(mode.ReadWrite))
|
||||
require.Error(t, sh.SetMode(context.Background(), mode.ReadWrite))
|
||||
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
||||
require.NoError(t, sh.Close())
|
||||
require.NoError(t, sh.Close(context.Background()))
|
||||
|
||||
// Metabase is corrupted => start in DegradedReadOnly mode.
|
||||
allowedMode.Store(math.MaxInt64)
|
||||
|
@ -106,7 +106,7 @@ func TestShardOpen(t *testing.T) {
|
|||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
|
||||
require.NoError(t, sh.Close())
|
||||
require.NoError(t, sh.Close(context.Background()))
|
||||
}
|
||||
|
||||
func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||
|
@ -146,7 +146,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
putPrm.SetObject(obj)
|
||||
_, err := sh.Put(context.Background(), putPrm)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, sh.Close())
|
||||
require.NoError(t, sh.Close(context.Background()))
|
||||
|
||||
addr := object.AddressOf(obj)
|
||||
// This is copied from `fstree.treePath()` to avoid exporting function just for tests.
|
||||
|
@ -170,7 +170,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
getPrm.SetAddress(addr)
|
||||
_, err = sh.Get(context.Background(), getPrm)
|
||||
require.True(t, client.IsErrObjectNotFound(err))
|
||||
require.NoError(t, sh.Close())
|
||||
require.NoError(t, sh.Close(context.Background()))
|
||||
}
|
||||
|
||||
func TestRefillMetabase(t *testing.T) {
|
||||
|
@ -358,7 +358,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
phyBefore := c.Phy
|
||||
logicalBefore := c.Logic
|
||||
|
||||
err = sh.Close()
|
||||
err = sh.Close(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
sh = New(
|
||||
|
@ -379,7 +379,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
// initialize Blobstor
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
defer sh.Close()
|
||||
defer sh.Close(context.Background())
|
||||
|
||||
checkAllObjs(false)
|
||||
checkObj(object.AddressOf(tombObj), nil)
|
||||
|
|
|
@ -37,7 +37,7 @@ func TestShard_Delete_BigObject(t *testing.T) {
|
|||
|
||||
func testShard(t *testing.T, hasWriteCache bool, payloadSize int) {
|
||||
sh := newShard(t, hasWriteCache)
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
|||
sh = New(opts...)
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
obj := testutil.GenerateObjectWithCID(cnr)
|
||||
|
|
|
@ -34,7 +34,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
|||
return util.NewPseudoWorkerPool() // synchronous event processing
|
||||
})},
|
||||
})
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
|
||||
|
@ -131,7 +131,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
|||
return util.NewPseudoWorkerPool() // synchronous event processing
|
||||
})},
|
||||
})
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
lock := testutil.GenerateObjectWithCID(cnr)
|
||||
lock.SetType(objectSDK.TypeLock)
|
||||
|
@ -190,7 +190,7 @@ func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool
|
|||
additionalShardOptions: []Option{WithDisabledGC()},
|
||||
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
||||
})
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
obj := testutil.GenerateObjectWithSize(1024)
|
||||
|
||||
|
@ -254,7 +254,7 @@ func TestGCDontDeleteObjectFromWritecache(t *testing.T) {
|
|||
additionalShardOptions: []Option{WithDisabledGC()},
|
||||
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
||||
})
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
obj := testutil.GenerateObjectWithSize(1024)
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ func TestShard_Get(t *testing.T) {
|
|||
|
||||
func testShardGet(t *testing.T, hasWriteCache bool) {
|
||||
sh := newShard(t, hasWriteCache)
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
var putPrm PutPrm
|
||||
var getPrm GetPrm
|
||||
|
|
|
@ -28,7 +28,7 @@ func TestShard_Head(t *testing.T) {
|
|||
|
||||
func testShardHead(t *testing.T, hasWriteCache bool) {
|
||||
sh := newShard(t, hasWriteCache)
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
var putPrm PutPrm
|
||||
var headPrm HeadPrm
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
|
@ -30,11 +31,11 @@ func (s *Shard) ID() *ID {
|
|||
}
|
||||
|
||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||
func (s *Shard) UpdateID() (err error) {
|
||||
func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||
var idFromMetabase []byte
|
||||
modeDegraded := s.GetMode().NoMetabase()
|
||||
if !modeDegraded {
|
||||
if idFromMetabase, err = s.metaBase.GetShardID(mode.ReadOnly); err != nil {
|
||||
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
|
||||
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +63,7 @@ func (s *Shard) UpdateID() (err error) {
|
|||
}
|
||||
|
||||
if len(idFromMetabase) == 0 && !modeDegraded {
|
||||
if setErr := s.metaBase.SetShardID(*s.info.ID, s.GetMode()); setErr != nil {
|
||||
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
|
||||
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ func TestShard_Inhume(t *testing.T) {
|
|||
|
||||
func testShardInhume(t *testing.T, hasWriteCache bool) {
|
||||
sh := newShard(t, hasWriteCache)
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
|
||||
|
|
|
@ -18,14 +18,14 @@ func TestShard_List(t *testing.T) {
|
|||
t.Run("without write cache", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
sh := newShard(t, false)
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
testShardList(t, sh)
|
||||
})
|
||||
|
||||
t.Run("with write cache", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
shWC := newShard(t, true)
|
||||
defer func() { require.NoError(t, shWC.Close()) }()
|
||||
defer func() { require.NoError(t, shWC.Close(context.Background())) }()
|
||||
testShardList(t, shWC)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ func TestShard_Lock(t *testing.T) {
|
|||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
obj := testutil.GenerateObjectWithCID(cnr)
|
||||
|
@ -148,7 +148,7 @@ func TestShard_Lock(t *testing.T) {
|
|||
|
||||
func TestShard_IsLocked(t *testing.T) {
|
||||
sh := newShard(t, false)
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
obj := testutil.GenerateObjectWithCID(cnr)
|
||||
|
|
|
@ -201,11 +201,11 @@ func TestCounters(t *testing.T) {
|
|||
|
||||
dir := t.TempDir()
|
||||
sh, mm := shardWithMetrics(t, dir)
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
sh.SetMode(mode.ReadOnly)
|
||||
sh.SetMode(context.Background(), mode.ReadOnly)
|
||||
require.Equal(t, mode.ReadOnly, mm.mode)
|
||||
sh.SetMode(mode.ReadWrite)
|
||||
sh.SetMode(context.Background(), mode.ReadWrite)
|
||||
require.Equal(t, mode.ReadWrite, mm.mode)
|
||||
|
||||
const objNumber = 10
|
||||
|
|
|
@ -20,19 +20,21 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
|
|||
//
|
||||
// Returns any error encountered that did not allow
|
||||
// setting shard mode.
|
||||
func (s *Shard) SetMode(m mode.Mode) error {
|
||||
func (s *Shard) SetMode(ctx context.Context, m mode.Mode) error {
|
||||
unlock := s.lockExclusive()
|
||||
defer unlock()
|
||||
|
||||
return s.setMode(m)
|
||||
return s.setMode(ctx, m)
|
||||
}
|
||||
|
||||
func (s *Shard) setMode(m mode.Mode) error {
|
||||
s.log.Info(context.Background(), logs.ShardSettingShardMode,
|
||||
func (s *Shard) setMode(ctx context.Context, m mode.Mode) error {
|
||||
s.log.Info(ctx, logs.ShardSettingShardMode,
|
||||
zap.Stringer("old_mode", s.info.Mode),
|
||||
zap.Stringer("new_mode", m))
|
||||
|
||||
components := []interface{ SetMode(mode.Mode) error }{
|
||||
components := []interface {
|
||||
SetMode(context.Context, mode.Mode) error
|
||||
}{
|
||||
s.metaBase, s.blobStor,
|
||||
}
|
||||
|
||||
|
@ -60,7 +62,7 @@ func (s *Shard) setMode(m mode.Mode) error {
|
|||
|
||||
if !m.Disabled() {
|
||||
for i := range components {
|
||||
if err := components[i].SetMode(m); err != nil {
|
||||
if err := components[i].SetMode(ctx, m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -69,7 +71,7 @@ func (s *Shard) setMode(m mode.Mode) error {
|
|||
s.info.Mode = m
|
||||
s.metricsWriter.SetMode(s.info.Mode)
|
||||
|
||||
s.log.Info(context.Background(), logs.ShardShardModeSetSuccessfully,
|
||||
s.log.Info(ctx, logs.ShardShardModeSetSuccessfully,
|
||||
zap.Stringer("mode", s.info.Mode))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
|||
}),
|
||||
},
|
||||
})
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
|
|
@ -125,7 +125,7 @@ func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLi
|
|||
}
|
||||
}
|
||||
|
||||
func (r *rebuilder) Stop(log *logger.Logger) {
|
||||
func (r *rebuilder) Stop(ctx context.Context, log *logger.Logger) {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
|
@ -138,7 +138,7 @@ func (r *rebuilder) Stop(log *logger.Logger) {
|
|||
r.wg.Wait()
|
||||
r.cancel = nil
|
||||
r.done = nil
|
||||
log.Info(context.Background(), logs.BlobstoreRebuildStopped)
|
||||
log.Info(ctx, logs.BlobstoreRebuildStopped)
|
||||
}
|
||||
|
||||
var errMBIsNotAvailable = errors.New("metabase is not available")
|
||||
|
|
|
@ -34,7 +34,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
|
|||
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
|
||||
})
|
||||
|
||||
defer func() { require.NoError(b, sh.Close()) }()
|
||||
defer func() { require.NoError(b, sh.Close(context.Background())) }()
|
||||
|
||||
var putPrm PutPrm
|
||||
|
||||
|
@ -61,7 +61,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
|
|||
require.NoError(b, err)
|
||||
}
|
||||
|
||||
require.NoError(b, sh.Close())
|
||||
require.NoError(b, sh.Close(context.Background()))
|
||||
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
|
||||
|
||||
require.NoError(b, sh.Open(context.Background()))
|
||||
|
@ -72,5 +72,5 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
|
|||
|
||||
require.NoError(b, sh.Init(context.Background()))
|
||||
|
||||
require.NoError(b, sh.Close())
|
||||
require.NoError(b, sh.Close(context.Background()))
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ func TestShardReload(t *testing.T) {
|
|||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
defer func() {
|
||||
require.NoError(t, sh.Close())
|
||||
require.NoError(t, sh.Close(context.Background()))
|
||||
}()
|
||||
|
||||
objects := make([]objAddr, 5)
|
||||
|
|
|
@ -52,10 +52,10 @@ func TestWriteCacheObjectLoss(t *testing.T) {
|
|||
})
|
||||
}
|
||||
require.NoError(t, errG.Wait())
|
||||
require.NoError(t, sh.Close())
|
||||
require.NoError(t, sh.Close(context.Background()))
|
||||
|
||||
sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts})
|
||||
defer func() { require.NoError(t, sh.Close()) }()
|
||||
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||
|
||||
var getPrm GetPrm
|
||||
|
||||
|
|
|
@ -117,8 +117,8 @@ func runFlushTest[Option any](
|
|||
defer func() { require.NoError(t, wc.Close()) }()
|
||||
objects := putObjects(t, wc)
|
||||
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
||||
|
||||
require.NoError(t, wc.Flush(context.Background(), false, false))
|
||||
|
||||
|
@ -131,11 +131,11 @@ func runFlushTest[Option any](
|
|||
objects := putObjects(t, wc)
|
||||
|
||||
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||
require.Error(t, wc.SetMode(mode.Degraded))
|
||||
require.Error(t, wc.SetMode(context.Background(), mode.Degraded))
|
||||
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, wc.SetMode(context.Background(), mode.Degraded))
|
||||
|
||||
check(t, mb, bs, objects)
|
||||
})
|
||||
|
@ -149,8 +149,8 @@ func runFlushTest[Option any](
|
|||
objects := putObjects(t, wc)
|
||||
f.InjectFn(t, wc)
|
||||
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
||||
|
||||
require.Equal(t, uint32(0), errCount.Load())
|
||||
require.Error(t, wc.Flush(context.Background(), false, false))
|
||||
|
@ -191,8 +191,8 @@ func newCache[Option any](
|
|||
require.NoError(t, wc.Init())
|
||||
|
||||
// First set mode for metabase and blobstor to prevent background flushes.
|
||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, mb.SetMode(context.Background(), mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(context.Background(), mode.ReadOnly))
|
||||
|
||||
return wc, bs, mb
|
||||
}
|
||||
|
|
|
@ -23,8 +23,8 @@ type setModePrm struct {
|
|||
// SetMode sets write-cache mode of operation.
|
||||
// When shard is put in read-only mode all objects in memory are flushed to disk
|
||||
// and all background jobs are suspended.
|
||||
func (c *cache) SetMode(m mode.Mode) error {
|
||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
|
||||
func (c *cache) SetMode(ctx context.Context, m mode.Mode) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.SetMode",
|
||||
trace.WithAttributes(
|
||||
attribute.String("mode", m.String()),
|
||||
))
|
||||
|
|
|
@ -38,7 +38,7 @@ type Cache interface {
|
|||
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
|
||||
Delete(context.Context, oid.Address) error
|
||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
||||
SetMode(mode.Mode) error
|
||||
SetMode(context.Context, mode.Mode) error
|
||||
SetLogger(*logger.Logger)
|
||||
DumpInfo() Info
|
||||
Flush(context.Context, bool, bool) error
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
|
||||
func (s *Server) DetachShards(ctx context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
|
@ -19,7 +19,7 @@ func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsReques
|
|||
|
||||
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
|
||||
|
||||
if err := s.s.DetachShards(shardIDs); err != nil {
|
||||
if err := s.s.DetachShards(ctx, shardIDs); err != nil {
|
||||
if errors.As(err, new(logicerr.Logical)) {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) {
|
||||
func (s *Server) SetShardMode(ctx context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) {
|
||||
// verify request
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
|
@ -38,7 +38,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
|
|||
}
|
||||
|
||||
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
||||
err = s.s.SetShardMode(shardID, m, req.GetBody().GetResetErrorCounter())
|
||||
err = s.s.SetShardMode(ctx, shardID, m, req.GetBody().GetResetErrorCounter())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue