diff --git a/cmd/frostfs-lens/internal/meta/root.go b/cmd/frostfs-lens/internal/meta/root.go index 9f35c26c..a59574b6 100644 --- a/cmd/frostfs-lens/internal/meta/root.go +++ b/cmd/frostfs-lens/internal/meta/root.go @@ -43,7 +43,7 @@ func openMeta(cmd *cobra.Command) *meta.DB { }), meta.WithEpochState(epochState{}), ) - common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(true))) + common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(cmd.Context(), true))) return db } diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 8e103b52..8286bc7d 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -901,7 +901,7 @@ func (c *cfg) LocalAddress() network.AddressGroup { return c.localAddr } -func initLocalStorage(c *cfg) { +func initLocalStorage(ctx context.Context, c *cfg) { ls := engine.New(c.engineOpts()...) addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { @@ -914,7 +914,7 @@ func initLocalStorage(c *cfg) { var shardsAttached int for _, optsWithMeta := range c.shardOpts() { - id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...) + id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...) if err != nil { c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err)) } else { @@ -931,7 +931,7 @@ func initLocalStorage(c *cfg) { c.onShutdown(func() { c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine) - err := ls.Close() + err := ls.Close(context.Background()) if err != nil { c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure, zap.String("error", err.Error()), diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index bf872da0..88032ebd 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -91,10 +91,10 @@ func initApp(ctx context.Context, c *cfg) { initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) }) - initLocalStorage(c) + initLocalStorage(ctx, c) initAndLog(c, "storage engine", func(c *cfg) { - fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open()) + fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open(ctx)) fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx)) }) diff --git a/pkg/local_object_storage/blobstor/blobstor_test.go b/pkg/local_object_storage/blobstor/blobstor_test.go index 87845206..f1d567da 100644 --- a/pkg/local_object_storage/blobstor/blobstor_test.go +++ b/pkg/local_object_storage/blobstor/blobstor_test.go @@ -51,7 +51,7 @@ func TestCompression(t *testing.T) { bs := New( WithCompressObjects(compress), WithStorages(defaultStorages(dir, smallSizeLimit))) - require.NoError(t, bs.Open(false)) + require.NoError(t, bs.Open(context.Background(), false)) require.NoError(t, bs.Init()) return bs } @@ -126,7 +126,7 @@ func TestBlobstor_needsCompression(t *testing.T) { Storage: fstree.New(fstree.WithPath(dir)), }, })) - require.NoError(t, bs.Open(false)) + require.NoError(t, bs.Open(context.Background(), false)) require.NoError(t, bs.Init()) return bs } @@ -188,7 +188,7 @@ func TestConcurrentPut(t *testing.T) { blobStor := New( WithStorages(defaultStorages(dir, smallSizeLimit))) - require.NoError(t, blobStor.Open(false)) + require.NoError(t, blobStor.Open(context.Background(), false)) require.NoError(t, blobStor.Init()) testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { @@ -268,7 +268,7 @@ func TestConcurrentDelete(t *testing.T) { blobStor := New( WithStorages(defaultStorages(dir, smallSizeLimit))) - require.NoError(t, blobStor.Open(false)) + require.NoError(t, blobStor.Open(context.Background(), false)) require.NoError(t, blobStor.Init()) testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) { diff --git a/pkg/local_object_storage/blobstor/control.go b/pkg/local_object_storage/blobstor/control.go index 6b439dcf..4b8a36de 100644 --- a/pkg/local_object_storage/blobstor/control.go +++ b/pkg/local_object_storage/blobstor/control.go @@ -1,6 +1,7 @@ package blobstor import ( + "context" "errors" "fmt" @@ -9,10 +10,15 @@ import ( ) // Open opens BlobStor. -func (b *BlobStor) Open(readOnly bool) error { +func (b *BlobStor) Open(ctx context.Context, readOnly bool) error { b.log.Debug(logs.BlobstorOpening) for i := range b.storage { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } err := b.storage[i].Storage.Open(readOnly) if err != nil { return err diff --git a/pkg/local_object_storage/blobstor/exists_test.go b/pkg/local_object_storage/blobstor/exists_test.go index 7a3dcd61..367b63af 100644 --- a/pkg/local_object_storage/blobstor/exists_test.go +++ b/pkg/local_object_storage/blobstor/exists_test.go @@ -20,7 +20,7 @@ func TestExists(t *testing.T) { b := New(WithStorages(storages)) - require.NoError(t, b.Open(false)) + require.NoError(t, b.Open(context.Background(), false)) require.NoError(t, b.Init()) objects := []*objectSDK.Object{ diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index c3586965..ef3fda99 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -26,7 +26,7 @@ func TestIterateObjects(t *testing.T) { defer os.RemoveAll(p) // open Blobstor - require.NoError(t, blobStor.Open(false)) + require.NoError(t, blobStor.Open(context.Background(), false)) // initialize Blobstor require.NoError(t, blobStor.Init()) diff --git a/pkg/local_object_storage/blobstor/mode.go b/pkg/local_object_storage/blobstor/mode.go index e6d0edc0..2f4473bd 100644 --- a/pkg/local_object_storage/blobstor/mode.go +++ b/pkg/local_object_storage/blobstor/mode.go @@ -1,6 +1,7 @@ package blobstor import ( + "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -21,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error { err := b.Close() if err == nil { - if err = b.Open(m.ReadOnly()); err == nil { + if err = b.Open(context.TODO(), m.ReadOnly()); err == nil { err = b.Init() } } diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index bd166b3f..d97148f7 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -21,11 +21,11 @@ type shardInitError struct { } // Open opens all StorageEngine's components. -func (e *StorageEngine) Open() error { - return e.open() +func (e *StorageEngine) Open(ctx context.Context) error { + return e.open(ctx) } -func (e *StorageEngine) open() error { +func (e *StorageEngine) open(ctx context.Context) error { e.mtx.Lock() defer e.mtx.Unlock() @@ -36,7 +36,7 @@ func (e *StorageEngine) open() error { wg.Add(1) go func(id string, sh *shard.Shard) { defer wg.Done() - if err := sh.Open(); err != nil { + if err := sh.Open(ctx); err != nil { errCh <- shardInitError{ err: err, id: id, @@ -148,10 +148,10 @@ var errClosed = errors.New("storage engine is closed") // After the call, all the next ones will fail. // // The method MUST only be called when the application exits. -func (e *StorageEngine) Close() error { +func (e *StorageEngine) Close(ctx context.Context) error { close(e.closeCh) defer e.wg.Wait() - return e.setBlockExecErr(errClosed) + return e.setBlockExecErr(ctx, errClosed) } // closes all shards. Never returns an error, shard errors are logged. @@ -197,7 +197,7 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error { // - otherwise, resumes execution. If exec was blocked, calls open method. // // Can be called concurrently with exec. In this case it waits for all executions to complete. -func (e *StorageEngine) setBlockExecErr(err error) error { +func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error { e.blockExec.mtx.Lock() defer e.blockExec.mtx.Unlock() @@ -212,7 +212,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error { if err == nil { if prevErr != nil { // block -> ok - return e.open() + return e.open(ctx) } } else if prevErr == nil { // ok -> block return e.close(errors.Is(err, errClosed)) @@ -235,7 +235,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error { // Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution // for this. func (e *StorageEngine) BlockExecution(err error) error { - return e.setBlockExecErr(err) + return e.setBlockExecErr(context.Background(), err) } // ResumeExecution resumes the execution of any data-related operation. @@ -247,7 +247,7 @@ func (e *StorageEngine) BlockExecution(err error) error { // // Must not be called concurrently with either Open or Init. func (e *StorageEngine) ResumeExecution() error { - return e.setBlockExecErr(nil) + return e.setBlockExecErr(context.Background(), nil) } type ReConfiguration struct { @@ -334,14 +334,14 @@ loop: } for _, newID := range shardsToAdd { - sh, err := e.createShard(rcfg.shards[newID]) + sh, err := e.createShard(ctx, rcfg.shards[newID]) if err != nil { return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err) } idStr := sh.ID().String() - err = sh.Open() + err = sh.Open(ctx) if err == nil { err = sh.Init(ctx) } diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index eb601122..0c5ff94d 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -126,7 +126,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio var configID string e := New() - _, err := e.AddShard(opts...) + _, err := e.AddShard(context.Background(), opts...) if errOnAdd { require.Error(t, err) // This branch is only taken when we cannot update shard ID in the metabase. @@ -144,7 +144,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio configID = calculateShardID(e.shards[id].Shard.DumpInfo()) e.mtx.RUnlock() - err = e.Open() + err = e.Open(context.Background()) if err == nil { require.Error(t, e.Init(context.Background())) } @@ -193,7 +193,7 @@ func TestExecBlocks(t *testing.T) { require.NoError(t, err) // close - require.NoError(t, e.Close()) + require.NoError(t, e.Close(context.Background())) // try exec after close _, err = Head(context.Background(), e, addr) @@ -209,13 +209,13 @@ func TestPersistentShardID(t *testing.T) { te := newEngineWithErrorThreshold(t, dir, 1) checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) - require.NoError(t, te.ng.Close()) + require.NoError(t, te.ng.Close(context.Background())) newTe := newEngineWithErrorThreshold(t, dir, 1) for i := 0; i < len(newTe.shards); i++ { require.Equal(t, te.shards[i].id, newTe.shards[i].id) } - require.NoError(t, newTe.ng.Close()) + require.NoError(t, newTe.ng.Close(context.Background())) p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path @@ -227,7 +227,7 @@ func TestPersistentShardID(t *testing.T) { newTe = newEngineWithErrorThreshold(t, dir, 1) require.Equal(t, te.shards[1].id, newTe.shards[0].id) require.Equal(t, te.shards[0].id, newTe.shards[1].id) - require.NoError(t, newTe.ng.Close()) + require.NoError(t, newTe.ng.Close(context.Background())) } @@ -313,7 +313,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str require.Equal(t, num, len(e.shards)) require.Equal(t, num, len(e.shardPools)) - require.NoError(t, e.Open()) + require.NoError(t, e.Open(context.Background())) require.NoError(t, e.Init(context.Background())) return e, currShards diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 57b9ec2e..8e94732a 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -54,7 +54,7 @@ func TestDeleteBigObject(t *testing.T) { e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine e.log = test.NewLogger(t, true) - defer e.Close() + defer e.Close(context.Background()) for i := range children { require.NoError(t, Put(context.Background(), e, children[i])) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index a8fd9eee..934bea8b 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -50,7 +50,7 @@ func benchmarkExists(b *testing.B, shardNum int) { e := testNewEngine(b).setInitializedShards(b, shards...).engine b.Cleanup(func() { - _ = e.Close() + _ = e.Close(context.Background()) _ = os.RemoveAll(b.Name()) }) @@ -119,7 +119,7 @@ func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrap func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper { for i := 0; i < num; i++ { opts := shardOpts(i) - id, err := te.engine.AddShard(opts...) + id, err := te.engine.AddShard(context.Background(), opts...) require.NoError(t, err) te.shardIDs = append(te.shardIDs, id) } @@ -130,7 +130,7 @@ func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, s for i := 0; i < num; i++ { defaultOpts := testDefaultShardOptions(t, i) opts := append(defaultOpts, shardOpts(i)...) - id, err := te.engine.AddShard(opts...) + id, err := te.engine.AddShard(context.Background(), opts...) require.NoError(t, err) te.shardIDs = append(te.shardIDs, id) } @@ -190,7 +190,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard { shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...) s := shard.New(shardOpts...) - require.NoError(t, s.Open()) + require.NoError(t, s.Open(context.Background())) require.NoError(t, s.Init(context.Background())) return s diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 75091888..0a48f818 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -68,7 +68,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) } }) e := te.engine - require.NoError(t, e.Open()) + require.NoError(t, e.Open(context.Background())) require.NoError(t, e.Init(context.Background())) for i, id := range te.shardIDs { @@ -192,7 +192,7 @@ func TestBlobstorFailback(t *testing.T) { } checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite) - require.NoError(t, te.ng.Close()) + require.NoError(t, te.ng.Close(context.Background())) p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 6030ba49..c0c05d66 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -44,7 +44,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng } }) e, ids := te.engine, te.shardIDs - require.NoError(t, e.Open()) + require.NoError(t, e.Open(context.Background())) require.NoError(t, e.Init(context.Background())) objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) diff --git a/pkg/local_object_storage/engine/head_test.go b/pkg/local_object_storage/engine/head_test.go index e212ffa3..5c123d61 100644 --- a/pkg/local_object_storage/engine/head_test.go +++ b/pkg/local_object_storage/engine/head_test.go @@ -43,7 +43,7 @@ func TestHeadRaw(t *testing.T) { s2 := testNewShard(t, 2) e := testNewEngine(t).setInitializedShards(t, s1, s2).engine - defer e.Close() + defer e.Close(context.Background()) var putPrmLeft shard.PutPrm putPrmLeft.SetObject(child) diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index a5f24207..8fff6280 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -37,7 +37,7 @@ func TestStorageEngine_Inhume(t *testing.T) { t.Run("delete small object", func(t *testing.T) { e := testNewEngine(t).setShardsNum(t, 1).engine - defer e.Close() + defer e.Close(context.Background()) err := Put(context.Background(), e, parent) require.NoError(t, err) @@ -58,7 +58,7 @@ func TestStorageEngine_Inhume(t *testing.T) { s2 := testNewShard(t, 2) e := testNewEngine(t).setInitializedShards(t, s1, s2).engine - defer e.Close() + defer e.Close(context.Background()) var putChild shard.PutPrm putChild.SetObject(child) diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 8234c3c2..eef25d20 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -76,11 +76,11 @@ func TestListWithCursor(t *testing.T) { meta.WithEpochState(epochState{}), )} }).engine - require.NoError(t, e.Open()) + require.NoError(t, e.Open(context.Background())) require.NoError(t, e.Init(context.Background())) t.Cleanup(func() { - e.Close() + e.Close(context.Background()) }) expected := make([]object.AddressWithType, 0, tt.objectNum) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index 906a867e..d5c08022 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -59,11 +59,11 @@ func TestLockUserScenario(t *testing.T) { } }) e := testEngine.engine - require.NoError(t, e.Open()) + require.NoError(t, e.Open(context.Background())) require.NoError(t, e.Init(context.Background())) t.Cleanup(func() { - _ = e.Close() + _ = e.Close(context.Background()) }) lockerID := oidtest.ID() @@ -167,11 +167,11 @@ func TestLockExpiration(t *testing.T) { } }) e := testEngine.engine - require.NoError(t, e.Open()) + require.NoError(t, e.Open(context.Background())) require.NoError(t, e.Init(context.Background())) t.Cleanup(func() { - _ = e.Close() + _ = e.Close(context.Background()) }) const lockerExpiresAfter = 13 @@ -247,10 +247,10 @@ func TestLockForceRemoval(t *testing.T) { shard.WithDeletedLockCallback(e.processDeletedLocks), } }).engine - require.NoError(t, e.Open()) + require.NoError(t, e.Open(context.Background())) require.NoError(t, e.Init(context.Background())) t.Cleanup(func() { - _ = e.Close() + _ = e.Close(context.Background()) }) cnr := cidtest.ID() diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index f12a63e9..4b9d8752 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -1,6 +1,7 @@ package engine import ( + "context" "fmt" "sync/atomic" @@ -77,8 +78,8 @@ func (m *metricsWithID) DeleteShardMetrics() { // // Returns any error encountered that did not allow adding a shard. // Otherwise returns the ID of the added shard. -func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { - sh, err := e.createShard(opts) +func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) { + sh, err := e.createShard(ctx, opts) if err != nil { return nil, fmt.Errorf("could not create a shard: %w", err) } @@ -95,7 +96,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { return sh.ID(), nil } -func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { +func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) { id, err := generateShardID() if err != nil { return nil, fmt.Errorf("could not generate shard ID: %w", err) @@ -111,7 +112,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { shard.WithReportErrorFunc(e.reportShardErrorBackground), )...) - if err := sh.UpdateID(); err != nil { + if err := sh.UpdateID(ctx); err != nil { return nil, fmt.Errorf("could not update shard ID: %w", err) } diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go index 3631e33e..e13017e2 100644 --- a/pkg/local_object_storage/engine/shards_test.go +++ b/pkg/local_object_storage/engine/shards_test.go @@ -1,6 +1,7 @@ package engine import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -12,7 +13,7 @@ func TestRemoveShard(t *testing.T) { te := testNewEngine(t).setShardsNum(t, numOfShards) e, ids := te.engine, te.shardIDs t.Cleanup(func() { - e.Close() + e.Close(context.Background()) }) require.Equal(t, numOfShards, len(e.shardPools)) diff --git a/pkg/local_object_storage/internal/storagetest/storage.go b/pkg/local_object_storage/internal/storagetest/storage.go index 74400a98..ec60a2d0 100644 --- a/pkg/local_object_storage/internal/storagetest/storage.go +++ b/pkg/local_object_storage/internal/storagetest/storage.go @@ -1,6 +1,7 @@ package storagetest import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -9,7 +10,7 @@ import ( // Component represents single storage component. type Component interface { - Open(bool) error + Open(context.Context, bool) error SetMode(mode.Mode) error Init() error Close() error @@ -57,18 +58,18 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) { t.Run("RW", func(t *testing.T) { // Use-case: irrecoverable error on some components, close everything. s := cons(t) - require.NoError(t, s.Open(false)) + require.NoError(t, s.Open(context.Background(), false)) require.NoError(t, s.Close()) }) t.Run("RO", func(t *testing.T) { // Use-case: irrecoverable error on some components, close everything. // Open in read-only must be done after the db is here. s := cons(t) - require.NoError(t, s.Open(false)) + require.NoError(t, s.Open(context.Background(), false)) require.NoError(t, s.Init()) require.NoError(t, s.Close()) - require.NoError(t, s.Open(true)) + require.NoError(t, s.Open(context.Background(), true)) require.NoError(t, s.Close()) }) } @@ -77,7 +78,7 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) { func TestCloseTwice(t *testing.T, cons Constructor) { // Use-case: move to maintenance mode twice, first time failed. s := cons(t) - require.NoError(t, s.Open(false)) + require.NoError(t, s.Open(context.Background(), false)) require.NoError(t, s.Init()) require.NoError(t, s.Close()) require.NoError(t, s.Close()) // already closed, no-op @@ -89,12 +90,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) { // Use-case: metabase `Init` failed, // call `SetMode` on all not-yet-initialized components. s := cons(t) - require.NoError(t, s.Open(false)) + require.NoError(t, s.Open(context.Background(), false)) require.NoError(t, s.SetMode(m)) t.Run("after open in RO", func(t *testing.T) { require.NoError(t, s.Close()) - require.NoError(t, s.Open(true)) + require.NoError(t, s.Open(context.Background(), true)) require.NoError(t, s.SetMode(m)) }) @@ -103,7 +104,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) { t.Run("after init", func(t *testing.T) { s := cons(t) // Use-case: notmal node operation. - require.NoError(t, s.Open(false)) + require.NoError(t, s.Open(context.Background(), false)) require.NoError(t, s.Init()) require.NoError(t, s.SetMode(m)) require.NoError(t, s.Close()) @@ -113,7 +114,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) { func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) { // Use-case: normal node operation. s := cons(t) - require.NoError(t, s.Open(false)) + require.NoError(t, s.Open(context.Background(), false)) require.NoError(t, s.Init()) require.NoError(t, s.SetMode(from)) require.NoError(t, s.SetMode(to)) diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index d0a9c472..3f155eeb 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -1,6 +1,7 @@ package meta import ( + "context" "errors" "fmt" "path/filepath" @@ -21,7 +22,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode") var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode") // Open boltDB instance for metabase. -func (db *DB) Open(readOnly bool) error { +func (db *DB) Open(_ context.Context, readOnly bool) error { err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) if err != nil { return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err) diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index 6169dd81..bc5015b6 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -1,6 +1,7 @@ package meta_test import ( + "context" "os" "path/filepath" "strconv" @@ -49,7 +50,7 @@ func newDB(t testing.TB, opts ...meta.Option) *meta.DB { }, opts...)..., ) - require.NoError(t, bdb.Open(false)) + require.NoError(t, bdb.Open(context.Background(), false)) require.NoError(t, bdb.Init()) t.Cleanup(func() { diff --git a/pkg/local_object_storage/metabase/mode.go b/pkg/local_object_storage/metabase/mode.go index 28beca8f..a18095f3 100644 --- a/pkg/local_object_storage/metabase/mode.go +++ b/pkg/local_object_storage/metabase/mode.go @@ -1,6 +1,7 @@ package meta import ( + "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -27,9 +28,9 @@ func (db *DB) SetMode(m mode.Mode) error { case m.NoMetabase(): db.boltDB = nil case m.ReadOnly(): - err = db.Open(true) + err = db.Open(context.TODO(), true) default: - err = db.Open(false) + err = db.Open(context.TODO(), false) } if err == nil && !m.NoMetabase() && !m.ReadOnly() { err = db.Init() diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go index 70ded67a..6f011c24 100644 --- a/pkg/local_object_storage/metabase/version_test.go +++ b/pkg/local_object_storage/metabase/version_test.go @@ -1,6 +1,7 @@ package meta import ( + "context" "encoding/binary" "errors" "fmt" @@ -42,13 +43,13 @@ func TestVersion(t *testing.T) { } t.Run("simple", func(t *testing.T) { db := newDB(t) - require.NoError(t, db.Open(false)) + require.NoError(t, db.Open(context.Background(), false)) require.NoError(t, db.Init()) check(t, db) require.NoError(t, db.Close()) t.Run("reopen", func(t *testing.T) { - require.NoError(t, db.Open(false)) + require.NoError(t, db.Open(context.Background(), false)) require.NoError(t, db.Init()) check(t, db) require.NoError(t, db.Close()) @@ -56,29 +57,29 @@ func TestVersion(t *testing.T) { }) t.Run("old data", func(t *testing.T) { db := newDB(t) - require.NoError(t, db.Open(false)) + require.NoError(t, db.Open(context.Background(), false)) require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4})) require.NoError(t, db.Close()) - require.NoError(t, db.Open(false)) + require.NoError(t, db.Open(context.Background(), false)) require.NoError(t, db.Init()) check(t, db) require.NoError(t, db.Close()) }) t.Run("invalid version", func(t *testing.T) { db := newDB(t) - require.NoError(t, db.Open(false)) + require.NoError(t, db.Open(context.Background(), false)) require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { return updateVersion(tx, version+1) })) require.NoError(t, db.Close()) - require.NoError(t, db.Open(false)) + require.NoError(t, db.Open(context.Background(), false)) require.Error(t, db.Init()) require.NoError(t, db.Close()) t.Run("reset", func(t *testing.T) { - require.NoError(t, db.Open(false)) + require.NoError(t, db.Open(context.Background(), false)) require.NoError(t, db.Reset()) check(t, db) require.NoError(t, db.Close()) diff --git a/pkg/local_object_storage/pilorama/bench_test.go b/pkg/local_object_storage/pilorama/bench_test.go index e729b9ea..3d5ff1a7 100644 --- a/pkg/local_object_storage/pilorama/bench_test.go +++ b/pkg/local_object_storage/pilorama/bench_test.go @@ -26,7 +26,7 @@ func BenchmarkCreate(b *testing.B) { f := NewBoltForest( WithPath(filepath.Join(tmpDir, "test.db")), WithMaxBatchSize(runtime.GOMAXPROCS(0))) - require.NoError(b, f.Open(false)) + require.NoError(b, f.Open(context.Background(), false)) require.NoError(b, f.Init()) b.Cleanup(func() { require.NoError(b, f.Close()) diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index c9a9f039..2689e345 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -99,7 +99,7 @@ func (t *boltForest) SetMode(m mode.Mode) error { err := t.Close() if err == nil && !m.NoMetabase() { - if err = t.Open(m.ReadOnly()); err == nil { + if err = t.Open(context.TODO(), m.ReadOnly()); err == nil { err = t.Init() } } @@ -111,7 +111,7 @@ func (t *boltForest) SetMode(m mode.Mode) error { t.metrics.SetMode(m) return nil } -func (t *boltForest) Open(readOnly bool) error { +func (t *boltForest) Open(_ context.Context, readOnly bool) error { err := util.MkdirAllX(filepath.Dir(t.path), t.perm) if err != nil { return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err)) diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 8fb51912..e5612d2b 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -110,7 +110,7 @@ func (f *memoryForest) Init() error { return nil } -func (f *memoryForest) Open(bool) error { +func (f *memoryForest) Open(context.Context, bool) error { return nil } func (f *memoryForest) SetMode(mode.Mode) error { diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 6f657873..8e7fec20 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -24,7 +24,7 @@ var providers = []struct { }{ {"inmemory", func(t testing.TB, _ ...Option) Forest { f := NewMemoryForest() - require.NoError(t, f.Open(false)) + require.NoError(t, f.Open(context.Background(), false)) require.NoError(t, f.Init()) t.Cleanup(func() { require.NoError(t, f.Close()) @@ -37,7 +37,7 @@ var providers = []struct { append([]Option{ WithPath(filepath.Join(t.TempDir(), "test.db")), WithMaxBatchSize(1)}, opts...)...) - require.NoError(t, f.Open(false)) + require.NoError(t, f.Open(context.Background(), false)) require.NoError(t, f.Init()) t.Cleanup(func() { require.NoError(t, f.Close()) diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index ea171a47..e7f7eb51 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -58,7 +58,7 @@ type ForestStorage interface { // DumpInfo returns information about the pilorama. DumpInfo() Info Init() error - Open(bool) error + Open(context.Context, bool) error Close() error SetMode(m mode.Mode) error SetParentID(id string) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index a49a7904..257498b3 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -41,8 +41,10 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error { } // Open opens all Shard's components. -func (s *Shard) Open() error { - components := []interface{ Open(bool) error }{ +func (s *Shard) Open(ctx context.Context) error { + components := []interface { + Open(context.Context, bool) error + }{ s.blobStor, s.metaBase, } @@ -55,12 +57,12 @@ func (s *Shard) Open() error { } for i, component := range components { - if err := component.Open(false); err != nil { + if err := component.Open(ctx, false); err != nil { if component == s.metaBase { // We must first open all other components to avoid // opening non-existent DB in read-only mode. for j := i + 1; j < len(components); j++ { - if err := components[j].Open(false); err != nil { + if err := components[j].Open(ctx, false); err != nil { // Other components must be opened, fail. return fmt.Errorf("could not open %T: %w", components[j], err) } diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 128e5cc0..749229cc 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -87,7 +87,7 @@ func TestShardOpen(t *testing.T) { allowedMode.Store(int64(os.O_RDWR)) sh := newShard() - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.ReadWrite, sh.GetMode()) require.NoError(t, sh.Close()) @@ -96,7 +96,7 @@ func TestShardOpen(t *testing.T) { allowedMode.Store(int64(os.O_RDONLY)) sh = newShard() - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.ReadOnly, sh.GetMode()) require.Error(t, sh.SetMode(mode.ReadWrite)) @@ -107,7 +107,7 @@ func TestShardOpen(t *testing.T) { allowedMode.Store(math.MaxInt64) sh = newShard() - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.DegradedReadOnly, sh.GetMode()) require.NoError(t, sh.Close()) @@ -135,7 +135,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { WithBlobStorOptions(blobOpts...), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{}))) - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) obj := objecttest.Object() @@ -158,7 +158,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), WithRefillMetabase(true)) - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) var getPrm GetPrm @@ -197,7 +197,7 @@ func TestRefillMetabase(t *testing.T) { ) // open Blobstor - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) // initialize Blobstor require.NoError(t, sh.Init(context.Background())) @@ -365,7 +365,7 @@ func TestRefillMetabase(t *testing.T) { ) // open Blobstor - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) // initialize Blobstor require.NoError(t, sh.Init(context.Background())) diff --git a/pkg/local_object_storage/shard/gc_internal_test.go b/pkg/local_object_storage/shard/gc_internal_test.go index c8925e01..332cdf5b 100644 --- a/pkg/local_object_storage/shard/gc_internal_test.go +++ b/pkg/local_object_storage/shard/gc_internal_test.go @@ -76,7 +76,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) { sh = New(opts...) sh.gcCfg.testHookRemover = func(context.Context) gcRunResult { return gcRunResult{} } - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) t.Cleanup(func() { diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go index 49f9a304..b0d95e54 100644 --- a/pkg/local_object_storage/shard/id.go +++ b/pkg/local_object_storage/shard/id.go @@ -1,6 +1,8 @@ package shard import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/mr-tron/base58" "go.uber.org/zap" @@ -27,8 +29,8 @@ func (s *Shard) ID() *ID { } // UpdateID reads shard ID saved in the metabase and updates it if it is missing. -func (s *Shard) UpdateID() (err error) { - if err = s.metaBase.Open(false); err != nil { +func (s *Shard) UpdateID(ctx context.Context) (err error) { + if err = s.metaBase.Open(ctx, false); err != nil { return err } defer func() { diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go index be9a0ba9..ca6b0ca3 100644 --- a/pkg/local_object_storage/shard/lock_test.go +++ b/pkg/local_object_storage/shard/lock_test.go @@ -58,7 +58,7 @@ func TestShard_Lock(t *testing.T) { } sh = New(opts...) - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) t.Cleanup(func() { diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index ae993c79..23721af6 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -269,7 +269,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) { meta.WithEpochState(epochState{})), WithMetricsWriter(mm), ) - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) t.Cleanup(func() { diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index c89c0cb7..9cfa267e 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -52,7 +52,7 @@ func TestShardReload(t *testing.T) { pilorama.WithPath(filepath.Join(p, "pilorama")))} sh := New(opts...) - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) objects := make([]objAddr, 5) diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 5dcccd9c..9da9eb6b 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -117,7 +117,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard sh = New(opts...) - require.NoError(t, sh.Open()) + require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) if !o.dontRelease { diff --git a/pkg/local_object_storage/writecache/benchmark/writecache_test.go b/pkg/local_object_storage/writecache/benchmark/writecache_test.go index 6ae04a92..16c6d73b 100644 --- a/pkg/local_object_storage/writecache/benchmark/writecache_test.go +++ b/pkg/local_object_storage/writecache/benchmark/writecache_test.go @@ -82,7 +82,7 @@ func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) { } func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) { - require.NoError(b, cache.Open(false), "opening") + require.NoError(b, cache.Open(context.Background(), false), "opening") require.NoError(b, cache.Init(), "initializing") b.Cleanup(func() { require.NoError(b, cache.Close(), "closing") diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 8c863735..c2fdc100 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -38,7 +38,7 @@ type Cache interface { Flush(context.Context, bool) error Init() error - Open(readOnly bool) error + Open(ctx context.Context, readOnly bool) error Close() error } diff --git a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go index 28aa7e76..6dd4755b 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go +++ b/pkg/local_object_storage/writecache/writecachebadger/cachebadger.go @@ -1,6 +1,7 @@ package writecachebadger import ( + "context" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -83,7 +84,7 @@ func (c *cache) DumpInfo() writecache.Info { } // Open opens and initializes database. Reads object counters from the ObjectCounters instance. -func (c *cache) Open(readOnly bool) error { +func (c *cache) Open(_ context.Context, readOnly bool) error { err := c.openStore(readOnly) if err != nil { return metaerr.Wrap(err) diff --git a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go index 407d1a9c..11bdbe07 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go @@ -1,6 +1,7 @@ package writecachebbolt import ( + "context" "os" "sync" @@ -97,7 +98,7 @@ func (c *cache) DumpInfo() writecache.Info { } // Open opens and initializes database. Reads object counters from the ObjectCounters instance. -func (c *cache) Open(readOnly bool) error { +func (c *cache) Open(_ context.Context, readOnly bool) error { err := c.openStore(readOnly) if err != nil { return metaerr.Wrap(err) diff --git a/pkg/local_object_storage/writecache/writecachetest/flush.go b/pkg/local_object_storage/writecache/writecachetest/flush.go index 05b4b23a..6911344a 100644 --- a/pkg/local_object_storage/writecache/writecachetest/flush.go +++ b/pkg/local_object_storage/writecache/writecachetest/flush.go @@ -110,7 +110,7 @@ func newCache[Option any]( mb := meta.New( meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(dummyEpoch{})) - require.NoError(t, mb.Open(false)) + require.NoError(t, mb.Open(context.Background(), false)) require.NoError(t, mb.Init()) bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{ @@ -121,12 +121,12 @@ func newCache[Option any]( fstree.WithDirNameLen(1)), }, })) - require.NoError(t, bs.Open(false)) + require.NoError(t, bs.Open(context.Background(), false)) require.NoError(t, bs.Init()) wc := createCacheFn(t, smallSize, mb, bs, opts...) t.Cleanup(func() { require.NoError(t, wc.Close()) }) - require.NoError(t, wc.Open(false)) + require.NoError(t, wc.Open(context.Background(), false)) require.NoError(t, wc.Init()) // First set mode for metabase and blobstor to prevent background flushes.