[#653] Add context parameter to Open functions
All checks were successful
DCO action / DCO (pull_request) Successful in 1m38s
Build / Build Components (1.20) (pull_request) Successful in 4m22s
Build / Build Components (1.21) (pull_request) Successful in 4m25s
Vulncheck / Vulncheck (pull_request) Successful in 4m56s
Tests and linters / Lint (pull_request) Successful in 6m1s
Tests and linters / Tests (1.20) (pull_request) Successful in 7m43s
Tests and linters / Staticcheck (pull_request) Successful in 8m1s
Tests and linters / Tests (1.21) (pull_request) Successful in 8m14s
Tests and linters / Tests with -race (pull_request) Successful in 8m32s
All checks were successful
DCO action / DCO (pull_request) Successful in 1m38s
Build / Build Components (1.20) (pull_request) Successful in 4m22s
Build / Build Components (1.21) (pull_request) Successful in 4m25s
Vulncheck / Vulncheck (pull_request) Successful in 4m56s
Tests and linters / Lint (pull_request) Successful in 6m1s
Tests and linters / Tests (1.20) (pull_request) Successful in 7m43s
Tests and linters / Staticcheck (pull_request) Successful in 8m1s
Tests and linters / Tests (1.21) (pull_request) Successful in 8m14s
Tests and linters / Tests with -race (pull_request) Successful in 8m32s
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
This commit is contained in:
parent
a716db99db
commit
8a81af5a3b
43 changed files with 129 additions and 109 deletions
|
@ -43,7 +43,7 @@ func openMeta(cmd *cobra.Command) *meta.DB {
|
|||
}),
|
||||
meta.WithEpochState(epochState{}),
|
||||
)
|
||||
common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(true)))
|
||||
common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(cmd.Context(), true)))
|
||||
|
||||
return db
|
||||
}
|
||||
|
|
|
@ -901,7 +901,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
|
|||
return c.localAddr
|
||||
}
|
||||
|
||||
func initLocalStorage(c *cfg) {
|
||||
func initLocalStorage(ctx context.Context, c *cfg) {
|
||||
ls := engine.New(c.engineOpts()...)
|
||||
|
||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
||||
|
@ -914,7 +914,7 @@ func initLocalStorage(c *cfg) {
|
|||
|
||||
var shardsAttached int
|
||||
for _, optsWithMeta := range c.shardOpts() {
|
||||
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
||||
} else {
|
||||
|
@ -931,7 +931,7 @@ func initLocalStorage(c *cfg) {
|
|||
c.onShutdown(func() {
|
||||
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
||||
|
||||
err := ls.Close()
|
||||
err := ls.Close(context.Background())
|
||||
if err != nil {
|
||||
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
||||
zap.String("error", err.Error()),
|
||||
|
|
|
@ -91,10 +91,10 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
|
||||
initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) })
|
||||
|
||||
initLocalStorage(c)
|
||||
initLocalStorage(ctx, c)
|
||||
|
||||
initAndLog(c, "storage engine", func(c *cfg) {
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open(ctx))
|
||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
|
||||
})
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ func TestCompression(t *testing.T) {
|
|||
bs := New(
|
||||
WithCompressObjects(compress),
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Open(context.Background(), false))
|
||||
require.NoError(t, bs.Init())
|
||||
return bs
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
|||
Storage: fstree.New(fstree.WithPath(dir)),
|
||||
},
|
||||
}))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Open(context.Background(), false))
|
||||
require.NoError(t, bs.Init())
|
||||
return bs
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ func TestConcurrentPut(t *testing.T) {
|
|||
|
||||
blobStor := New(
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
||||
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
|
@ -268,7 +268,7 @@ func TestConcurrentDelete(t *testing.T) {
|
|||
|
||||
blobStor := New(
|
||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
||||
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
|
@ -9,10 +10,15 @@ import (
|
|||
)
|
||||
|
||||
// Open opens BlobStor.
|
||||
func (b *BlobStor) Open(readOnly bool) error {
|
||||
func (b *BlobStor) Open(ctx context.Context, readOnly bool) error {
|
||||
b.log.Debug(logs.BlobstorOpening)
|
||||
|
||||
for i := range b.storage {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
err := b.storage[i].Storage.Open(readOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestExists(t *testing.T) {
|
|||
|
||||
b := New(WithStorages(storages))
|
||||
|
||||
require.NoError(t, b.Open(false))
|
||||
require.NoError(t, b.Open(context.Background(), false))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
objects := []*objectSDK.Object{
|
||||
|
|
|
@ -26,7 +26,7 @@ func TestIterateObjects(t *testing.T) {
|
|||
defer os.RemoveAll(p)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, blobStor.Open(false))
|
||||
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, blobStor.Init())
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -21,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
|
|||
|
||||
err := b.Close()
|
||||
if err == nil {
|
||||
if err = b.Open(m.ReadOnly()); err == nil {
|
||||
if err = b.Open(context.TODO(), m.ReadOnly()); err == nil {
|
||||
err = b.Init()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,11 +21,11 @@ type shardInitError struct {
|
|||
}
|
||||
|
||||
// Open opens all StorageEngine's components.
|
||||
func (e *StorageEngine) Open() error {
|
||||
return e.open()
|
||||
func (e *StorageEngine) Open(ctx context.Context) error {
|
||||
return e.open(ctx)
|
||||
}
|
||||
|
||||
func (e *StorageEngine) open() error {
|
||||
func (e *StorageEngine) open(ctx context.Context) error {
|
||||
e.mtx.Lock()
|
||||
defer e.mtx.Unlock()
|
||||
|
||||
|
@ -36,7 +36,7 @@ func (e *StorageEngine) open() error {
|
|||
wg.Add(1)
|
||||
go func(id string, sh *shard.Shard) {
|
||||
defer wg.Done()
|
||||
if err := sh.Open(); err != nil {
|
||||
if err := sh.Open(ctx); err != nil {
|
||||
errCh <- shardInitError{
|
||||
err: err,
|
||||
id: id,
|
||||
|
@ -148,10 +148,10 @@ var errClosed = errors.New("storage engine is closed")
|
|||
// After the call, all the next ones will fail.
|
||||
//
|
||||
// The method MUST only be called when the application exits.
|
||||
func (e *StorageEngine) Close() error {
|
||||
func (e *StorageEngine) Close(ctx context.Context) error {
|
||||
close(e.closeCh)
|
||||
defer e.wg.Wait()
|
||||
return e.setBlockExecErr(errClosed)
|
||||
return e.setBlockExecErr(ctx, errClosed)
|
||||
}
|
||||
|
||||
// closes all shards. Never returns an error, shard errors are logged.
|
||||
|
@ -197,7 +197,7 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error {
|
|||
// - otherwise, resumes execution. If exec was blocked, calls open method.
|
||||
//
|
||||
// Can be called concurrently with exec. In this case it waits for all executions to complete.
|
||||
func (e *StorageEngine) setBlockExecErr(err error) error {
|
||||
func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
|
||||
e.blockExec.mtx.Lock()
|
||||
defer e.blockExec.mtx.Unlock()
|
||||
|
||||
|
@ -212,7 +212,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
|
|||
|
||||
if err == nil {
|
||||
if prevErr != nil { // block -> ok
|
||||
return e.open()
|
||||
return e.open(ctx)
|
||||
}
|
||||
} else if prevErr == nil { // ok -> block
|
||||
return e.close(errors.Is(err, errClosed))
|
||||
|
@ -235,7 +235,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
|
|||
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
|
||||
// for this.
|
||||
func (e *StorageEngine) BlockExecution(err error) error {
|
||||
return e.setBlockExecErr(err)
|
||||
return e.setBlockExecErr(context.Background(), err)
|
||||
}
|
||||
|
||||
// ResumeExecution resumes the execution of any data-related operation.
|
||||
|
@ -247,7 +247,7 @@ func (e *StorageEngine) BlockExecution(err error) error {
|
|||
//
|
||||
// Must not be called concurrently with either Open or Init.
|
||||
func (e *StorageEngine) ResumeExecution() error {
|
||||
return e.setBlockExecErr(nil)
|
||||
return e.setBlockExecErr(context.Background(), nil)
|
||||
}
|
||||
|
||||
type ReConfiguration struct {
|
||||
|
@ -334,14 +334,14 @@ loop:
|
|||
}
|
||||
|
||||
for _, newID := range shardsToAdd {
|
||||
sh, err := e.createShard(rcfg.shards[newID])
|
||||
sh, err := e.createShard(ctx, rcfg.shards[newID])
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
||||
}
|
||||
|
||||
idStr := sh.ID().String()
|
||||
|
||||
err = sh.Open()
|
||||
err = sh.Open(ctx)
|
||||
if err == nil {
|
||||
err = sh.Init(ctx)
|
||||
}
|
||||
|
|
|
@ -126,7 +126,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
|||
var configID string
|
||||
|
||||
e := New()
|
||||
_, err := e.AddShard(opts...)
|
||||
_, err := e.AddShard(context.Background(), opts...)
|
||||
if errOnAdd {
|
||||
require.Error(t, err)
|
||||
// This branch is only taken when we cannot update shard ID in the metabase.
|
||||
|
@ -144,7 +144,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
|||
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
|
||||
e.mtx.RUnlock()
|
||||
|
||||
err = e.Open()
|
||||
err = e.Open(context.Background())
|
||||
if err == nil {
|
||||
require.Error(t, e.Init(context.Background()))
|
||||
}
|
||||
|
@ -193,7 +193,7 @@ func TestExecBlocks(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// close
|
||||
require.NoError(t, e.Close())
|
||||
require.NoError(t, e.Close(context.Background()))
|
||||
|
||||
// try exec after close
|
||||
_, err = Head(context.Background(), e, addr)
|
||||
|
@ -209,13 +209,13 @@ func TestPersistentShardID(t *testing.T) {
|
|||
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close())
|
||||
require.NoError(t, te.ng.Close(context.Background()))
|
||||
|
||||
newTe := newEngineWithErrorThreshold(t, dir, 1)
|
||||
for i := 0; i < len(newTe.shards); i++ {
|
||||
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
|
||||
}
|
||||
require.NoError(t, newTe.ng.Close())
|
||||
require.NoError(t, newTe.ng.Close(context.Background()))
|
||||
|
||||
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||
|
@ -227,7 +227,7 @@ func TestPersistentShardID(t *testing.T) {
|
|||
newTe = newEngineWithErrorThreshold(t, dir, 1)
|
||||
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
|
||||
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
|
||||
require.NoError(t, newTe.ng.Close())
|
||||
require.NoError(t, newTe.ng.Close(context.Background()))
|
||||
|
||||
}
|
||||
|
||||
|
@ -313,7 +313,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
|
|||
require.Equal(t, num, len(e.shards))
|
||||
require.Equal(t, num, len(e.shardPools))
|
||||
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
return e, currShards
|
||||
|
|
|
@ -54,7 +54,7 @@ func TestDeleteBigObject(t *testing.T) {
|
|||
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
|
||||
e.log = test.NewLogger(t, true)
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
for i := range children {
|
||||
require.NoError(t, Put(context.Background(), e, children[i]))
|
||||
|
|
|
@ -50,7 +50,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
|||
|
||||
e := testNewEngine(b).setInitializedShards(b, shards...).engine
|
||||
b.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
_ = os.RemoveAll(b.Name())
|
||||
})
|
||||
|
||||
|
@ -119,7 +119,7 @@ func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrap
|
|||
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
|
||||
for i := 0; i < num; i++ {
|
||||
opts := shardOpts(i)
|
||||
id, err := te.engine.AddShard(opts...)
|
||||
id, err := te.engine.AddShard(context.Background(), opts...)
|
||||
require.NoError(t, err)
|
||||
te.shardIDs = append(te.shardIDs, id)
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, s
|
|||
for i := 0; i < num; i++ {
|
||||
defaultOpts := testDefaultShardOptions(t, i)
|
||||
opts := append(defaultOpts, shardOpts(i)...)
|
||||
id, err := te.engine.AddShard(opts...)
|
||||
id, err := te.engine.AddShard(context.Background(), opts...)
|
||||
require.NoError(t, err)
|
||||
te.shardIDs = append(te.shardIDs, id)
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
|
|||
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
|
||||
s := shard.New(shardOpts...)
|
||||
|
||||
require.NoError(t, s.Open())
|
||||
require.NoError(t, s.Open(context.Background()))
|
||||
require.NoError(t, s.Init(context.Background()))
|
||||
|
||||
return s
|
||||
|
|
|
@ -68,7 +68,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
}
|
||||
})
|
||||
e := te.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
for i, id := range te.shardIDs {
|
||||
|
@ -192,7 +192,7 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
}
|
||||
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close())
|
||||
require.NoError(t, te.ng.Close(context.Background()))
|
||||
|
||||
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
|
|
|
@ -44,7 +44,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
|
|||
}
|
||||
})
|
||||
e, ids := te.engine, te.shardIDs
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
|
||||
|
|
|
@ -43,7 +43,7 @@ func TestHeadRaw(t *testing.T) {
|
|||
s2 := testNewShard(t, 2)
|
||||
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
var putPrmLeft shard.PutPrm
|
||||
putPrmLeft.SetObject(child)
|
||||
|
|
|
@ -37,7 +37,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
|
||||
t.Run("delete small object", func(t *testing.T) {
|
||||
e := testNewEngine(t).setShardsNum(t, 1).engine
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
err := Put(context.Background(), e, parent)
|
||||
require.NoError(t, err)
|
||||
|
@ -58,7 +58,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
s2 := testNewShard(t, 2)
|
||||
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||
defer e.Close()
|
||||
defer e.Close(context.Background())
|
||||
|
||||
var putChild shard.PutPrm
|
||||
putChild.SetObject(child)
|
||||
|
|
|
@ -76,11 +76,11 @@ func TestListWithCursor(t *testing.T) {
|
|||
meta.WithEpochState(epochState{}),
|
||||
)}
|
||||
}).engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
e.Close()
|
||||
e.Close(context.Background())
|
||||
})
|
||||
|
||||
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
||||
|
|
|
@ -59,11 +59,11 @@ func TestLockUserScenario(t *testing.T) {
|
|||
}
|
||||
})
|
||||
e := testEngine.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
})
|
||||
|
||||
lockerID := oidtest.ID()
|
||||
|
@ -167,11 +167,11 @@ func TestLockExpiration(t *testing.T) {
|
|||
}
|
||||
})
|
||||
e := testEngine.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
})
|
||||
|
||||
const lockerExpiresAfter = 13
|
||||
|
@ -247,10 +247,10 @@ func TestLockForceRemoval(t *testing.T) {
|
|||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||
}
|
||||
}).engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Open(context.Background()))
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = e.Close(context.Background())
|
||||
})
|
||||
|
||||
cnr := cidtest.ID()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -77,8 +78,8 @@ func (m *metricsWithID) DeleteShardMetrics() {
|
|||
//
|
||||
// Returns any error encountered that did not allow adding a shard.
|
||||
// Otherwise returns the ID of the added shard.
|
||||
func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
||||
sh, err := e.createShard(opts)
|
||||
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
|
||||
sh, err := e.createShard(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create a shard: %w", err)
|
||||
}
|
||||
|
@ -95,7 +96,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
|||
return sh.ID(), nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
||||
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||
id, err := generateShardID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||
|
@ -111,7 +112,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
|||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(); err != nil {
|
||||
if err := sh.UpdateID(ctx); err != nil {
|
||||
return nil, fmt.Errorf("could not update shard ID: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -12,7 +13,7 @@ func TestRemoveShard(t *testing.T) {
|
|||
te := testNewEngine(t).setShardsNum(t, numOfShards)
|
||||
e, ids := te.engine, te.shardIDs
|
||||
t.Cleanup(func() {
|
||||
e.Close()
|
||||
e.Close(context.Background())
|
||||
})
|
||||
|
||||
require.Equal(t, numOfShards, len(e.shardPools))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package storagetest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -9,7 +10,7 @@ import (
|
|||
|
||||
// Component represents single storage component.
|
||||
type Component interface {
|
||||
Open(bool) error
|
||||
Open(context.Context, bool) error
|
||||
SetMode(mode.Mode) error
|
||||
Init() error
|
||||
Close() error
|
||||
|
@ -57,18 +58,18 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
|
|||
t.Run("RW", func(t *testing.T) {
|
||||
// Use-case: irrecoverable error on some components, close everything.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Close())
|
||||
})
|
||||
t.Run("RO", func(t *testing.T) {
|
||||
// Use-case: irrecoverable error on some components, close everything.
|
||||
// Open in read-only must be done after the db is here.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
require.NoError(t, s.Open(true))
|
||||
require.NoError(t, s.Open(context.Background(), true))
|
||||
require.NoError(t, s.Close())
|
||||
})
|
||||
}
|
||||
|
@ -77,7 +78,7 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
|
|||
func TestCloseTwice(t *testing.T, cons Constructor) {
|
||||
// Use-case: move to maintenance mode twice, first time failed.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.Close())
|
||||
require.NoError(t, s.Close()) // already closed, no-op
|
||||
|
@ -89,12 +90,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
// Use-case: metabase `Init` failed,
|
||||
// call `SetMode` on all not-yet-initialized components.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.SetMode(m))
|
||||
|
||||
t.Run("after open in RO", func(t *testing.T) {
|
||||
require.NoError(t, s.Close())
|
||||
require.NoError(t, s.Open(true))
|
||||
require.NoError(t, s.Open(context.Background(), true))
|
||||
require.NoError(t, s.SetMode(m))
|
||||
})
|
||||
|
||||
|
@ -103,7 +104,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
t.Run("after init", func(t *testing.T) {
|
||||
s := cons(t)
|
||||
// Use-case: notmal node operation.
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.SetMode(m))
|
||||
require.NoError(t, s.Close())
|
||||
|
@ -113,7 +114,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
|||
func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
|
||||
// Use-case: normal node operation.
|
||||
s := cons(t)
|
||||
require.NoError(t, s.Open(false))
|
||||
require.NoError(t, s.Open(context.Background(), false))
|
||||
require.NoError(t, s.Init())
|
||||
require.NoError(t, s.SetMode(from))
|
||||
require.NoError(t, s.SetMode(to))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
@ -21,7 +22,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
|
|||
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
||||
|
||||
// Open boltDB instance for metabase.
|
||||
func (db *DB) Open(readOnly bool) error {
|
||||
func (db *DB) Open(_ context.Context, readOnly bool) error {
|
||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
@ -49,7 +50,7 @@ func newDB(t testing.TB, opts ...meta.Option) *meta.DB {
|
|||
}, opts...)...,
|
||||
)
|
||||
|
||||
require.NoError(t, bdb.Open(false))
|
||||
require.NoError(t, bdb.Open(context.Background(), false))
|
||||
require.NoError(t, bdb.Init())
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -27,9 +28,9 @@ func (db *DB) SetMode(m mode.Mode) error {
|
|||
case m.NoMetabase():
|
||||
db.boltDB = nil
|
||||
case m.ReadOnly():
|
||||
err = db.Open(true)
|
||||
err = db.Open(context.TODO(), true)
|
||||
default:
|
||||
err = db.Open(false)
|
||||
err = db.Open(context.TODO(), false)
|
||||
}
|
||||
if err == nil && !m.NoMetabase() && !m.ReadOnly() {
|
||||
err = db.Init()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -42,13 +43,13 @@ func TestVersion(t *testing.T) {
|
|||
}
|
||||
t.Run("simple", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
t.Run("reopen", func(t *testing.T) {
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
|
@ -56,29 +57,29 @@ func TestVersion(t *testing.T) {
|
|||
})
|
||||
t.Run("old data", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Init())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
})
|
||||
t.Run("invalid version", func(t *testing.T) {
|
||||
db := newDB(t)
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return updateVersion(tx, version+1)
|
||||
}))
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.Error(t, db.Init())
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
t.Run("reset", func(t *testing.T) {
|
||||
require.NoError(t, db.Open(false))
|
||||
require.NoError(t, db.Open(context.Background(), false))
|
||||
require.NoError(t, db.Reset())
|
||||
check(t, db)
|
||||
require.NoError(t, db.Close())
|
||||
|
|
|
@ -26,7 +26,7 @@ func BenchmarkCreate(b *testing.B) {
|
|||
f := NewBoltForest(
|
||||
WithPath(filepath.Join(tmpDir, "test.db")),
|
||||
WithMaxBatchSize(runtime.GOMAXPROCS(0)))
|
||||
require.NoError(b, f.Open(false))
|
||||
require.NoError(b, f.Open(context.Background(), false))
|
||||
require.NoError(b, f.Init())
|
||||
b.Cleanup(func() {
|
||||
require.NoError(b, f.Close())
|
||||
|
|
|
@ -99,7 +99,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
|
|||
|
||||
err := t.Close()
|
||||
if err == nil && !m.NoMetabase() {
|
||||
if err = t.Open(m.ReadOnly()); err == nil {
|
||||
if err = t.Open(context.TODO(), m.ReadOnly()); err == nil {
|
||||
err = t.Init()
|
||||
}
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
|
|||
t.metrics.SetMode(m)
|
||||
return nil
|
||||
}
|
||||
func (t *boltForest) Open(readOnly bool) error {
|
||||
func (t *boltForest) Open(_ context.Context, readOnly bool) error {
|
||||
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
|
||||
|
|
|
@ -110,7 +110,7 @@ func (f *memoryForest) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) Open(bool) error {
|
||||
func (f *memoryForest) Open(context.Context, bool) error {
|
||||
return nil
|
||||
}
|
||||
func (f *memoryForest) SetMode(mode.Mode) error {
|
||||
|
|
|
@ -24,7 +24,7 @@ var providers = []struct {
|
|||
}{
|
||||
{"inmemory", func(t testing.TB, _ ...Option) Forest {
|
||||
f := NewMemoryForest()
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Open(context.Background(), false))
|
||||
require.NoError(t, f.Init())
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, f.Close())
|
||||
|
@ -37,7 +37,7 @@ var providers = []struct {
|
|||
append([]Option{
|
||||
WithPath(filepath.Join(t.TempDir(), "test.db")),
|
||||
WithMaxBatchSize(1)}, opts...)...)
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Open(context.Background(), false))
|
||||
require.NoError(t, f.Init())
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, f.Close())
|
||||
|
|
|
@ -58,7 +58,7 @@ type ForestStorage interface {
|
|||
// DumpInfo returns information about the pilorama.
|
||||
DumpInfo() Info
|
||||
Init() error
|
||||
Open(bool) error
|
||||
Open(context.Context, bool) error
|
||||
Close() error
|
||||
SetMode(m mode.Mode) error
|
||||
SetParentID(id string)
|
||||
|
|
|
@ -41,8 +41,10 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
|||
}
|
||||
|
||||
// Open opens all Shard's components.
|
||||
func (s *Shard) Open() error {
|
||||
components := []interface{ Open(bool) error }{
|
||||
func (s *Shard) Open(ctx context.Context) error {
|
||||
components := []interface {
|
||||
Open(context.Context, bool) error
|
||||
}{
|
||||
s.blobStor, s.metaBase,
|
||||
}
|
||||
|
||||
|
@ -55,12 +57,12 @@ func (s *Shard) Open() error {
|
|||
}
|
||||
|
||||
for i, component := range components {
|
||||
if err := component.Open(false); err != nil {
|
||||
if err := component.Open(ctx, false); err != nil {
|
||||
if component == s.metaBase {
|
||||
// We must first open all other components to avoid
|
||||
// opening non-existent DB in read-only mode.
|
||||
for j := i + 1; j < len(components); j++ {
|
||||
if err := components[j].Open(false); err != nil {
|
||||
if err := components[j].Open(ctx, false); err != nil {
|
||||
// Other components must be opened, fail.
|
||||
return fmt.Errorf("could not open %T: %w", components[j], err)
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ func TestShardOpen(t *testing.T) {
|
|||
allowedMode.Store(int64(os.O_RDWR))
|
||||
|
||||
sh := newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.ReadWrite, sh.GetMode())
|
||||
require.NoError(t, sh.Close())
|
||||
|
@ -96,7 +96,7 @@ func TestShardOpen(t *testing.T) {
|
|||
allowedMode.Store(int64(os.O_RDONLY))
|
||||
|
||||
sh = newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
||||
require.Error(t, sh.SetMode(mode.ReadWrite))
|
||||
|
@ -107,7 +107,7 @@ func TestShardOpen(t *testing.T) {
|
|||
allowedMode.Store(math.MaxInt64)
|
||||
|
||||
sh = newShard()
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
|
||||
require.NoError(t, sh.Close())
|
||||
|
@ -135,7 +135,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
WithBlobStorOptions(blobOpts...),
|
||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
obj := objecttest.Object()
|
||||
|
@ -158,7 +158,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
||||
WithRefillMetabase(true))
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
var getPrm GetPrm
|
||||
|
@ -197,7 +197,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
@ -365,7 +365,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
)
|
||||
|
||||
// open Blobstor
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
|
||||
// initialize Blobstor
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
|
|
@ -76,7 +76,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
|||
|
||||
sh = New(opts...)
|
||||
sh.gcCfg.testHookRemover = func(context.Context) gcRunResult { return gcRunResult{} }
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/mr-tron/base58"
|
||||
"go.uber.org/zap"
|
||||
|
@ -27,8 +29,8 @@ func (s *Shard) ID() *ID {
|
|||
}
|
||||
|
||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||
func (s *Shard) UpdateID() (err error) {
|
||||
if err = s.metaBase.Open(false); err != nil {
|
||||
func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||
if err = s.metaBase.Open(ctx, false); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestShard_Lock(t *testing.T) {
|
|||
}
|
||||
|
||||
sh = New(opts...)
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -269,7 +269,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
|||
meta.WithEpochState(epochState{})),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
|
|
|
@ -52,7 +52,7 @@ func TestShardReload(t *testing.T) {
|
|||
pilorama.WithPath(filepath.Join(p, "pilorama")))}
|
||||
|
||||
sh := New(opts...)
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
objects := make([]objAddr, 5)
|
||||
|
|
|
@ -117,7 +117,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
|
|||
|
||||
sh = New(opts...)
|
||||
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
if !o.dontRelease {
|
||||
|
|
|
@ -82,7 +82,7 @@ func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
|
|||
}
|
||||
|
||||
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
|
||||
require.NoError(b, cache.Open(false), "opening")
|
||||
require.NoError(b, cache.Open(context.Background(), false), "opening")
|
||||
require.NoError(b, cache.Init(), "initializing")
|
||||
b.Cleanup(func() {
|
||||
require.NoError(b, cache.Close(), "closing")
|
||||
|
|
|
@ -38,7 +38,7 @@ type Cache interface {
|
|||
Flush(context.Context, bool) error
|
||||
|
||||
Init() error
|
||||
Open(readOnly bool) error
|
||||
Open(ctx context.Context, readOnly bool) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecachebadger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -83,7 +84,7 @@ func (c *cache) DumpInfo() writecache.Info {
|
|||
}
|
||||
|
||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||
func (c *cache) Open(readOnly bool) error {
|
||||
func (c *cache) Open(_ context.Context, readOnly bool) error {
|
||||
err := c.openStore(readOnly)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(err)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package writecachebbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
|
@ -97,7 +98,7 @@ func (c *cache) DumpInfo() writecache.Info {
|
|||
}
|
||||
|
||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||
func (c *cache) Open(readOnly bool) error {
|
||||
func (c *cache) Open(_ context.Context, readOnly bool) error {
|
||||
err := c.openStore(readOnly)
|
||||
if err != nil {
|
||||
return metaerr.Wrap(err)
|
||||
|
|
|
@ -110,7 +110,7 @@ func newCache[Option any](
|
|||
mb := meta.New(
|
||||
meta.WithPath(filepath.Join(dir, "meta")),
|
||||
meta.WithEpochState(dummyEpoch{}))
|
||||
require.NoError(t, mb.Open(false))
|
||||
require.NoError(t, mb.Open(context.Background(), false))
|
||||
require.NoError(t, mb.Init())
|
||||
|
||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||
|
@ -121,12 +121,12 @@ func newCache[Option any](
|
|||
fstree.WithDirNameLen(1)),
|
||||
},
|
||||
}))
|
||||
require.NoError(t, bs.Open(false))
|
||||
require.NoError(t, bs.Open(context.Background(), false))
|
||||
require.NoError(t, bs.Init())
|
||||
|
||||
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
||||
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
||||
require.NoError(t, wc.Open(false))
|
||||
require.NoError(t, wc.Open(context.Background(), false))
|
||||
require.NoError(t, wc.Init())
|
||||
|
||||
// First set mode for metabase and blobstor to prevent background flushes.
|
||||
|
|
Loading…
Reference in a new issue