From a7c79c773ab72968b12ef805b1b0bc8dbee5389f Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 23 Mar 2023 17:59:14 +0300 Subject: [PATCH] [#168] node: Refactor node config Resolve containedctx linter for cfg Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 20 +++++----- cmd/frostfs-node/control.go | 2 +- cmd/frostfs-node/httpcomponent.go | 6 +-- cmd/frostfs-node/main.go | 38 ++++++++++--------- cmd/frostfs-node/morph.go | 22 +++++------ cmd/frostfs-node/notificator.go | 5 ++- cmd/frostfs-node/worker.go | 8 ++-- pkg/local_object_storage/engine/control.go | 9 +++-- .../engine/control_test.go | 13 ++++--- .../engine/engine_test.go | 5 ++- pkg/local_object_storage/engine/error_test.go | 3 +- .../engine/evacuate_test.go | 3 +- pkg/local_object_storage/shard/control.go | 5 ++- .../shard/control_test.go | 15 ++++---- pkg/local_object_storage/shard/gc.go | 9 ++--- pkg/local_object_storage/shard/gc_test.go | 2 +- pkg/local_object_storage/shard/lock_test.go | 2 +- .../shard/metrics_test.go | 3 +- pkg/local_object_storage/shard/reload_test.go | 3 +- pkg/local_object_storage/shard/shard_test.go | 3 +- 20 files changed, 93 insertions(+), 83 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 09f2c20877..1575ce073f 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -303,10 +303,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { // the application life cycle. // It should not contain any read configuration values, component-specific // helpers and fields. -// nolint: containedctx type internals struct { - ctx context.Context - ctxCancel func() + done chan struct{} internalErr chan error // channel for internal application errors at runtime appCfg *config.Config @@ -570,7 +568,7 @@ func initCfg(appCfg *config.Config) *cfg { fatalOnErr(err) c.internals = internals{ - ctx: context.Background(), + done: make(chan struct{}), appCfg: appCfg, internalErr: make(chan error), log: log, @@ -940,7 +938,7 @@ type dCmp struct { reloadFunc func() error } -func (c *cfg) signalWatcher() { +func (c *cfg) signalWatcher(ctx context.Context) { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) @@ -949,7 +947,7 @@ func (c *cfg) signalWatcher() { case sig := <-ch: switch sig { case syscall.SIGHUP: - c.reloadConfig() + c.reloadConfig(ctx) case syscall.SIGTERM, syscall.SIGINT: c.log.Info("termination signal has been received, stopping...") // TODO (@acid-ant): #49 need to cover case when stuck at the middle(node health UNDEFINED or STARTING) @@ -971,7 +969,7 @@ func (c *cfg) signalWatcher() { } } -func (c *cfg) reloadConfig() { +func (c *cfg) reloadConfig(ctx context.Context) { c.log.Info("SIGHUP has been received, rereading configuration...") err := c.readConfig(c.appCfg) @@ -999,10 +997,10 @@ func (c *cfg) reloadConfig() { } else { cmp.preReload = disableMetricsSvc } - components = append(components, dCmp{cmp.name, cmp.reload}) + components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }}) } if cmp, updated := pprofComponent(c); updated { - components = append(components, dCmp{cmp.name, cmp.reload}) + components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }}) } // Storage Engine @@ -1012,7 +1010,7 @@ func (c *cfg) reloadConfig() { rcfg.AddShard(optsWithID.configID, optsWithID.shOpts) } - err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg) + err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg) if err != nil { c.log.Error("storage engine configuration update", zap.Error(err)) return @@ -1033,7 +1031,7 @@ func (c *cfg) reloadConfig() { func (c *cfg) shutdown() { c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN) - c.ctxCancel() + c.done <- struct{}{} for i := range c.closers { c.closers[len(c.closers)-1-i].fn() } diff --git a/cmd/frostfs-node/control.go b/cmd/frostfs-node/control.go index ad6b9bbbdf..5492f585fa 100644 --- a/cmd/frostfs-node/control.go +++ b/cmd/frostfs-node/control.go @@ -65,7 +65,7 @@ func initControlService(c *cfg) { control.RegisterControlServiceServer(c.cfgControlService.server, ctlSvc) c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { - runAndLog(c, "control", false, func(c *cfg) { + runAndLog(ctx, c, "control", false, func(context.Context, *cfg) { fatalOnErr(c.cfgControlService.server.Serve(lis)) }) })) diff --git a/cmd/frostfs-node/httpcomponent.go b/cmd/frostfs-node/httpcomponent.go index 9fa6d2f6a8..db9239384e 100644 --- a/cmd/frostfs-node/httpcomponent.go +++ b/cmd/frostfs-node/httpcomponent.go @@ -41,14 +41,14 @@ func (cmp *httpComponent) init(c *cfg) { c.workers = append(c.workers, worker{ cmp.name, func(ctx context.Context) { - runAndLog(c, cmp.name, false, func(c *cfg) { + runAndLog(ctx, c, cmp.name, false, func(context.Context, *cfg) { fatalOnErr(srv.Serve()) }) }, }) } -func (cmp *httpComponent) reload() error { +func (cmp *httpComponent) reload(ctx context.Context) error { if cmp.preReload != nil { cmp.preReload(cmp.cfg) } @@ -64,7 +64,7 @@ func (cmp *httpComponent) reload() error { cmp.init(cmp.cfg) // Start worker if cmp.enabled { - startWorker(cmp.cfg, *getWorker(cmp.cfg, cmp.name)) + startWorker(ctx, cmp.cfg, *getWorker(cmp.cfg, cmp.name)) } return nil } diff --git a/cmd/frostfs-node/main.go b/cmd/frostfs-node/main.go index 57d8bc8096..7768409b0f 100644 --- a/cmd/frostfs-node/main.go +++ b/cmd/frostfs-node/main.go @@ -56,15 +56,17 @@ func main() { c := initCfg(appCfg) - initApp(c) + ctx, cancel := context.WithCancel(context.Background()) + + initApp(ctx, c) c.setHealthStatus(control.HealthStatus_STARTING) - bootUp(c) + bootUp(ctx, c) c.setHealthStatus(control.HealthStatus_READY) - wait(c) + wait(c, cancel) } func initAndLog(c *cfg, name string, initializer func(*cfg)) { @@ -73,12 +75,10 @@ func initAndLog(c *cfg, name string, initializer func(*cfg)) { c.log.Info(fmt.Sprintf("%s service has been successfully initialized", name)) } -func initApp(c *cfg) { - c.ctx, c.ctxCancel = context.WithCancel(context.Background()) - +func initApp(ctx context.Context, c *cfg) { c.wg.Add(1) go func() { - c.signalWatcher() + c.signalWatcher(ctx) c.wg.Done() }() @@ -91,7 +91,7 @@ func initApp(c *cfg) { initAndLog(c, "storage engine", func(c *cfg) { fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open()) - fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init()) + fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx)) }) initAndLog(c, "gRPC", initGRPC) @@ -105,12 +105,12 @@ func initApp(c *cfg) { initAndLog(c, "tree", initTreeService) initAndLog(c, "control", initControlService) - initAndLog(c, "morph notifications", listenMorphNotifications) + initAndLog(c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) }) } -func runAndLog(c *cfg, name string, logSuccess bool, starter func(*cfg)) { +func runAndLog(ctx context.Context, c *cfg, name string, logSuccess bool, starter func(context.Context, *cfg)) { c.log.Info(fmt.Sprintf("starting %s service...", name)) - starter(c) + starter(ctx, c) if logSuccess { c.log.Info(fmt.Sprintf("%s service started successfully", name)) @@ -130,20 +130,22 @@ func stopAndLog(c *cfg, name string, stopper func() error) { c.log.Debug(fmt.Sprintf("%s service has been stopped", name)) } -func bootUp(c *cfg) { - runAndLog(c, "NATS", true, connectNats) - runAndLog(c, "gRPC", false, serveGRPC) - runAndLog(c, "notary", true, makeAndWaitNotaryDeposit) +func bootUp(ctx context.Context, c *cfg) { + runAndLog(ctx, c, "NATS", true, connectNats) + runAndLog(ctx, c, "gRPC", false, func(_ context.Context, c *cfg) { serveGRPC(c) }) + runAndLog(ctx, c, "notary", true, makeAndWaitNotaryDeposit) bootstrapNode(c) - startWorkers(c) + startWorkers(ctx, c) } -func wait(c *cfg) { +func wait(c *cfg, cancel func()) { c.log.Info("application started", zap.String("version", misc.Version)) - <-c.ctx.Done() // graceful shutdown + <-c.done // graceful shutdown + + cancel() c.log.Debug("waiting for all processes to stop") diff --git a/cmd/frostfs-node/morph.go b/cmd/frostfs-node/morph.go index 4ab2eacade..439de3a9e2 100644 --- a/cmd/frostfs-node/morph.go +++ b/cmd/frostfs-node/morph.go @@ -108,7 +108,7 @@ func initMorphComponents(c *cfg) { c.cfgNetmap.wrapper = wrap } -func makeAndWaitNotaryDeposit(c *cfg) { +func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) { // skip notary deposit in non-notary environments if !c.cfgMorph.notaryEnabled { return @@ -125,7 +125,7 @@ func makeAndWaitNotaryDeposit(c *cfg) { return } - err = waitNotaryDeposit(c, tx) + err = waitNotaryDeposit(ctx, c, tx) fatalOnErr(err) } @@ -154,11 +154,11 @@ var ( errNotaryDepositTimeout = errors.New("notary deposit tx has not appeared in the network") ) -func waitNotaryDeposit(c *cfg, tx util.Uint256) error { +func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error { for i := 0; i < notaryDepositRetriesAmount; i++ { select { - case <-c.ctx.Done(): - return c.ctx.Err() + case <-ctx.Done(): + return ctx.Err() default: } @@ -171,7 +171,7 @@ func waitNotaryDeposit(c *cfg, tx util.Uint256) error { return errNotaryDepositFail } - err = c.cfgMorph.client.Wait(c.ctx, 1) + err = c.cfgMorph.client.Wait(ctx, 1) if err != nil { return fmt.Errorf("could not wait for one block in chain: %w", err) } @@ -180,7 +180,7 @@ func waitNotaryDeposit(c *cfg, tx util.Uint256) error { return errNotaryDepositTimeout } -func listenMorphNotifications(c *cfg) { +func listenMorphNotifications(ctx context.Context, c *cfg) { // listenerPoolCap is a capacity of a // worker pool inside the listener. It // is used to prevent blocking in neo-go: @@ -200,7 +200,7 @@ func listenMorphNotifications(c *cfg) { c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) } - subs, err = subscriber.New(c.ctx, &subscriber.Params{ + subs, err = subscriber.New(ctx, &subscriber.Params{ Log: c.log, StartFromBlock: fromSideChainBlock, Client: c.cfgMorph.client, @@ -214,9 +214,9 @@ func listenMorphNotifications(c *cfg) { }) fatalOnErr(err) - c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { - runAndLog(c, "morph notification", false, func(c *cfg) { - lis.ListenWithError(ctx, c.internalErr) + c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) { + runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) { + lis.ListenWithError(lCtx, c.internalErr) }) })) diff --git a/cmd/frostfs-node/notificator.go b/cmd/frostfs-node/notificator.go index 9722db3a5c..d5cb1ded42 100644 --- a/cmd/frostfs-node/notificator.go +++ b/cmd/frostfs-node/notificator.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/hex" "fmt" @@ -155,13 +156,13 @@ func initNotifications(c *cfg) { } } -func connectNats(c *cfg) { +func connectNats(ctx context.Context, c *cfg) { if !c.cfgNotifications.enabled { return } endpoint := nodeconfig.Notification(c.appCfg).Endpoint() - err := c.cfgNotifications.nw.w.Connect(c.ctx, endpoint) + err := c.cfgNotifications.nw.w.Connect(ctx, endpoint) if err != nil { panic(fmt.Sprintf("could not connect to a nats endpoint %s: %v", endpoint, err)) } diff --git a/cmd/frostfs-node/worker.go b/cmd/frostfs-node/worker.go index 21780e04fa..bea235c481 100644 --- a/cmd/frostfs-node/worker.go +++ b/cmd/frostfs-node/worker.go @@ -15,17 +15,17 @@ func newWorkerFromFunc(fn func(ctx context.Context)) worker { } } -func startWorkers(c *cfg) { +func startWorkers(ctx context.Context, c *cfg) { for _, wrk := range c.workers { - startWorker(c, wrk) + startWorker(ctx, c, wrk) } } -func startWorker(c *cfg, wrk worker) { +func startWorker(ctx context.Context, c *cfg, wrk worker) { c.wg.Add(1) go func(w worker) { - w.fn(c.ctx) + w.fn(ctx) c.wg.Done() }(wrk) } diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index c5e51a7f02..3e176dc91c 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "fmt" "path/filepath" @@ -68,7 +69,7 @@ func (e *StorageEngine) open() error { } // Init initializes all StorageEngine's components. -func (e *StorageEngine) Init() error { +func (e *StorageEngine) Init(ctx context.Context) error { e.mtx.Lock() defer e.mtx.Unlock() @@ -79,7 +80,7 @@ func (e *StorageEngine) Init() error { wg.Add(1) go func(id string, sh *shard.Shard) { defer wg.Done() - if err := sh.Init(); err != nil { + if err := sh.Init(ctx); err != nil { errCh <- shardInitError{ err: err, id: id, @@ -264,7 +265,7 @@ func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) { } // Reload reloads StorageEngine's configuration in runtime. -func (e *StorageEngine) Reload(rcfg ReConfiguration) error { +func (e *StorageEngine) Reload(ctx context.Context, rcfg ReConfiguration) error { type reloadInfo struct { sh *shard.Shard opts []shard.Option @@ -324,7 +325,7 @@ loop: err = sh.Open() if err == nil { - err = sh.Init() + err = sh.Init(ctx) } if err != nil { _ = sh.Close() diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 2f9714ce4b..f954d906a1 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "fmt" "io/fs" @@ -169,7 +170,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio err = e.Open() if err == nil { - require.Error(t, e.Init()) + require.Error(t, e.Init(context.Background())) } } @@ -180,7 +181,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio beforeReload() - require.NoError(t, e.Reload(ReConfiguration{ + require.NoError(t, e.Reload(context.Background(), ReConfiguration{ shards: map[string][]shard.Option{configID: opts}, })) @@ -273,7 +274,7 @@ func TestReload(t *testing.T) { } rcfg.AddShard(currShards[0], nil) // same path - require.NoError(t, e.Reload(rcfg)) + require.NoError(t, e.Reload(context.Background(), rcfg)) // no new paths => no new shards require.Equal(t, shardNum, len(e.shards)) @@ -286,7 +287,7 @@ func TestReload(t *testing.T) { meta.WithPath(newMeta), meta.WithEpochState(epochState{}), )}) - require.NoError(t, e.Reload(rcfg)) + require.NoError(t, e.Reload(context.Background(), rcfg)) require.Equal(t, shardNum+1, len(e.shards)) require.Equal(t, shardNum+1, len(e.shardPools)) @@ -303,7 +304,7 @@ func TestReload(t *testing.T) { rcfg.AddShard(currShards[i], nil) } - require.NoError(t, e.Reload(rcfg)) + require.NoError(t, e.Reload(context.Background(), rcfg)) // removed one require.Equal(t, shardNum-1, len(e.shards)) @@ -339,7 +340,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str require.Equal(t, num, len(e.shardPools)) require.NoError(t, e.Open()) - require.NoError(t, e.Init()) + require.NoError(t, e.Init(context.Background())) return e, currShards } diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 5d7ce1be2f..83dbcd093c 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -1,6 +1,7 @@ package engine import ( + "context" "fmt" "os" "path/filepath" @@ -159,7 +160,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard { )) require.NoError(t, s.Open()) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(context.Background())) return s } @@ -185,7 +186,7 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *S } require.NoError(t, engine.Open()) - require.NoError(t, engine.Init()) + require.NoError(t, engine.Init(context.Background())) return engine } diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 0c3b627367..8a32c8b69b 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -1,6 +1,7 @@ package engine import ( + "context" "fmt" "os" "path/filepath" @@ -76,7 +77,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) } } require.NoError(t, e.Open()) - require.NoError(t, e.Init()) + require.NoError(t, e.Init(context.Background())) return &testEngine{ ng: e, diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index a89f639ef2..04d68d2e4c 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -1,6 +1,7 @@ package engine import ( + "context" "errors" "fmt" "os" @@ -51,7 +52,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng require.NoError(t, err) } require.NoError(t, e.Open()) - require.NoError(t, e.Init()) + require.NoError(t, e.Init(context.Background())) objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 61553ac13c..d727d27a5e 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -1,6 +1,7 @@ package shard import ( + "context" "errors" "fmt" @@ -82,7 +83,7 @@ func (x *metabaseSynchronizer) Init() error { } // Init initializes all Shard's components. -func (s *Shard) Init() error { +func (s *Shard) Init(ctx context.Context) error { type initializer interface { Init() error } @@ -151,7 +152,7 @@ func (s *Shard) Init() error { }, } - s.gc.init() + s.gc.init(ctx) return nil } diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index b5dafb5341..6886438e08 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -1,6 +1,7 @@ package shard import ( + "context" "io/fs" "math" "os" @@ -83,7 +84,7 @@ func TestShardOpen(t *testing.T) { sh := newShard() require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.ReadWrite, sh.GetMode()) require.NoError(t, sh.Close()) @@ -92,7 +93,7 @@ func TestShardOpen(t *testing.T) { sh = newShard() require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.ReadOnly, sh.GetMode()) require.Error(t, sh.SetMode(mode.ReadWrite)) require.Equal(t, mode.ReadOnly, sh.GetMode()) @@ -103,7 +104,7 @@ func TestShardOpen(t *testing.T) { sh = newShard() require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) require.Equal(t, mode.DegradedReadOnly, sh.GetMode()) require.NoError(t, sh.Close()) } @@ -128,7 +129,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{}))) require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) obj := objecttest.Object() obj.SetType(objectSDK.TypeRegular) @@ -150,7 +151,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) { WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), WithRefillMetabase(true)) require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) var getPrm GetPrm getPrm.SetAddress(addr) @@ -188,7 +189,7 @@ func TestRefillMetabase(t *testing.T) { require.NoError(t, sh.Open()) // initialize Blobstor - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) const objNum = 5 @@ -355,7 +356,7 @@ func TestRefillMetabase(t *testing.T) { require.NoError(t, sh.Open()) // initialize Blobstor - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) defer sh.Close() diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 6335145e13..c3bb841d2b 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -102,7 +102,7 @@ func defaultGCCfg() gcCfg { } } -func (gc *gc) init() { +func (gc *gc) init(ctx context.Context) { sz := 0 for _, v := range gc.mEventHandler { @@ -115,10 +115,10 @@ func (gc *gc) init() { gc.wg.Add(2) go gc.tickRemover() - go gc.listenEvents() + go gc.listenEvents(ctx) } -func (gc *gc) listenEvents() { +func (gc *gc) listenEvents(ctx context.Context) { defer gc.wg.Done() for { @@ -136,8 +136,7 @@ func (gc *gc) listenEvents() { v.cancelFunc() v.prevGroup.Wait() - var ctx context.Context - ctx, v.cancelFunc = context.WithCancel(context.Background()) + ctx, v.cancelFunc = context.WithCancel(ctx) v.prevGroup.Add(len(v.handlers)) diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index a4bbefde97..245669ff1f 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -72,7 +72,7 @@ func Test_GCDropsLockedExpiredObject(t *testing.T) { sh = shard.New(opts...) require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) t.Cleanup(func() { releaseShard(sh, t) diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go index 0bf1c89091..4e23e8c376 100644 --- a/pkg/local_object_storage/shard/lock_test.go +++ b/pkg/local_object_storage/shard/lock_test.go @@ -56,7 +56,7 @@ func TestShard_Lock(t *testing.T) { sh = shard.New(opts...) require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) t.Cleanup(func() { releaseShard(sh, t) diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 4262591073..959aebf8d7 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -1,6 +1,7 @@ package shard_test import ( + "context" "path/filepath" "testing" @@ -215,7 +216,7 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) { shard.WithMetricsWriter(mm), ) require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) t.Cleanup(func() { sh.Close() diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index fbe4d61218..7aa331c7ff 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -1,6 +1,7 @@ package shard import ( + "context" "os" "path/filepath" "testing" @@ -51,7 +52,7 @@ func TestShardReload(t *testing.T) { sh := New(opts...) require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) objects := make([]objAddr, 5) for i := range objects { diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 5a04058a69..027e6ca7ba 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -1,6 +1,7 @@ package shard_test import ( + "context" "path/filepath" "testing" @@ -80,7 +81,7 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts sh := shard.New(opts...) require.NoError(t, sh.Open()) - require.NoError(t, sh.Init()) + require.NoError(t, sh.Init(context.Background())) return sh }