From 16598553d9603cdf7d5b349db250041a80f9ec14 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 21 Oct 2024 11:56:38 +0300 Subject: [PATCH] [#1437] shard: Fix contextcheck linter Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/blobstor/mode.go | 4 +-- pkg/local_object_storage/engine/control.go | 20 +++++------ pkg/local_object_storage/engine/engine.go | 10 +++--- pkg/local_object_storage/engine/error_test.go | 4 +-- .../engine/evacuate_test.go | 36 +++++++++---------- pkg/local_object_storage/engine/shards.go | 36 +++++++++---------- .../engine/shards_test.go | 10 +++--- .../internal/storagetest/storage.go | 12 +++---- pkg/local_object_storage/metabase/control.go | 14 ++++---- pkg/local_object_storage/metabase/mode.go | 5 +-- pkg/local_object_storage/metabase/shard_id.go | 9 ++--- .../metabase/version_test.go | 2 +- pkg/local_object_storage/pilorama/boltdb.go | 2 +- pkg/local_object_storage/pilorama/forest.go | 2 +- .../pilorama/interface.go | 2 +- pkg/local_object_storage/shard/control.go | 34 +++++++++--------- .../shard/control_test.go | 16 ++++----- pkg/local_object_storage/shard/delete_test.go | 2 +- .../shard/gc_internal_test.go | 2 +- pkg/local_object_storage/shard/gc_test.go | 8 ++--- pkg/local_object_storage/shard/get_test.go | 2 +- pkg/local_object_storage/shard/head_test.go | 2 +- pkg/local_object_storage/shard/id.go | 7 ++-- pkg/local_object_storage/shard/inhume_test.go | 2 +- pkg/local_object_storage/shard/list_test.go | 4 +-- pkg/local_object_storage/shard/lock_test.go | 4 +-- .../shard/metrics_test.go | 6 ++-- pkg/local_object_storage/shard/mode.go | 16 +++++---- pkg/local_object_storage/shard/range_test.go | 2 +- pkg/local_object_storage/shard/rebuild.go | 4 +-- pkg/local_object_storage/shard/refill_test.go | 6 ++-- pkg/local_object_storage/shard/reload_test.go | 2 +- .../shard/shutdown_test.go | 4 +-- .../writecache/flush_test.go | 20 +++++------ pkg/local_object_storage/writecache/mode.go | 4 +-- .../writecache/writecache.go | 2 +- pkg/services/control/server/detach_shards.go | 4 +-- pkg/services/control/server/set_shard_mode.go | 4 +-- 38 files changed, 165 insertions(+), 160 deletions(-) diff --git a/pkg/local_object_storage/blobstor/mode.go b/pkg/local_object_storage/blobstor/mode.go index a579a6f9..f081ff64 100644 --- a/pkg/local_object_storage/blobstor/mode.go +++ b/pkg/local_object_storage/blobstor/mode.go @@ -8,7 +8,7 @@ import ( ) // SetMode sets the blobstor mode of operation. -func (b *BlobStor) SetMode(m mode.Mode) error { +func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error { b.modeMtx.Lock() defer b.modeMtx.Unlock() @@ -22,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error { err := b.Close() if err == nil { - if err = b.openBlobStor(context.TODO(), m); err == nil { + if err = b.openBlobStor(ctx, m); err == nil { err = b.Init() } } diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 98ec73ae..a5c53dca 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -56,7 +56,7 @@ func (e *StorageEngine) open(ctx context.Context) error { sh := e.shards[res.id] delete(e.shards, res.id) - err := sh.Close() + err := sh.Close(ctx) if err != nil { e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard, zap.String("id", res.id), @@ -108,7 +108,7 @@ func (e *StorageEngine) Init(ctx context.Context) error { sh := e.shards[res.id] delete(e.shards, res.id) - err := sh.Close() + err := sh.Close(ctx) if err != nil { e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard, zap.String("id", res.id), @@ -126,7 +126,7 @@ func (e *StorageEngine) Init(ctx context.Context) error { } e.wg.Add(1) - go e.setModeLoop() + go e.setModeLoop(ctx) return nil } @@ -153,7 +153,7 @@ func (e *StorageEngine) Close(ctx context.Context) error { } // closes all shards. Never returns an error, shard errors are logged. -func (e *StorageEngine) close(releasePools bool) error { +func (e *StorageEngine) close(ctx context.Context, releasePools bool) error { e.mtx.RLock() defer e.mtx.RUnlock() @@ -164,8 +164,8 @@ func (e *StorageEngine) close(releasePools bool) error { } for id, sh := range e.shards { - if err := sh.Close(); err != nil { - e.log.Debug(context.Background(), logs.EngineCouldNotCloseShard, + if err := sh.Close(ctx); err != nil { + e.log.Debug(ctx, logs.EngineCouldNotCloseShard, zap.String("id", id), zap.String("error", err.Error()), ) @@ -213,7 +213,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error { return e.open(ctx) } } else if prevErr == nil { // ok -> block - return e.close(errors.Is(err, errClosed)) + return e.close(ctx, errors.Is(err, errClosed)) } // otherwise do nothing @@ -306,7 +306,7 @@ loop: e.mtx.RUnlock() - e.removeShards(shardsToRemove...) + e.removeShards(ctx, shardsToRemove...) for _, p := range shardsToReload { err := p.sh.Reload(ctx, p.opts...) @@ -330,13 +330,13 @@ loop: err = sh.Init(ctx) } if err != nil { - _ = sh.Close() + _ = sh.Close(ctx) return fmt.Errorf("could not init %s shard: %w", idStr, err) } err = e.addShard(sh) if err != nil { - _ = sh.Close() + _ = sh.Close(ctx) return fmt.Errorf("could not add %s shard: %w", idStr, err) } diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index a8caa215..6e30ee9d 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -55,7 +55,7 @@ type setModeRequest struct { // setModeLoop listens setModeCh to perform degraded mode transition of a single shard. // Instead of creating a worker per single shard we use a single goroutine. -func (e *StorageEngine) setModeLoop() { +func (e *StorageEngine) setModeLoop(ctx context.Context) { defer e.wg.Done() var ( @@ -75,7 +75,7 @@ func (e *StorageEngine) setModeLoop() { if !ok { inProgress[sid] = struct{}{} go func() { - e.moveToDegraded(r.sh, r.errorCount, r.isMeta) + e.moveToDegraded(ctx, r.sh, r.errorCount, r.isMeta) mtx.Lock() delete(inProgress, sid) @@ -87,7 +87,7 @@ func (e *StorageEngine) setModeLoop() { } } -func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) { +func (e *StorageEngine) moveToDegraded(ctx context.Context, sh *shard.Shard, errCount uint32, isMeta bool) { sid := sh.ID() log := e.log.With( zap.Stringer("shard_id", sid), @@ -97,7 +97,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta defer e.mtx.RUnlock() if isMeta { - err := sh.SetMode(mode.DegradedReadOnly) + err := sh.SetMode(ctx, mode.DegradedReadOnly) if err == nil { log.Info(context.Background(), logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold) return @@ -106,7 +106,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta zap.Error(err)) } - err := sh.SetMode(mode.ReadOnly) + err := sh.SetMode(ctx, mode.ReadOnly) if err != nil { log.Error(context.Background(), logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err)) return diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 1619003a..57c42376 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -158,10 +158,10 @@ func TestErrorReporting(t *testing.T) { checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite) } - require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false)) + require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, false)) checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite) - require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true)) + require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, true)) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) require.NoError(t, te.ng.Close(context.Background())) }) diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 8498c924..54eacc3f 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -146,7 +146,7 @@ func TestEvacuateShardObjects(t *testing.T) { require.Equal(t, uint64(0), res.ObjectsEvacuated()) }) - require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) @@ -237,7 +237,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) { evacuateShardID := ids[0].String() - require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[0:1] @@ -260,8 +260,8 @@ func TestEvacuateObjectsNetwork(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) - require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[1:2] @@ -298,7 +298,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) { } for i := range ids { - require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[i].String()].SetMode(context.Background(), mode.ReadOnly)) } var prm EvacuateShardPrm @@ -327,8 +327,8 @@ func TestEvacuateCancellation(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) - require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[1:2] @@ -357,8 +357,8 @@ func TestEvacuateCancellationByError(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) - require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[1:2] @@ -386,8 +386,8 @@ func TestEvacuateSingleProcess(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) - require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) blocker := make(chan interface{}) running := make(chan interface{}) @@ -429,8 +429,8 @@ func TestEvacuateObjectsAsync(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) - require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) blocker := make(chan interface{}) running := make(chan interface{}) @@ -515,7 +515,7 @@ func TestEvacuateTreesLocal(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) var prm EvacuateShardPrm prm.ShardID = ids[0:1] @@ -594,8 +594,8 @@ func TestEvacuateTreesRemote(t *testing.T) { require.NoError(t, e.Close(context.Background())) }() - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) - require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly)) mutex := sync.Mutex{} evacuatedTreeOps := make(map[string][]*pilorama.Move) @@ -753,7 +753,7 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { prm.Scope = EvacuateScopeObjects prm.RepOneOnly = true - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) res, err := e.Evacuate(context.Background(), prm) require.NoError(t, err) @@ -810,7 +810,7 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { prm.RepOneOnly = true prm.ContainerWorkerCount = 10 - require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly)) start := time.Now() _, err := e.Evacuate(context.Background(), prm) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index e172706e..2b94103e 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -121,7 +121,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh return sh.ID(), nil } -func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*shard.Shard, error) { +func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) { id, err := generateShardID() if err != nil { return nil, fmt.Errorf("could not generate shard ID: %w", err) @@ -139,8 +139,8 @@ func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*sh shard.WithZeroCountCallback(e.processZeroCountContainers), )...) - if err := sh.UpdateID(); err != nil { - e.log.Warn(context.Background(), logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err)) + if err := sh.UpdateID(ctx); err != nil { + e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err)) } return sh, nil @@ -203,7 +203,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error { // removeShards removes specified shards. Skips non-existent shards. // Logs errors about shards that it could not Close after the removal. -func (e *StorageEngine) removeShards(ids ...string) { +func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) { if len(ids) == 0 { return } @@ -228,22 +228,22 @@ func (e *StorageEngine) removeShards(ids ...string) { delete(e.shardPools, id) } - e.log.Info(context.Background(), logs.EngineShardHasBeenRemoved, + e.log.Info(ctx, logs.EngineShardHasBeenRemoved, zap.String("id", id)) } e.mtx.Unlock() for _, sh := range ss { - err := sh.SetMode(mode.Disabled) + err := sh.SetMode(ctx, mode.Disabled) if err != nil { - e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled, + e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled, zap.Stringer("id", sh.ID()), zap.Error(err), ) } - err = sh.Close() + err = sh.Close(ctx) if err != nil { - e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard, + e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard, zap.Stringer("id", sh.ID()), zap.Error(err), ) @@ -310,7 +310,7 @@ func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (sto // SetShardMode sets mode of the shard with provided identifier. // // Returns an error if shard mode was not set, or shard was not found in storage engine. -func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounter bool) error { +func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.Mode, resetErrorCounter bool) error { e.mtx.RLock() defer e.mtx.RUnlock() @@ -320,7 +320,7 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte sh.errorCount.Store(0) e.metrics.ClearErrorCounter(shID) } - return sh.SetMode(m) + return sh.SetMode(ctx, m) } } @@ -346,7 +346,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) { } } -func (e *StorageEngine) DetachShards(ids []*shard.ID) error { +func (e *StorageEngine) DetachShards(ctx context.Context, ids []*shard.ID) error { if len(ids) == 0 { return logicerr.New("ids must be non-empty") } @@ -356,20 +356,20 @@ func (e *StorageEngine) DetachShards(ids []*shard.ID) error { return err } - return e.closeShards(deletedShards) + return e.closeShards(ctx, deletedShards) } // closeShards closes deleted shards. Tries to close all shards. // Returns single error with joined shard errors. -func (e *StorageEngine) closeShards(deletedShards []hashedShard) error { +func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedShard) error { var multiErr error var multiErrGuard sync.Mutex var eg errgroup.Group for _, sh := range deletedShards { eg.Go(func() error { - err := sh.SetMode(mode.Disabled) + err := sh.SetMode(ctx, mode.Disabled) if err != nil { - e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled, + e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled, zap.Stringer("id", sh.ID()), zap.Error(err), ) @@ -378,9 +378,9 @@ func (e *StorageEngine) closeShards(deletedShards []hashedShard) error { multiErrGuard.Unlock() } - err = sh.Close() + err = sh.Close(ctx) if err != nil { - e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard, + e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard, zap.Stringer("id", sh.ID()), zap.Error(err), ) diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go index 3347d58f..207491bd 100644 --- a/pkg/local_object_storage/engine/shards_test.go +++ b/pkg/local_object_storage/engine/shards_test.go @@ -33,7 +33,7 @@ func TestRemoveShard(t *testing.T) { for id, remove := range mSh { if remove { - e.removeShards(id) + e.removeShards(context.Background(), id) } } @@ -55,11 +55,11 @@ func TestDisableShards(t *testing.T) { e, ids := te.engine, te.shardIDs defer func() { require.NoError(t, e.Close(context.Background())) }() - require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical)) - require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical)) - require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical)) + require.ErrorAs(t, e.DetachShards(context.Background(), ids), new(logicerr.Logical)) + require.ErrorAs(t, e.DetachShards(context.Background(), nil), new(logicerr.Logical)) + require.ErrorAs(t, e.DetachShards(context.Background(), []*shard.ID{}), new(logicerr.Logical)) - require.NoError(t, e.DetachShards([]*shard.ID{ids[0]})) + require.NoError(t, e.DetachShards(context.Background(), []*shard.ID{ids[0]})) require.Equal(t, 1, len(e.shards)) } diff --git a/pkg/local_object_storage/internal/storagetest/storage.go b/pkg/local_object_storage/internal/storagetest/storage.go index 586b3dcc..d8ac106d 100644 --- a/pkg/local_object_storage/internal/storagetest/storage.go +++ b/pkg/local_object_storage/internal/storagetest/storage.go @@ -11,7 +11,7 @@ import ( // Component represents single storage component. type Component interface { Open(context.Context, mode.Mode) error - SetMode(mode.Mode) error + SetMode(context.Context, mode.Mode) error Init() error Close() error } @@ -91,12 +91,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) { // call `SetMode` on all not-yet-initialized components. s := cons(t) require.NoError(t, s.Open(context.Background(), mode.ReadWrite)) - require.NoError(t, s.SetMode(m)) + require.NoError(t, s.SetMode(context.Background(), m)) t.Run("after open in RO", func(t *testing.T) { require.NoError(t, s.Close()) require.NoError(t, s.Open(context.Background(), mode.ReadOnly)) - require.NoError(t, s.SetMode(m)) + require.NoError(t, s.SetMode(context.Background(), m)) }) require.NoError(t, s.Close()) @@ -106,7 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) { // Use-case: notmal node operation. require.NoError(t, s.Open(context.Background(), mode.ReadWrite)) require.NoError(t, s.Init()) - require.NoError(t, s.SetMode(m)) + require.NoError(t, s.SetMode(context.Background(), m)) require.NoError(t, s.Close()) }) } @@ -116,7 +116,7 @@ func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) { s := cons(t) require.NoError(t, s.Open(context.Background(), mode.ReadWrite)) require.NoError(t, s.Init()) - require.NoError(t, s.SetMode(from)) - require.NoError(t, s.SetMode(to)) + require.NoError(t, s.SetMode(context.Background(), from)) + require.NoError(t, s.SetMode(context.Background(), to)) require.NoError(t, s.Close()) } diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 68e065a0..54bea420 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -39,7 +39,7 @@ var ( ) // Open boltDB instance for metabase. -func (db *DB) Open(_ context.Context, m mode.Mode) error { +func (db *DB) Open(ctx context.Context, m mode.Mode) error { db.modeMtx.Lock() defer db.modeMtx.Unlock() db.mode = m @@ -48,10 +48,10 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error { if m.NoMetabase() { return nil } - return db.openDB(m) + return db.openDB(ctx, m) } -func (db *DB) openDB(mode mode.Mode) error { +func (db *DB) openDB(ctx context.Context, mode mode.Mode) error { err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) if err != nil { return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err) @@ -65,10 +65,10 @@ func (db *DB) openDB(mode mode.Mode) error { } db.boltOptions.ReadOnly = mode.ReadOnly() - return metaerr.Wrap(db.openBolt()) + return metaerr.Wrap(db.openBolt(ctx)) } -func (db *DB) openBolt() error { +func (db *DB) openBolt(ctx context.Context) error { var err error db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) @@ -226,7 +226,7 @@ func (db *DB) close() error { // If there was a problem with applying new configuration, an error is returned. // // If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned. -func (db *DB) Reload(opts ...Option) (bool, error) { +func (db *DB) Reload(ctx context.Context, opts ...Option) (bool, error) { var c cfg for i := range opts { opts[i](&c) @@ -243,7 +243,7 @@ func (db *DB) Reload(opts ...Option) (bool, error) { db.mode = mode.Disabled db.metrics.SetMode(mode.ComponentDisabled) db.info.Path = c.info.Path - if err := db.openBolt(); err != nil { + if err := db.openBolt(ctx); err != nil { return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err)) } diff --git a/pkg/local_object_storage/metabase/mode.go b/pkg/local_object_storage/metabase/mode.go index 2032ed6b..f99262be 100644 --- a/pkg/local_object_storage/metabase/mode.go +++ b/pkg/local_object_storage/metabase/mode.go @@ -1,6 +1,7 @@ package meta import ( + "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -8,7 +9,7 @@ import ( // SetMode sets the metabase mode of operation. // If the mode assumes no operation metabase, the database is closed. -func (db *DB) SetMode(m mode.Mode) error { +func (db *DB) SetMode(ctx context.Context, m mode.Mode) error { db.modeMtx.Lock() defer db.modeMtx.Unlock() @@ -25,7 +26,7 @@ func (db *DB) SetMode(m mode.Mode) error { if m.NoMetabase() { db.boltDB = nil } else { - err := db.openDB(m) + err := db.openDB(ctx, m) if err == nil && !m.ReadOnly() { err = db.Init() } diff --git a/pkg/local_object_storage/metabase/shard_id.go b/pkg/local_object_storage/metabase/shard_id.go index 88446494..e58115bc 100644 --- a/pkg/local_object_storage/metabase/shard_id.go +++ b/pkg/local_object_storage/metabase/shard_id.go @@ -2,6 +2,7 @@ package meta import ( "bytes" + "context" "errors" "fmt" "os" @@ -21,7 +22,7 @@ var ( // If id is missing, returns nil, nil. // // GetShardID does not report any metrics. -func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) { +func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error) { db.modeMtx.Lock() defer db.modeMtx.Unlock() db.mode = mode @@ -30,7 +31,7 @@ func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) { return nil, nil } - if err := db.openDB(mode); err != nil { + if err := db.openDB(ctx, mode); err != nil { return nil, fmt.Errorf("failed to open metabase: %w", err) } @@ -59,7 +60,7 @@ func (db *DB) readShardID() ([]byte, error) { // SetShardID sets metabase operation mode // and writes shard id to db. -func (db *DB) SetShardID(id []byte, mode metamode.Mode) error { +func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) error { db.modeMtx.Lock() defer db.modeMtx.Unlock() db.mode = mode @@ -68,7 +69,7 @@ func (db *DB) SetShardID(id []byte, mode metamode.Mode) error { return ErrReadOnlyMode } - if err := db.openDB(mode); err != nil { + if err := db.openDB(ctx, mode); err != nil { return fmt.Errorf("failed to open metabase: %w", err) } diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go index 75229a1b..509e7247 100644 --- a/pkg/local_object_storage/metabase/version_test.go +++ b/pkg/local_object_storage/metabase/version_test.go @@ -58,7 +58,7 @@ func TestVersion(t *testing.T) { }) t.Run("old data", func(t *testing.T) { db := newDB(t) - require.NoError(t, db.SetShardID([]byte{1, 2, 3, 4}, mode.ReadWrite)) + require.NoError(t, db.SetShardID(context.Background(), []byte{1, 2, 3, 4}, mode.ReadWrite)) require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) require.NoError(t, db.Init()) diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index a778434d..9ffcf1e8 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -91,7 +91,7 @@ func NewBoltForest(opts ...Option) ForestStorage { return &b } -func (t *boltForest) SetMode(m mode.Mode) error { +func (t *boltForest) SetMode(_ context.Context, m mode.Mode) error { t.modeMtx.Lock() defer t.modeMtx.Unlock() diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 37494374..76da1c0c 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -128,7 +128,7 @@ func (f *memoryForest) Open(context.Context, mode.Mode) error { return nil } -func (f *memoryForest) SetMode(mode.Mode) error { +func (f *memoryForest) SetMode(context.Context, mode.Mode) error { return nil } diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index b6ca246f..9717b240 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -65,7 +65,7 @@ type ForestStorage interface { Init() error Open(context.Context, mode.Mode) error Close() error - SetMode(m mode.Mode) error + SetMode(context.Context, mode.Mode) error SetParentID(id string) Forest diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index eb3aa61c..056737a9 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -20,23 +20,23 @@ import ( "golang.org/x/sync/errgroup" ) -func (s *Shard) handleMetabaseFailure(stage string, err error) error { - s.log.Error(context.Background(), logs.ShardMetabaseFailureSwitchingMode, +func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err error) error { + s.log.Error(ctx, logs.ShardMetabaseFailureSwitchingMode, zap.String("stage", stage), zap.Stringer("mode", mode.ReadOnly), zap.Error(err)) - err = s.SetMode(mode.ReadOnly) + err = s.SetMode(ctx, mode.ReadOnly) if err == nil { return nil } - s.log.Error(context.Background(), logs.ShardCantMoveShardToReadonlySwitchMode, + s.log.Error(ctx, logs.ShardCantMoveShardToReadonlySwitchMode, zap.String("stage", stage), zap.Stringer("mode", mode.DegradedReadOnly), zap.Error(err)) - err = s.SetMode(mode.DegradedReadOnly) + err = s.SetMode(ctx, mode.DegradedReadOnly) if err != nil { return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly)) } @@ -75,7 +75,7 @@ func (s *Shard) Open(ctx context.Context) error { return fmt.Errorf("could not open %T: %w", components[j], err) } } - err = s.handleMetabaseFailure("open", err) + err = s.handleMetabaseFailure(ctx, "open", err) if err != nil { return err } @@ -101,7 +101,7 @@ func (x *metabaseSynchronizer) Init() error { // Init initializes all Shard's components. func (s *Shard) Init(ctx context.Context) error { m := s.GetMode() - if err := s.initializeComponents(m); err != nil { + if err := s.initializeComponents(ctx, m); err != nil { return err } @@ -138,7 +138,7 @@ func (s *Shard) Init(ctx context.Context) error { return nil } -func (s *Shard) initializeComponents(m mode.Mode) error { +func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error { type initializer interface { Init() error } @@ -176,7 +176,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error { return fmt.Errorf("metabase initialization: %w", err) } - err = s.handleMetabaseFailure("init", err) + err = s.handleMetabaseFailure(ctx, "init", err) if err != nil { return err } @@ -364,9 +364,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object } // Close releases all Shard's components. -func (s *Shard) Close() error { +func (s *Shard) Close(ctx context.Context) error { if s.rb != nil { - s.rb.Stop(s.log) + s.rb.Stop(ctx, s.log) } var components []interface{ Close() error } @@ -386,7 +386,7 @@ func (s *Shard) Close() error { for _, component := range components { if err := component.Close(); err != nil { lastErr = err - s.log.Error(context.Background(), logs.ShardCouldNotCloseShardComponent, zap.Error(err)) + s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err)) } } @@ -414,18 +414,18 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error { unlock := s.lockExclusive() defer unlock() - s.rb.Stop(s.log) + s.rb.Stop(ctx, s.log) if !s.info.Mode.NoMetabase() { defer func() { s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) }() } - ok, err := s.metaBase.Reload(c.metaOpts...) + ok, err := s.metaBase.Reload(ctx, c.metaOpts...) if err != nil { if errors.Is(err, meta.ErrDegradedMode) { s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err)) - _ = s.setMode(mode.DegradedReadOnly) + _ = s.setMode(ctx, mode.DegradedReadOnly) } return err } @@ -441,11 +441,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error { } if err != nil { s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err)) - _ = s.setMode(mode.DegradedReadOnly) + _ = s.setMode(ctx, mode.DegradedReadOnly) return err } } - return s.setMode(c.info.Mode) + return s.setMode(ctx, c.info.Mode) } func (s *Shard) lockExclusive() func() { diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index b8f1d441..6d2cd713 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -86,7 +86,7 @@ func TestShardOpen(t *testing.T) { require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.ReadWrite, sh.GetMode()) - require.NoError(t, sh.Close()) + require.NoError(t, sh.Close(context.Background())) // Metabase can be opened in read-only => start in ReadOnly mode. allowedMode.Store(int64(os.O_RDONLY)) @@ -95,9 +95,9 @@ func TestShardOpen(t *testing.T) { require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.ReadOnly, sh.GetMode()) - require.Error(t, sh.SetMode(mode.ReadWrite)) + require.Error(t, sh.SetMode(context.Background(), mode.ReadWrite)) require.Equal(t, mode.ReadOnly, sh.GetMode()) - require.NoError(t, sh.Close()) + require.NoError(t, sh.Close(context.Background())) // Metabase is corrupted => start in DegradedReadOnly mode. allowedMode.Store(math.MaxInt64) @@ -106,7 +106,7 @@ func TestShardOpen(t *testing.T) { require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.DegradedReadOnly, sh.GetMode()) - require.NoError(t, sh.Close()) + require.NoError(t, sh.Close(context.Background())) } func TestRefillMetabaseCorrupted(t *testing.T) { @@ -146,7 +146,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { putPrm.SetObject(obj) _, err := sh.Put(context.Background(), putPrm) require.NoError(t, err) - require.NoError(t, sh.Close()) + require.NoError(t, sh.Close(context.Background())) addr := object.AddressOf(obj) // This is copied from `fstree.treePath()` to avoid exporting function just for tests. @@ -170,7 +170,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { getPrm.SetAddress(addr) _, err = sh.Get(context.Background(), getPrm) require.True(t, client.IsErrObjectNotFound(err)) - require.NoError(t, sh.Close()) + require.NoError(t, sh.Close(context.Background())) } func TestRefillMetabase(t *testing.T) { @@ -358,7 +358,7 @@ func TestRefillMetabase(t *testing.T) { phyBefore := c.Phy logicalBefore := c.Logic - err = sh.Close() + err = sh.Close(context.Background()) require.NoError(t, err) sh = New( @@ -379,7 +379,7 @@ func TestRefillMetabase(t *testing.T) { // initialize Blobstor require.NoError(t, sh.Init(context.Background())) - defer sh.Close() + defer sh.Close(context.Background()) checkAllObjs(false) checkObj(object.AddressOf(tombObj), nil) diff --git a/pkg/local_object_storage/shard/delete_test.go b/pkg/local_object_storage/shard/delete_test.go index 574250a9..c9ce93bc 100644 --- a/pkg/local_object_storage/shard/delete_test.go +++ b/pkg/local_object_storage/shard/delete_test.go @@ -37,7 +37,7 @@ func TestShard_Delete_BigObject(t *testing.T) { func testShard(t *testing.T, hasWriteCache bool, payloadSize int) { sh := newShard(t, hasWriteCache) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() cnr := cidtest.ID() diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go index 11db5e54..39073a52 100644 --- a/pkg/local_object_storage/shard/gc_internal_test.go +++ b/pkg/local_object_storage/shard/gc_internal_test.go @@ -79,7 +79,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { sh = New(opts...) require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() cnr := cidtest.ID() obj := testutil.GenerateObjectWithCID(cnr) diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 2b97111e..e3670b44 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -34,7 +34,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) { return util.NewPseudoWorkerPool() // synchronous event processing })}, }) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() cnr := cidtest.ID() @@ -131,7 +131,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) { return util.NewPseudoWorkerPool() // synchronous event processing })}, }) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() lock := testutil.GenerateObjectWithCID(cnr) lock.SetType(objectSDK.TypeLock) @@ -190,7 +190,7 @@ func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool additionalShardOptions: []Option{WithDisabledGC()}, wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()}, }) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() obj := testutil.GenerateObjectWithSize(1024) @@ -254,7 +254,7 @@ func TestGCDontDeleteObjectFromWritecache(t *testing.T) { additionalShardOptions: []Option{WithDisabledGC()}, wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()}, }) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() obj := testutil.GenerateObjectWithSize(1024) diff --git a/pkg/local_object_storage/shard/get_test.go b/pkg/local_object_storage/shard/get_test.go index d0eecf74..837991b7 100644 --- a/pkg/local_object_storage/shard/get_test.go +++ b/pkg/local_object_storage/shard/get_test.go @@ -30,7 +30,7 @@ func TestShard_Get(t *testing.T) { func testShardGet(t *testing.T, hasWriteCache bool) { sh := newShard(t, hasWriteCache) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() var putPrm PutPrm var getPrm GetPrm diff --git a/pkg/local_object_storage/shard/head_test.go b/pkg/local_object_storage/shard/head_test.go index c65bbb1e..deb3019d 100644 --- a/pkg/local_object_storage/shard/head_test.go +++ b/pkg/local_object_storage/shard/head_test.go @@ -28,7 +28,7 @@ func TestShard_Head(t *testing.T) { func testShardHead(t *testing.T, hasWriteCache bool) { sh := newShard(t, hasWriteCache) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() var putPrm PutPrm var headPrm HeadPrm diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go index e27dc073..6ccae3f5 100644 --- a/pkg/local_object_storage/shard/id.go +++ b/pkg/local_object_storage/shard/id.go @@ -1,6 +1,7 @@ package shard import ( + "context" "errors" "fmt" @@ -30,11 +31,11 @@ func (s *Shard) ID() *ID { } // UpdateID reads shard ID saved in the metabase and updates it if it is missing. -func (s *Shard) UpdateID() (err error) { +func (s *Shard) UpdateID(ctx context.Context) (err error) { var idFromMetabase []byte modeDegraded := s.GetMode().NoMetabase() if !modeDegraded { - if idFromMetabase, err = s.metaBase.GetShardID(mode.ReadOnly); err != nil { + if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil { err = fmt.Errorf("failed to read shard id from metabase: %w", err) } } @@ -62,7 +63,7 @@ func (s *Shard) UpdateID() (err error) { } if len(idFromMetabase) == 0 && !modeDegraded { - if setErr := s.metaBase.SetShardID(*s.info.ID, s.GetMode()); setErr != nil { + if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil { err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr)) } } diff --git a/pkg/local_object_storage/shard/inhume_test.go b/pkg/local_object_storage/shard/inhume_test.go index 1353d5d9..1421f0e1 100644 --- a/pkg/local_object_storage/shard/inhume_test.go +++ b/pkg/local_object_storage/shard/inhume_test.go @@ -27,7 +27,7 @@ func TestShard_Inhume(t *testing.T) { func testShardInhume(t *testing.T, hasWriteCache bool) { sh := newShard(t, hasWriteCache) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() cnr := cidtest.ID() diff --git a/pkg/local_object_storage/shard/list_test.go b/pkg/local_object_storage/shard/list_test.go index 3414dc76..139b2e31 100644 --- a/pkg/local_object_storage/shard/list_test.go +++ b/pkg/local_object_storage/shard/list_test.go @@ -18,14 +18,14 @@ func TestShard_List(t *testing.T) { t.Run("without write cache", func(t *testing.T) { t.Parallel() sh := newShard(t, false) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() testShardList(t, sh) }) t.Run("with write cache", func(t *testing.T) { t.Parallel() shWC := newShard(t, true) - defer func() { require.NoError(t, shWC.Close()) }() + defer func() { require.NoError(t, shWC.Close(context.Background())) }() testShardList(t, shWC) }) } diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go index 595afb60..7da8b8c2 100644 --- a/pkg/local_object_storage/shard/lock_test.go +++ b/pkg/local_object_storage/shard/lock_test.go @@ -62,7 +62,7 @@ func TestShard_Lock(t *testing.T) { require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() cnr := cidtest.ID() obj := testutil.GenerateObjectWithCID(cnr) @@ -148,7 +148,7 @@ func TestShard_Lock(t *testing.T) { func TestShard_IsLocked(t *testing.T) { sh := newShard(t, false) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() cnr := cidtest.ID() obj := testutil.GenerateObjectWithCID(cnr) diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index cec5a12a..5230dcad 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -201,11 +201,11 @@ func TestCounters(t *testing.T) { dir := t.TempDir() sh, mm := shardWithMetrics(t, dir) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() - sh.SetMode(mode.ReadOnly) + sh.SetMode(context.Background(), mode.ReadOnly) require.Equal(t, mode.ReadOnly, mm.mode) - sh.SetMode(mode.ReadWrite) + sh.SetMode(context.Background(), mode.ReadWrite) require.Equal(t, mode.ReadWrite, mm.mode) const objNumber = 10 diff --git a/pkg/local_object_storage/shard/mode.go b/pkg/local_object_storage/shard/mode.go index 98b4c37b..90152897 100644 --- a/pkg/local_object_storage/shard/mode.go +++ b/pkg/local_object_storage/shard/mode.go @@ -20,19 +20,21 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode") // // Returns any error encountered that did not allow // setting shard mode. -func (s *Shard) SetMode(m mode.Mode) error { +func (s *Shard) SetMode(ctx context.Context, m mode.Mode) error { unlock := s.lockExclusive() defer unlock() - return s.setMode(m) + return s.setMode(ctx, m) } -func (s *Shard) setMode(m mode.Mode) error { - s.log.Info(context.Background(), logs.ShardSettingShardMode, +func (s *Shard) setMode(ctx context.Context, m mode.Mode) error { + s.log.Info(ctx, logs.ShardSettingShardMode, zap.Stringer("old_mode", s.info.Mode), zap.Stringer("new_mode", m)) - components := []interface{ SetMode(mode.Mode) error }{ + components := []interface { + SetMode(context.Context, mode.Mode) error + }{ s.metaBase, s.blobStor, } @@ -60,7 +62,7 @@ func (s *Shard) setMode(m mode.Mode) error { if !m.Disabled() { for i := range components { - if err := components[i].SetMode(m); err != nil { + if err := components[i].SetMode(ctx, m); err != nil { return err } } @@ -69,7 +71,7 @@ func (s *Shard) setMode(m mode.Mode) error { s.info.Mode = m s.metricsWriter.SetMode(s.info.Mode) - s.log.Info(context.Background(), logs.ShardShardModeSetSuccessfully, + s.log.Info(ctx, logs.ShardShardModeSetSuccessfully, zap.Stringer("mode", s.info.Mode)) return nil } diff --git a/pkg/local_object_storage/shard/range_test.go b/pkg/local_object_storage/shard/range_test.go index cc73db31..146e834c 100644 --- a/pkg/local_object_storage/shard/range_test.go +++ b/pkg/local_object_storage/shard/range_test.go @@ -94,7 +94,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) { }), }, }) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go index 124b72a5..10eb51a2 100644 --- a/pkg/local_object_storage/shard/rebuild.go +++ b/pkg/local_object_storage/shard/rebuild.go @@ -125,7 +125,7 @@ func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLi } } -func (r *rebuilder) Stop(log *logger.Logger) { +func (r *rebuilder) Stop(ctx context.Context, log *logger.Logger) { r.mtx.Lock() defer r.mtx.Unlock() @@ -138,7 +138,7 @@ func (r *rebuilder) Stop(log *logger.Logger) { r.wg.Wait() r.cancel = nil r.done = nil - log.Info(context.Background(), logs.BlobstoreRebuildStopped) + log.Info(ctx, logs.BlobstoreRebuildStopped) } var errMBIsNotAvailable = errors.New("metabase is not available") diff --git a/pkg/local_object_storage/shard/refill_test.go b/pkg/local_object_storage/shard/refill_test.go index 0025bb45..d9034326 100644 --- a/pkg/local_object_storage/shard/refill_test.go +++ b/pkg/local_object_storage/shard/refill_test.go @@ -34,7 +34,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) { additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)}, }) - defer func() { require.NoError(b, sh.Close()) }() + defer func() { require.NoError(b, sh.Close(context.Background())) }() var putPrm PutPrm @@ -61,7 +61,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) { require.NoError(b, err) } - require.NoError(b, sh.Close()) + require.NoError(b, sh.Close(context.Background())) require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path)) require.NoError(b, sh.Open(context.Background())) @@ -72,5 +72,5 @@ func benchRefillMetabase(b *testing.B, objectsCount int) { require.NoError(b, sh.Init(context.Background())) - require.NoError(b, sh.Close()) + require.NoError(b, sh.Close(context.Background())) } diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index 7dd7189b..e563f390 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -59,7 +59,7 @@ func TestShardReload(t *testing.T) { require.NoError(t, sh.Init(context.Background())) defer func() { - require.NoError(t, sh.Close()) + require.NoError(t, sh.Close(context.Background())) }() objects := make([]objAddr, 5) diff --git a/pkg/local_object_storage/shard/shutdown_test.go b/pkg/local_object_storage/shard/shutdown_test.go index de00eabd..b1232707 100644 --- a/pkg/local_object_storage/shard/shutdown_test.go +++ b/pkg/local_object_storage/shard/shutdown_test.go @@ -52,10 +52,10 @@ func TestWriteCacheObjectLoss(t *testing.T) { }) } require.NoError(t, errG.Wait()) - require.NoError(t, sh.Close()) + require.NoError(t, sh.Close(context.Background())) sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts}) - defer func() { require.NoError(t, sh.Close()) }() + defer func() { require.NoError(t, sh.Close(context.Background())) }() var getPrm GetPrm diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 26f47e82..92fb493e 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -117,8 +117,8 @@ func runFlushTest[Option any]( defer func() { require.NoError(t, wc.Close()) }() objects := putObjects(t, wc) - require.NoError(t, bs.SetMode(mode.ReadWrite)) - require.NoError(t, mb.SetMode(mode.ReadWrite)) + require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite)) + require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite)) require.NoError(t, wc.Flush(context.Background(), false, false)) @@ -131,11 +131,11 @@ func runFlushTest[Option any]( objects := putObjects(t, wc) // Blobstor is read-only, so we expect en error from `flush` here. - require.Error(t, wc.SetMode(mode.Degraded)) + require.Error(t, wc.SetMode(context.Background(), mode.Degraded)) - require.NoError(t, bs.SetMode(mode.ReadWrite)) - require.NoError(t, mb.SetMode(mode.ReadWrite)) - require.NoError(t, wc.SetMode(mode.Degraded)) + require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite)) + require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite)) + require.NoError(t, wc.SetMode(context.Background(), mode.Degraded)) check(t, mb, bs, objects) }) @@ -149,8 +149,8 @@ func runFlushTest[Option any]( objects := putObjects(t, wc) f.InjectFn(t, wc) - require.NoError(t, bs.SetMode(mode.ReadWrite)) - require.NoError(t, mb.SetMode(mode.ReadWrite)) + require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite)) + require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite)) require.Equal(t, uint32(0), errCount.Load()) require.Error(t, wc.Flush(context.Background(), false, false)) @@ -191,8 +191,8 @@ func newCache[Option any]( require.NoError(t, wc.Init()) // First set mode for metabase and blobstor to prevent background flushes. - require.NoError(t, mb.SetMode(mode.ReadOnly)) - require.NoError(t, bs.SetMode(mode.ReadOnly)) + require.NoError(t, mb.SetMode(context.Background(), mode.ReadOnly)) + require.NoError(t, bs.SetMode(context.Background(), mode.ReadOnly)) return wc, bs, mb } diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 26658e9b..db789d99 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -23,8 +23,8 @@ type setModePrm struct { // SetMode sets write-cache mode of operation. // When shard is put in read-only mode all objects in memory are flushed to disk // and all background jobs are suspended. -func (c *cache) SetMode(m mode.Mode) error { - ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode", +func (c *cache) SetMode(ctx context.Context, m mode.Mode) error { + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.SetMode", trace.WithAttributes( attribute.String("mode", m.String()), )) diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index a973df60..d07220b6 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -38,7 +38,7 @@ type Cache interface { // Returns ErrReadOnly if the Cache is currently in the read-only mode. Delete(context.Context, oid.Address) error Put(context.Context, common.PutPrm) (common.PutRes, error) - SetMode(mode.Mode) error + SetMode(context.Context, mode.Mode) error SetLogger(*logger.Logger) DumpInfo() Info Flush(context.Context, bool, bool) error diff --git a/pkg/services/control/server/detach_shards.go b/pkg/services/control/server/detach_shards.go index a4111bdd..ffd36962 100644 --- a/pkg/services/control/server/detach_shards.go +++ b/pkg/services/control/server/detach_shards.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc/status" ) -func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) { +func (s *Server) DetachShards(ctx context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) { err := s.isValidRequest(req) if err != nil { return nil, status.Error(codes.PermissionDenied, err.Error()) @@ -19,7 +19,7 @@ func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsReques shardIDs := s.getShardIDList(req.GetBody().GetShard_ID()) - if err := s.s.DetachShards(shardIDs); err != nil { + if err := s.s.DetachShards(ctx, shardIDs); err != nil { if errors.As(err, new(logicerr.Logical)) { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/services/control/server/set_shard_mode.go b/pkg/services/control/server/set_shard_mode.go index 52835c41..4f879626 100644 --- a/pkg/services/control/server/set_shard_mode.go +++ b/pkg/services/control/server/set_shard_mode.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc/status" ) -func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) { +func (s *Server) SetShardMode(ctx context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) { // verify request err := s.isValidRequest(req) if err != nil { @@ -38,7 +38,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques } for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { - err = s.s.SetShardMode(shardID, m, req.GetBody().GetResetErrorCounter()) + err = s.s.SetShardMode(ctx, shardID, m, req.GetBody().GetResetErrorCounter()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) }