[#653] Pass context.Context to StorageEngine Open #672
43 changed files with 129 additions and 109 deletions
|
@ -43,7 +43,7 @@ func openMeta(cmd *cobra.Command) *meta.DB {
|
|||
}),
|
||||
meta.WithEpochState(epochState{}),
|
||||
)
|
||||
common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(true)))
|
||||
common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(cmd.Context(), true)))
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
return db
|
||||
}
|
||||
|
|
|
@ -901,7 +901,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
|
|||
return c.localAddr
|
||||
}
|
||||
|
||||
func initLocalStorage(c *cfg) {
|
||||
func initLocalStorage(ctx context.Context, c *cfg) {
|
||||
ls := engine.New(c.engineOpts()...)
|
||||
|
||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
||||
|
@ -914,7 +914,7 @@ func initLocalStorage(c *cfg) {
|
|||
|
||||
var shardsAttached int
|
||||
for _, optsWithMeta := range c.shardOpts() {
|
||||
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
||||
} else {
|
||||
|
@ -931,7 +931,7 @@ func initLocalStorage(c *cfg) {
|
|||
c.onShutdown(func() {
|
||||
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
||||
|
||||
err := ls.Close()
|
||||
err := ls.Close(context.Background())
|
||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
Need to check: if Need to check: if `onShutdown` calls afetr ctx is cancelled, then `Close()` may fail. So `context.Background()` or detached https://pkg.go.dev/context#WithoutCancel should be used.
fyrchik
commented
`WithoutCancel()` was added in 1.21, but we need to support 1.20 too.
elebedeva
commented
changed changed `ls.Close(ctx)` to `ls.Close(context.Background())`
|
||||
if err != nil {
|
||||
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
||||
zap.String("error", err.Error()),
|
||||
|
|
|
@ -91,10 +91,10 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
|
||||
initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) })
|
||||
|
||||
initLocalStorage(c)
|
||||
initLocalStorage(ctx, c)
|
||||
|
||||
initAndLog(c, "storage engine", func(c *cfg) {
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open(ctx))
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
|
||||
})
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ func TestCompression(t *testing.T) {
|
|||
bs := New(
|
||||
WithCompressObjects(compress),
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Open(context.Background(), false))
|
||||
require.NoError(t, bs.Init())
|
||||
return bs
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
|||
Storage: fstree.New(fstree.WithPath(dir)),
|
||||
},
|
||||
}))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Open(context.Background(), false))
|
||||
require.NoError(t, bs.Init())
|
||||
return bs
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ func TestConcurrentPut(t *testing.T) {
|
|||
|
||||
blobStor := New(
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
||||
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
|
@ -268,7 +268,7 @@ func TestConcurrentDelete(t *testing.T) {
|
|||
|
||||
blobStor := New(
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
||||
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
|
@ -9,10 +10,15 @@ import (
|
|||
)
|
||||
|
||||
// Open opens BlobStor.
|
||||
func (b *BlobStor) Open(readOnly bool) error {
|
||||
func (b *BlobStor) Open(ctx context.Context, readOnly bool) error {
|
||||
b.log.Debug(logs.BlobstorOpening)
|
||||
|
||||
for i := range b.storage {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
err := b.storage[i].Storage.Open(readOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestExists(t *testing.T) {
|
|||
|
||||
b := New(WithStorages(storages))
|
||||
|
||||
require.NoError(t, b.Open(false))
|
||||
require.NoError(t, b.Open(context.Background(), false))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
objects := []*objectSDK.Object{
|
||||
|
|
|
@ -26,7 +26,7 @@ func TestIterateObjects(t *testing.T) {
|
|||
defer os.RemoveAll(p)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -21,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
|
|||
|
||||
err := b.Close()
|
||||
if err == nil {
|
||||
if err = b.Open(m.ReadOnly()); err == nil {
|
||||
if err = b.Open(context.TODO(), m.ReadOnly()); err == nil {
|
||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
context.TODO is more preferred in such cases (or we should pass ctx to SetMode too :) ) context.TODO is more preferred in such cases (or we should pass ctx to SetMode too :) )
elebedeva
commented
fixed fixed
|
||||
err = b.Init()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,11 +21,11 @@ type shardInitError struct {
|
|||
}
|
||||
|
||||
// Open opens all StorageEngine's components.
|
||||
func (e *StorageEngine) Open() error {
|
||||
return e.open()
|
||||
func (e *StorageEngine) Open(ctx context.Context) error {
|
||||
return e.open(ctx)
|
||||
}
|
||||
|
||||
func (e *StorageEngine) open() error {
|
||||
func (e *StorageEngine) open(ctx context.Context) error {
|
||||
e.mtx.Lock()
|
||||
defer e.mtx.Unlock()
|
||||
|
||||
|
@ -36,7 +36,7 @@ func (e *StorageEngine) open() error {
|
|||
wg.Add(1)
|
||||
go func(id string, sh *shard.Shard) {
|
||||
defer wg.Done()
|
||||
if err := sh.Open(); err != nil {
|
||||
if err := sh.Open(ctx); err != nil {
|
||||
errCh <- shardInitError{
|
||||
err: err,
|
||||
id: id,
|
||||
|
@ -148,10 +148,10 @@ var errClosed = errors.New("storage engine is closed")
|
|||
// After the call, all the next ones will fail.
|
||||
//
|
||||
// The method MUST only be called when the application exits.
|
||||
func (e *StorageEngine) Close() error {
|
||||
func (e *StorageEngine) Close(ctx context.Context) error {
|
||||
close(e.closeCh)
|
||||
defer e.wg.Wait()
|
||||
return e.setBlockExecErr(errClosed)
|
||||
return e.setBlockExecErr(ctx, errClosed)
|
||||
}
|
||||
|
||||
// closes all shards. Never returns an error, shard errors are logged.
|
||||
|
@ -197,7 +197,7 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error {
|
|||
// - otherwise, resumes execution. If exec was blocked, calls open method.
|
||||
//
|
||||
// Can be called concurrently with exec. In this case it waits for all executions to complete.
|
||||
func (e *StorageEngine) setBlockExecErr(err error) error {
|
||||
func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
|
||||
e.blockExec.mtx.Lock()
|
||||
defer e.blockExec.mtx.Unlock()
|
||||
|
||||
|
@ -212,7 +212,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
|
|||
|
||||
if err == nil {
|
||||
if prevErr != nil { // block -> ok
|
||||
return e.open()
|
||||
return e.open(ctx)
|
||||
}
|
||||
} else if prevErr == nil { // ok -> block
|
||||
return e.close(errors.Is(err, errClosed))
|
||||
|
@ -235,7 +235,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
|
|||
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
|
||||
// for this.
|
||||
func (e *StorageEngine) BlockExecution(err error) error {
|
||||
return e.setBlockExecErr(err)
|
||||
return e.setBlockExecErr(context.Background(), err)
|
||||
}
|
||||
|
||||
// ResumeExecution resumes the execution of any data-related operation.
|
||||
|
@ -247,7 +247,7 @@ func (e *StorageEngine) BlockExecution(err error) error {
|
|||
//
|
||||
// Must not be called concurrently with either Open or Init.
|
||||
func (e *StorageEngine) ResumeExecution() error {
|
||||
return e.setBlockExecErr(nil)
|
||||
return e.setBlockExecErr(context.Background(), nil)
|
||||
}
|
||||
|
||||
type ReConfiguration struct {
|
||||
|
@ -334,14 +334,14 @@ loop:
|
|||
}
|
||||
|
||||
for _, newID := range shardsToAdd {
|
||||
sh, err := e.createShard(rcfg.shards[newID])
|
||||
sh, err := e.createShard(ctx, rcfg.shards[newID])
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
||||
}
|
||||
|
||||
idStr := sh.ID().String()
|
||||
|
||||
err = sh.Open()
|
||||
err = sh.Open(ctx)
|
||||
if err == nil {
|
||||
err = sh.Init(ctx)
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
|||
var configID string
|
||||
|
||||
e := New()
|
||||
_, err := e.AddShard(opts...)
|
||||
_, err := e.AddShard(context.Background(), opts...)
|
||||
if errOnAdd {
|
||||
require.Error(t, err)
|
||||
// This branch is only taken when we cannot update shard ID in the metabase.
|
||||
|
@ -144,7 +144,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
|||
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
|
||||
e.mtx.RUnlock()
|
||||
|
||||
err = e.Open()
|
||||
err = e.Open(context.Background())
|
||||
if err == nil {
|
||||
require.Error(t, e.Init(context.Background()))
|
||||
}
|
||||
|
@ -193,7 +193,7 @@ func TestExecBlocks(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// close
|
||||
require.NoError(t, e.Close())
|
||||
require.NoError(t, e.Close(context.Background()))
|
||||
|
||||
// try exec after close
|
||||
_, err = Head(context.Background(), e, addr)
|
||||
|
@ -209,13 +209,13 @@ func TestPersistentShardID(t *testing.T) {
|
|||
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close())
|
||||
require.NoError(t, te.ng.Close(context.Background()))
|
||||
|
||||
newTe := newEngineWithErrorThreshold(t, dir, 1)
|
||||
for i := 0; i < len(newTe.shards); i++ {
|
||||
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
|
||||
}
|
||||
require.NoError(t, newTe.ng.Close())
|
||||
require.NoError(t, newTe.ng.Close(context.Background()))
|
||||
|
||||
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
|
@ -227,7 +227,7 @@ func TestPersistentShardID(t *testing.T) {
|
|||
newTe = newEngineWithErrorThreshold(t, dir, 1)
|
||||
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
|
||||
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
|
||||
require.NoError(t, newTe.ng.Close())
|
||||
require.NoError(t, newTe.ng.Close(context.Background()))
|
||||
|
||||
}
|
||||
|
||||
|
@ -313,7 +313,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
|
|||
require.Equal(t, num, len(e.shards))
|
||||
require.Equal(t, num, len(e.shardPools))
|
||||
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
return e, currShards
|
||||
|
|
|
@ -54,7 +54,7 @@ func TestDeleteBigObject(t *testing.T) {
|
|||
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
|
||||
e.log = test.NewLogger(t, true)
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
for i := range children {
|
||||
require.NoError(t, Put(context.Background(), e, children[i]))
|
||||
|
|
|
@ -50,7 +50,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
|||
|
||||
e := testNewEngine(b).setInitializedShards(b, shards...).engine
|
||||
b.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
_ = os.RemoveAll(b.Name())
|
||||
})
|
||||
|
||||
|
@ -119,7 +119,7 @@ func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrap
|
|||
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
|
||||
for i := 0; i < num; i++ {
|
||||
opts := shardOpts(i)
|
||||
id, err := te.engine.AddShard(opts...)
|
||||
id, err := te.engine.AddShard(context.Background(), opts...)
|
||||
require.NoError(t, err)
|
||||
te.shardIDs = append(te.shardIDs, id)
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, s
|
|||
for i := 0; i < num; i++ {
|
||||
defaultOpts := testDefaultShardOptions(t, i)
|
||||
opts := append(defaultOpts, shardOpts(i)...)
|
||||
id, err := te.engine.AddShard(opts...)
|
||||
id, err := te.engine.AddShard(context.Background(), opts...)
|
||||
require.NoError(t, err)
|
||||
te.shardIDs = append(te.shardIDs, id)
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
|
|||
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
|
||||
s := shard.New(shardOpts...)
|
||||
|
||||
require.NoError(t, s.Open())
|
||||
require.NoError(t, s.Open(context.Background()))
|
||||
require.NoError(t, s.Init(context.Background()))
|
||||
|
||||
return s
|
||||
|
|
|
@ -68,7 +68,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
}
|
||||
})
|
||||
e := te.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
for i, id := range te.shardIDs {
|
||||
|
@ -192,7 +192,7 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
}
|
||||
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close())
|
||||
require.NoError(t, te.ng.Close(context.Background()))
|
||||
|
||||
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
|
|
|
@ -44,7 +44,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
|
|||
}
|
||||
})
|
||||
e, ids := te.engine, te.shardIDs
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
|
||||
|
|
|
@ -43,7 +43,7 @@ func TestHeadRaw(t *testing.T) {
|
|||
s2 := testNewShard(t, 2)
|
||||
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
var putPrmLeft shard.PutPrm
|
||||
putPrmLeft.SetObject(child)
|
||||
|
|
|
@ -37,7 +37,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
|
||||
t.Run("delete small object", func(t *testing.T) {
|
||||
e := testNewEngine(t).setShardsNum(t, 1).engine
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
err := Put(context.Background(), e, parent)
|
||||
require.NoError(t, err)
|
||||
|
@ -58,7 +58,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
s2 := testNewShard(t, 2)
|
||||
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
var putChild shard.PutPrm
|
||||
putChild.SetObject(child)
|
||||
|
|
|
@ -76,11 +76,11 @@ func TestListWithCursor(t *testing.T) {
|
|||
meta.WithEpochState(epochState{}),
|
||||
)}
|
||||
}).engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
e.Close()
|
||||
e.Close(context.Background())
|
||||
})
|
||||
|
||||
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
||||
|
|
|
@ -59,11 +59,11 @@ func TestLockUserScenario(t *testing.T) {
|
|||
}
|
||||
})
|
||||
e := testEngine.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
})
|
||||
|
||||
lockerID := oidtest.ID()
|
||||
|
@ -167,11 +167,11 @@ func TestLockExpiration(t *testing.T) {
|
|||
}
|
||||
})
|
||||
e := testEngine.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
})
|
||||
|
||||
const lockerExpiresAfter = 13
|
||||
|
@ -247,10 +247,10 @@ func TestLockForceRemoval(t *testing.T) {
|
|||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||
}
|
||||
}).engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
})
|
||||
|
||||
cnr := cidtest.ID()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -77,8 +78,8 @@ func (m *metricsWithID) DeleteShardMetrics() {
|
|||
//
|
||||
// Returns any error encountered that did not allow adding a shard.
|
||||
// Otherwise returns the ID of the added shard.
|
||||
func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
||||
sh, err := e.createShard(opts)
|
||||
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
|
||||
sh, err := e.createShard(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a shard: %w", err)
|
||||
}
|
||||
|
@ -95,7 +96,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
|||
return sh.ID(), nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
||||
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||
id, err := generateShardID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||
|
@ -111,7 +112,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
|||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(); err != nil {
|
||||
if err := sh.UpdateID(ctx); err != nil {
|
||||
return nil, fmt.Errorf("could not update shard ID: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -12,7 +13,7 @@ func TestRemoveShard(t *testing.T) {
|
|||
te := testNewEngine(t).setShardsNum(t, numOfShards)
|
||||
e, ids := te.engine, te.shardIDs
|
||||
t.Cleanup(func() {
|
||||
e.Close()
|
||||
e.Close(context.Background())
|
||||
})
|
||||
|
||||
require.Equal(t, numOfShards, len(e.shardPools))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package storagetest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -9,7 +10,7 @@ import (
|
|||
|
||||
// Component represents single storage component.
|
||||
type Component interface {
|
||||
Open(bool) error
|
||||
Open(context.Context, bool) error
|
||||
SetMode(mode.Mode) error
|
||||
Init() error
|
||||
Close() error
|
||||
|
@ -57,18 +58,18 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
|
|||
t.Run("RW", func(t *testing.T) {
|
||||
// Use-case: irrecoverable error on some components, close everything.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Close())
|
||||
})
|
||||
t.Run("RO", func(t *testing.T) {
|
||||
// Use-case: irrecoverable error on some components, close everything.
|
||||
// Open in read-only must be done after the db is here.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
require.NoError(t, s.Open(true))
|
||||
require.NoError(t, s.Open(context.Background(), true))
|
||||
require.NoError(t, s.Close())
|
||||
})
|
||||
}
|
||||
|
@ -77,7 +78,7 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
|
|||
func TestCloseTwice(t *testing.T, cons Constructor) {
|
||||
// Use-case: move to maintenance mode twice, first time failed.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.Close())
|
||||
require.NoError(t, s.Close()) // already closed, no-op
|
||||
|
@ -89,12 +90,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
// Use-case: metabase `Init` failed,
|
||||
// call `SetMode` on all not-yet-initialized components.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.SetMode(m))
|
||||
|
||||
t.Run("after open in RO", func(t *testing.T) {
|
||||
require.NoError(t, s.Close())
|
||||
require.NoError(t, s.Open(true))
|
||||
require.NoError(t, s.Open(context.Background(), true))
|
||||
require.NoError(t, s.SetMode(m))
|
||||
})
|
||||
|
||||
|
@ -103,7 +104,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
t.Run("after init", func(t *testing.T) {
|
||||
s := cons(t)
|
||||
// Use-case: notmal node operation.
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.SetMode(m))
|
||||
require.NoError(t, s.Close())
|
||||
|
@ -113,7 +114,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
|
||||
// Use-case: normal node operation.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.SetMode(from))
|
||||
require.NoError(t, s.SetMode(to))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
@ -21,7 +22,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
|
|||
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
||||
|
||||
// Open boltDB instance for metabase.
|
||||
func (db *DB) Open(readOnly bool) error {
|
||||
func (db *DB) Open(_ context.Context, readOnly bool) error {
|
||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
@ -49,7 +50,7 @@ func newDB(t testing.TB, opts ...meta.Option) *meta.DB {
|
|||
}, opts...)...,
|
||||
)
|
||||
|
||||
require.NoError(t, bdb.Open(false))
|
||||
require.NoError(t, bdb.Open(context.Background(), false))
|
||||
require.NoError(t, bdb.Init())
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -27,9 +28,9 @@ func (db *DB) SetMode(m mode.Mode) error {
|
|||
case m.NoMetabase():
|
||||
db.boltDB = nil
|
||||
case m.ReadOnly():
|
||||
err = db.Open(true)
|
||||
err = db.Open(context.TODO(), true)
|
||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
context.TODO too context.TODO too
elebedeva
commented
fixed fixed
|
||||
default:
|
||||
err = db.Open(false)
|
||||
err = db.Open(context.TODO(), false)
|
||||
}
|
||||
if err == nil && !m.NoMetabase() && !m.ReadOnly() {
|
||||
err = db.Init()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -42,13 +43,13 @@ func TestVersion(t *testing.T) {
|
|||
}
|
||||
t.Run("simple", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
t.Run("reopen", func(t *testing.T) {
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
|
@ -56,29 +57,29 @@ func TestVersion(t *testing.T) {
|
|||
})
|
||||
t.Run("old data", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
t.Run("invalid version", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return updateVersion(tx, version+1)
|
||||
}))
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.Error(t, db.Init())
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
t.Run("reset", func(t *testing.T) {
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Reset())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
|
|
|
@ -26,7 +26,7 @@ func BenchmarkCreate(b *testing.B) {
|
|||
f := NewBoltForest(
|
||||
WithPath(filepath.Join(tmpDir, "test.db")),
|
||||
WithMaxBatchSize(runtime.GOMAXPROCS(0)))
|
||||
require.NoError(b, f.Open(false))
|
||||
require.NoError(b, f.Open(context.Background(), false))
|
||||
require.NoError(b, f.Init())
|
||||
b.Cleanup(func() {
|
||||
require.NoError(b, f.Close())
|
||||
|
|
|
@ -99,7 +99,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
|
|||
|
||||
err := t.Close()
|
||||
if err == nil && !m.NoMetabase() {
|
||||
if err = t.Open(m.ReadOnly()); err == nil {
|
||||
if err = t.Open(context.TODO(), m.ReadOnly()); err == nil {
|
||||
err = t.Init()
|
||||
}
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
|
|||
t.metrics.SetMode(m)
|
||||
return nil
|
||||
}
|
||||
func (t *boltForest) Open(readOnly bool) error {
|
||||
func (t *boltForest) Open(_ context.Context, readOnly bool) error {
|
||||
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
|
||||
|
|
|
@ -110,7 +110,7 @@ func (f *memoryForest) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) Open(bool) error {
|
||||
func (f *memoryForest) Open(context.Context, bool) error {
|
||||
return nil
|
||||
}
|
||||
func (f *memoryForest) SetMode(mode.Mode) error {
|
||||
|
|
|
@ -24,7 +24,7 @@ var providers = []struct {
|
|||
}{
|
||||
{"inmemory", func(t testing.TB, _ ...Option) Forest {
|
||||
f := NewMemoryForest()
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Open(context.Background(), false))
|
||||
require.NoError(t, f.Init())
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, f.Close())
|
||||
|
@ -37,7 +37,7 @@ var providers = []struct {
|
|||
append([]Option{
|
||||
WithPath(filepath.Join(t.TempDir(), "test.db")),
|
||||
WithMaxBatchSize(1)}, opts...)...)
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Open(context.Background(), false))
|
||||
require.NoError(t, f.Init())
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, f.Close())
|
||||
|
|
|
@ -58,7 +58,7 @@ type ForestStorage interface {
|
|||
// DumpInfo returns information about the pilorama.
|
||||
DumpInfo() Info
|
||||
Init() error
|
||||
Open(bool) error
|
||||
Open(context.Context, bool) error
|
||||
Close() error
|
||||
SetMode(m mode.Mode) error
|
||||
SetParentID(id string)
|
||||
|
|
|
@ -41,8 +41,10 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
|||
}
|
||||
|
||||
// Open opens all Shard's components.
|
||||
func (s *Shard) Open() error {
|
||||
components := []interface{ Open(bool) error }{
|
||||
func (s *Shard) Open(ctx context.Context) error {
|
||||
components := []interface {
|
||||
Open(context.Context, bool) error
|
||||
}{
|
||||
s.blobStor, s.metaBase,
|
||||
}
|
||||
|
||||
|
@ -55,12 +57,12 @@ func (s *Shard) Open() error {
|
|||
}
|
||||
|
||||
for i, component := range components {
|
||||
if err := component.Open(false); err != nil {
|
||||
if err := component.Open(ctx, false); err != nil {
|
||||
if component == s.metaBase {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What is this select for? We have an error, no need to check the context. For the What is this select for? We have an error, no need to check the context. For the `component == s.metaBase` check it is already checked in the loop.
elebedeva
commented
Removed unnecessary select Removed unnecessary select
|
||||
// We must first open all other components to avoid
|
||||
// opening non-existent DB in read-only mode.
|
||||
for j := i + 1; j < len(components); j++ {
|
||||
if err := components[j].Open(false); err != nil {
|
||||
if err := components[j].Open(ctx, false); err != nil {
|
||||
// Other components must be opened, fail.
|
||||
return fmt.Errorf("could not open %T: %w", components[j], err)
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ func TestShardOpen(t *testing.T) {
|
|||
allowedMode.Store(int64(os.O_RDWR))
|
||||
|
||||
sh := newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.ReadWrite, sh.GetMode())
|
||||
require.NoError(t, sh.Close())
|
||||
|
@ -96,7 +96,7 @@ func TestShardOpen(t *testing.T) {
|
|||
allowedMode.Store(int64(os.O_RDONLY))
|
||||
|
||||
sh = newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
||||
require.Error(t, sh.SetMode(mode.ReadWrite))
|
||||
|
@ -107,7 +107,7 @@ func TestShardOpen(t *testing.T) {
|
|||
allowedMode.Store(math.MaxInt64)
|
||||
|
||||
sh = newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
|
||||
require.NoError(t, sh.Close())
|
||||
|
@ -135,7 +135,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
WithBlobStorOptions(blobOpts...),
|
||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
obj := objecttest.Object()
|
||||
|
@ -158,7 +158,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
||||
WithRefillMetabase(true))
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
var getPrm GetPrm
|
||||
|
@ -197,7 +197,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
@ -365,7 +365,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
|
|
@ -76,7 +76,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
|||
|
||||
sh = New(opts...)
|
||||
sh.gcCfg.testHookRemover = func(context.Context) gcRunResult { return gcRunResult{} }
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/mr-tron/base58"
|
||||
"go.uber.org/zap"
|
||||
|
@ -27,8 +29,8 @@ func (s *Shard) ID() *ID {
|
|||
}
|
||||
|
||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||
func (s *Shard) UpdateID() (err error) {
|
||||
if err = s.metaBase.Open(false); err != nil {
|
||||
func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||
if err = s.metaBase.Open(ctx, false); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestShard_Lock(t *testing.T) {
|
|||
}
|
||||
|
||||
sh = New(opts...)
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -269,7 +269,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
|||
meta.WithEpochState(epochState{})),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -52,7 +52,7 @@ func TestShardReload(t *testing.T) {
|
|||
pilorama.WithPath(filepath.Join(p, "pilorama")))}
|
||||
|
||||
sh := New(opts...)
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
objects := make([]objAddr, 5)
|
||||
|
|
|
@ -117,7 +117,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
|
|||
|
||||
sh = New(opts...)
|
||||
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
if !o.dontRelease {
|
||||
|
|
|
@ -82,7 +82,7 @@ func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
|
|||
}
|
||||
|
||||
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
|
||||
require.NoError(b, cache.Open(false), "opening")
|
||||
require.NoError(b, cache.Open(context.Background(), false), "opening")
|
||||
require.NoError(b, cache.Init(), "initializing")
|
||||
b.Cleanup(func() {
|
||||
require.NoError(b, cache.Close(), "closing")
|
||||
|
|
|
@ -38,7 +38,7 @@ type Cache interface {
|
|||
Flush(context.Context, bool) error
|
||||
|
||||
Init() error
|
||||
Open(readOnly bool) error
|
||||
Open(ctx context.Context, readOnly bool) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecachebadger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -83,7 +84,7 @@ func (c *cache) DumpInfo() writecache.Info {
|
|||
}
|
||||
|
||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||
func (c *cache) Open(readOnly bool) error {
|
||||
func (c *cache) Open(_ context.Context, readOnly bool) error {
|
||||
err := c.openStore(readOnly)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(err)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecachebbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
|
@ -97,7 +98,7 @@ func (c *cache) DumpInfo() writecache.Info {
|
|||
}
|
||||
|
||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||
func (c *cache) Open(readOnly bool) error {
|
||||
func (c *cache) Open(_ context.Context, readOnly bool) error {
|
||||
err := c.openStore(readOnly)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(err)
|
||||
|
|
|
@ -110,7 +110,7 @@ func newCache[Option any](
|
|||
mb := meta.New(
|
||||
meta.WithPath(filepath.Join(dir, "meta")),
|
||||
meta.WithEpochState(dummyEpoch{}))
|
||||
require.NoError(t, mb.Open(false))
|
||||
require.NoError(t, mb.Open(context.Background(), false))
|
||||
require.NoError(t, mb.Init())
|
||||
|
||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||
|
@ -121,12 +121,12 @@ func newCache[Option any](
|
|||
fstree.WithDirNameLen(1)),
|
||||
},
|
||||
}))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Open(context.Background(), false))
|
||||
require.NoError(t, bs.Init())
|
||||
|
||||
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
||||
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
||||
require.NoError(t, wc.Open(false))
|
||||
require.NoError(t, wc.Open(context.Background(), false))
|
||||
require.NoError(t, wc.Init())
|
||||
|
||||
// First set mode for metabase and blobstor to prevent background flushes.
|
||||
|
|
Loading…
Add table
Reference in a new issue
cmd.Context()
is what we usually use for this.Fixed.