[#653] Pass context.Context to StorageEngine Open #672
43 changed files with 129 additions and 109 deletions
|
@ -43,7 +43,7 @@ func openMeta(cmd *cobra.Command) *meta.DB {
|
||||||
}),
|
}),
|
||||||
meta.WithEpochState(epochState{}),
|
meta.WithEpochState(epochState{}),
|
||||||
)
|
)
|
||||||
common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(true)))
|
common.ExitOnErr(cmd, common.Errf("could not open metabase: %w", db.Open(cmd.Context(), true)))
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|
|
@ -901,7 +901,7 @@ func (c *cfg) LocalAddress() network.AddressGroup {
|
||||||
return c.localAddr
|
return c.localAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
func initLocalStorage(c *cfg) {
|
func initLocalStorage(ctx context.Context, c *cfg) {
|
||||||
ls := engine.New(c.engineOpts()...)
|
ls := engine.New(c.engineOpts()...)
|
||||||
|
|
||||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
||||||
|
@ -914,7 +914,7 @@ func initLocalStorage(c *cfg) {
|
||||||
|
|
||||||
var shardsAttached int
|
var shardsAttached int
|
||||||
for _, optsWithMeta := range c.shardOpts() {
|
for _, optsWithMeta := range c.shardOpts() {
|
||||||
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
|
@ -931,7 +931,7 @@ func initLocalStorage(c *cfg) {
|
||||||
c.onShutdown(func() {
|
c.onShutdown(func() {
|
||||||
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
||||||
|
|
||||||
err := ls.Close()
|
err := ls.Close(context.Background())
|
||||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
Need to check: if Need to check: if `onShutdown` calls afetr ctx is cancelled, then `Close()` may fail. So `context.Background()` or detached https://pkg.go.dev/context#WithoutCancel should be used.
fyrchik
commented
`WithoutCancel()` was added in 1.21, but we need to support 1.20 too.
elebedeva
commented
changed changed `ls.Close(ctx)` to `ls.Close(context.Background())`
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
|
|
@ -91,10 +91,10 @@ func initApp(ctx context.Context, c *cfg) {
|
||||||
|
|
||||||
initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) })
|
initAndLog(c, "tracing", func(c *cfg) { initTracing(ctx, c) })
|
||||||
|
|
||||||
initLocalStorage(c)
|
initLocalStorage(ctx, c)
|
||||||
|
|
||||||
initAndLog(c, "storage engine", func(c *cfg) {
|
initAndLog(c, "storage engine", func(c *cfg) {
|
||||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open())
|
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open(ctx))
|
||||||
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
|
fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init(ctx))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ func TestCompression(t *testing.T) {
|
||||||
bs := New(
|
bs := New(
|
||||||
WithCompressObjects(compress),
|
WithCompressObjects(compress),
|
||||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||||
require.NoError(t, bs.Open(false))
|
require.NoError(t, bs.Open(context.Background(), false))
|
||||||
require.NoError(t, bs.Init())
|
require.NoError(t, bs.Init())
|
||||||
return bs
|
return bs
|
||||||
}
|
}
|
||||||
|
@ -126,7 +126,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
||||||
Storage: fstree.New(fstree.WithPath(dir)),
|
Storage: fstree.New(fstree.WithPath(dir)),
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
require.NoError(t, bs.Open(false))
|
require.NoError(t, bs.Open(context.Background(), false))
|
||||||
require.NoError(t, bs.Init())
|
require.NoError(t, bs.Init())
|
||||||
return bs
|
return bs
|
||||||
}
|
}
|
||||||
|
@ -188,7 +188,7 @@ func TestConcurrentPut(t *testing.T) {
|
||||||
|
|
||||||
blobStor := New(
|
blobStor := New(
|
||||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||||
require.NoError(t, blobStor.Open(false))
|
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||||
require.NoError(t, blobStor.Init())
|
require.NoError(t, blobStor.Init())
|
||||||
|
|
||||||
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
testGet := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
@ -268,7 +268,7 @@ func TestConcurrentDelete(t *testing.T) {
|
||||||
|
|
||||||
blobStor := New(
|
blobStor := New(
|
||||||
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
WithStorages(defaultStorages(dir, smallSizeLimit)))
|
||||||
require.NoError(t, blobStor.Open(false))
|
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||||
require.NoError(t, blobStor.Init())
|
require.NoError(t, blobStor.Init())
|
||||||
|
|
||||||
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
testPut := func(t *testing.T, b *BlobStor, obj *objectSDK.Object) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -9,10 +10,15 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Open opens BlobStor.
|
// Open opens BlobStor.
|
||||||
func (b *BlobStor) Open(readOnly bool) error {
|
func (b *BlobStor) Open(ctx context.Context, readOnly bool) error {
|
||||||
b.log.Debug(logs.BlobstorOpening)
|
b.log.Debug(logs.BlobstorOpening)
|
||||||
|
|
||||||
for i := range b.storage {
|
for i := range b.storage {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
err := b.storage[i].Storage.Open(readOnly)
|
err := b.storage[i].Storage.Open(readOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -20,7 +20,7 @@ func TestExists(t *testing.T) {
|
||||||
|
|
||||||
b := New(WithStorages(storages))
|
b := New(WithStorages(storages))
|
||||||
|
|
||||||
require.NoError(t, b.Open(false))
|
require.NoError(t, b.Open(context.Background(), false))
|
||||||
require.NoError(t, b.Init())
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
objects := []*objectSDK.Object{
|
objects := []*objectSDK.Object{
|
||||||
|
|
|
@ -26,7 +26,7 @@ func TestIterateObjects(t *testing.T) {
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
// open Blobstor
|
// open Blobstor
|
||||||
require.NoError(t, blobStor.Open(false))
|
require.NoError(t, blobStor.Open(context.Background(), false))
|
||||||
|
|
||||||
// initialize Blobstor
|
// initialize Blobstor
|
||||||
require.NoError(t, blobStor.Init())
|
require.NoError(t, blobStor.Init())
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -21,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
|
||||||
|
|
||||||
err := b.Close()
|
err := b.Close()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if err = b.Open(m.ReadOnly()); err == nil {
|
if err = b.Open(context.TODO(), m.ReadOnly()); err == nil {
|
||||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
context.TODO is more preferred in such cases (or we should pass ctx to SetMode too :) ) context.TODO is more preferred in such cases (or we should pass ctx to SetMode too :) )
elebedeva
commented
fixed fixed
|
|||||||
err = b.Init()
|
err = b.Init()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,11 +21,11 @@ type shardInitError struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens all StorageEngine's components.
|
// Open opens all StorageEngine's components.
|
||||||
func (e *StorageEngine) Open() error {
|
func (e *StorageEngine) Open(ctx context.Context) error {
|
||||||
return e.open()
|
return e.open(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) open() error {
|
func (e *StorageEngine) open(ctx context.Context) error {
|
||||||
e.mtx.Lock()
|
e.mtx.Lock()
|
||||||
defer e.mtx.Unlock()
|
defer e.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ func (e *StorageEngine) open() error {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(id string, sh *shard.Shard) {
|
go func(id string, sh *shard.Shard) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := sh.Open(); err != nil {
|
if err := sh.Open(ctx); err != nil {
|
||||||
errCh <- shardInitError{
|
errCh <- shardInitError{
|
||||||
err: err,
|
err: err,
|
||||||
id: id,
|
id: id,
|
||||||
|
@ -148,10 +148,10 @@ var errClosed = errors.New("storage engine is closed")
|
||||||
// After the call, all the next ones will fail.
|
// After the call, all the next ones will fail.
|
||||||
//
|
//
|
||||||
// The method MUST only be called when the application exits.
|
// The method MUST only be called when the application exits.
|
||||||
func (e *StorageEngine) Close() error {
|
func (e *StorageEngine) Close(ctx context.Context) error {
|
||||||
close(e.closeCh)
|
close(e.closeCh)
|
||||||
defer e.wg.Wait()
|
defer e.wg.Wait()
|
||||||
return e.setBlockExecErr(errClosed)
|
return e.setBlockExecErr(ctx, errClosed)
|
||||||
}
|
}
|
||||||
|
|
||||||
// closes all shards. Never returns an error, shard errors are logged.
|
// closes all shards. Never returns an error, shard errors are logged.
|
||||||
|
@ -197,7 +197,7 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error {
|
||||||
// - otherwise, resumes execution. If exec was blocked, calls open method.
|
// - otherwise, resumes execution. If exec was blocked, calls open method.
|
||||||
//
|
//
|
||||||
// Can be called concurrently with exec. In this case it waits for all executions to complete.
|
// Can be called concurrently with exec. In this case it waits for all executions to complete.
|
||||||
func (e *StorageEngine) setBlockExecErr(err error) error {
|
func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
|
||||||
e.blockExec.mtx.Lock()
|
e.blockExec.mtx.Lock()
|
||||||
defer e.blockExec.mtx.Unlock()
|
defer e.blockExec.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -212,7 +212,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if prevErr != nil { // block -> ok
|
if prevErr != nil { // block -> ok
|
||||||
return e.open()
|
return e.open(ctx)
|
||||||
}
|
}
|
||||||
} else if prevErr == nil { // ok -> block
|
} else if prevErr == nil { // ok -> block
|
||||||
return e.close(errors.Is(err, errClosed))
|
return e.close(errors.Is(err, errClosed))
|
||||||
|
@ -235,7 +235,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error {
|
||||||
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
|
// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution
|
||||||
// for this.
|
// for this.
|
||||||
func (e *StorageEngine) BlockExecution(err error) error {
|
func (e *StorageEngine) BlockExecution(err error) error {
|
||||||
return e.setBlockExecErr(err)
|
return e.setBlockExecErr(context.Background(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResumeExecution resumes the execution of any data-related operation.
|
// ResumeExecution resumes the execution of any data-related operation.
|
||||||
|
@ -247,7 +247,7 @@ func (e *StorageEngine) BlockExecution(err error) error {
|
||||||
//
|
//
|
||||||
// Must not be called concurrently with either Open or Init.
|
// Must not be called concurrently with either Open or Init.
|
||||||
func (e *StorageEngine) ResumeExecution() error {
|
func (e *StorageEngine) ResumeExecution() error {
|
||||||
return e.setBlockExecErr(nil)
|
return e.setBlockExecErr(context.Background(), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReConfiguration struct {
|
type ReConfiguration struct {
|
||||||
|
@ -334,14 +334,14 @@ loop:
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, newID := range shardsToAdd {
|
for _, newID := range shardsToAdd {
|
||||||
sh, err := e.createShard(rcfg.shards[newID])
|
sh, err := e.createShard(ctx, rcfg.shards[newID])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
idStr := sh.ID().String()
|
idStr := sh.ID().String()
|
||||||
|
|
||||||
err = sh.Open()
|
err = sh.Open(ctx)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = sh.Init(ctx)
|
err = sh.Init(ctx)
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,7 +126,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
||||||
var configID string
|
var configID string
|
||||||
|
|
||||||
e := New()
|
e := New()
|
||||||
_, err := e.AddShard(opts...)
|
_, err := e.AddShard(context.Background(), opts...)
|
||||||
if errOnAdd {
|
if errOnAdd {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
// This branch is only taken when we cannot update shard ID in the metabase.
|
// This branch is only taken when we cannot update shard ID in the metabase.
|
||||||
|
@ -144,7 +144,7 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
||||||
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
|
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
err = e.Open()
|
err = e.Open(context.Background())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
require.Error(t, e.Init(context.Background()))
|
require.Error(t, e.Init(context.Background()))
|
||||||
}
|
}
|
||||||
|
@ -193,7 +193,7 @@ func TestExecBlocks(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// close
|
// close
|
||||||
require.NoError(t, e.Close())
|
require.NoError(t, e.Close(context.Background()))
|
||||||
|
|
||||||
// try exec after close
|
// try exec after close
|
||||||
_, err = Head(context.Background(), e, addr)
|
_, err = Head(context.Background(), e, addr)
|
||||||
|
@ -209,13 +209,13 @@ func TestPersistentShardID(t *testing.T) {
|
||||||
te := newEngineWithErrorThreshold(t, dir, 1)
|
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
require.NoError(t, te.ng.Close())
|
require.NoError(t, te.ng.Close(context.Background()))
|
||||||
|
|
||||||
newTe := newEngineWithErrorThreshold(t, dir, 1)
|
newTe := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
for i := 0; i < len(newTe.shards); i++ {
|
for i := 0; i < len(newTe.shards); i++ {
|
||||||
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
|
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
|
||||||
}
|
}
|
||||||
require.NoError(t, newTe.ng.Close())
|
require.NoError(t, newTe.ng.Close(context.Background()))
|
||||||
|
|
||||||
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||||
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
|
||||||
|
@ -227,7 +227,7 @@ func TestPersistentShardID(t *testing.T) {
|
||||||
newTe = newEngineWithErrorThreshold(t, dir, 1)
|
newTe = newEngineWithErrorThreshold(t, dir, 1)
|
||||||
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
|
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
|
||||||
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
|
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
|
||||||
require.NoError(t, newTe.ng.Close())
|
require.NoError(t, newTe.ng.Close(context.Background()))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,7 +313,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
|
||||||
require.Equal(t, num, len(e.shards))
|
require.Equal(t, num, len(e.shards))
|
||||||
require.Equal(t, num, len(e.shardPools))
|
require.Equal(t, num, len(e.shardPools))
|
||||||
|
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open(context.Background()))
|
||||||
require.NoError(t, e.Init(context.Background()))
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
return e, currShards
|
return e, currShards
|
||||||
|
|
|
@ -54,7 +54,7 @@ func TestDeleteBigObject(t *testing.T) {
|
||||||
|
|
||||||
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
|
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
|
||||||
e.log = test.NewLogger(t, true)
|
e.log = test.NewLogger(t, true)
|
||||||
defer e.Close()
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
for i := range children {
|
for i := range children {
|
||||||
require.NoError(t, Put(context.Background(), e, children[i]))
|
require.NoError(t, Put(context.Background(), e, children[i]))
|
||||||
|
|
|
@ -50,7 +50,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
||||||
|
|
||||||
e := testNewEngine(b).setInitializedShards(b, shards...).engine
|
e := testNewEngine(b).setInitializedShards(b, shards...).engine
|
||||||
b.Cleanup(func() {
|
b.Cleanup(func() {
|
||||||
_ = e.Close()
|
_ = e.Close(context.Background())
|
||||||
_ = os.RemoveAll(b.Name())
|
_ = os.RemoveAll(b.Name())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -119,7 +119,7 @@ func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrap
|
||||||
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
|
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
|
||||||
for i := 0; i < num; i++ {
|
for i := 0; i < num; i++ {
|
||||||
opts := shardOpts(i)
|
opts := shardOpts(i)
|
||||||
id, err := te.engine.AddShard(opts...)
|
id, err := te.engine.AddShard(context.Background(), opts...)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
te.shardIDs = append(te.shardIDs, id)
|
te.shardIDs = append(te.shardIDs, id)
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,7 @@ func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, s
|
||||||
for i := 0; i < num; i++ {
|
for i := 0; i < num; i++ {
|
||||||
defaultOpts := testDefaultShardOptions(t, i)
|
defaultOpts := testDefaultShardOptions(t, i)
|
||||||
opts := append(defaultOpts, shardOpts(i)...)
|
opts := append(defaultOpts, shardOpts(i)...)
|
||||||
id, err := te.engine.AddShard(opts...)
|
id, err := te.engine.AddShard(context.Background(), opts...)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
te.shardIDs = append(te.shardIDs, id)
|
te.shardIDs = append(te.shardIDs, id)
|
||||||
}
|
}
|
||||||
|
@ -190,7 +190,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
|
||||||
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
|
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
|
||||||
s := shard.New(shardOpts...)
|
s := shard.New(shardOpts...)
|
||||||
|
|
||||||
require.NoError(t, s.Open())
|
require.NoError(t, s.Open(context.Background()))
|
||||||
require.NoError(t, s.Init(context.Background()))
|
require.NoError(t, s.Init(context.Background()))
|
||||||
|
|
||||||
return s
|
return s
|
||||||
|
|
|
@ -68,7 +68,7 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
e := te.engine
|
e := te.engine
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open(context.Background()))
|
||||||
require.NoError(t, e.Init(context.Background()))
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
for i, id := range te.shardIDs {
|
for i, id := range te.shardIDs {
|
||||||
|
@ -192,7 +192,7 @@ func TestBlobstorFailback(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
require.NoError(t, te.ng.Close())
|
require.NoError(t, te.ng.Close(context.Background()))
|
||||||
|
|
||||||
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||||
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||||
|
|
|
@ -44,7 +44,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
e, ids := te.engine, te.shardIDs
|
e, ids := te.engine, te.shardIDs
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open(context.Background()))
|
||||||
require.NoError(t, e.Init(context.Background()))
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
|
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
|
||||||
|
|
|
@ -43,7 +43,7 @@ func TestHeadRaw(t *testing.T) {
|
||||||
s2 := testNewShard(t, 2)
|
s2 := testNewShard(t, 2)
|
||||||
|
|
||||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||||
defer e.Close()
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
var putPrmLeft shard.PutPrm
|
var putPrmLeft shard.PutPrm
|
||||||
putPrmLeft.SetObject(child)
|
putPrmLeft.SetObject(child)
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
|
|
||||||
t.Run("delete small object", func(t *testing.T) {
|
t.Run("delete small object", func(t *testing.T) {
|
||||||
e := testNewEngine(t).setShardsNum(t, 1).engine
|
e := testNewEngine(t).setShardsNum(t, 1).engine
|
||||||
defer e.Close()
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
err := Put(context.Background(), e, parent)
|
err := Put(context.Background(), e, parent)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -58,7 +58,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
||||||
s2 := testNewShard(t, 2)
|
s2 := testNewShard(t, 2)
|
||||||
|
|
||||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||||
defer e.Close()
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
var putChild shard.PutPrm
|
var putChild shard.PutPrm
|
||||||
putChild.SetObject(child)
|
putChild.SetObject(child)
|
||||||
|
|
|
@ -76,11 +76,11 @@ func TestListWithCursor(t *testing.T) {
|
||||||
meta.WithEpochState(epochState{}),
|
meta.WithEpochState(epochState{}),
|
||||||
)}
|
)}
|
||||||
}).engine
|
}).engine
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open(context.Background()))
|
||||||
require.NoError(t, e.Init(context.Background()))
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
e.Close()
|
e.Close(context.Background())
|
||||||
})
|
})
|
||||||
|
|
||||||
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
expected := make([]object.AddressWithType, 0, tt.objectNum)
|
||||||
|
|
|
@ -59,11 +59,11 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
e := testEngine.engine
|
e := testEngine.engine
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open(context.Background()))
|
||||||
require.NoError(t, e.Init(context.Background()))
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
_ = e.Close()
|
_ = e.Close(context.Background())
|
||||||
})
|
})
|
||||||
|
|
||||||
lockerID := oidtest.ID()
|
lockerID := oidtest.ID()
|
||||||
|
@ -167,11 +167,11 @@ func TestLockExpiration(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
e := testEngine.engine
|
e := testEngine.engine
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open(context.Background()))
|
||||||
require.NoError(t, e.Init(context.Background()))
|
require.NoError(t, e.Init(context.Background()))
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
_ = e.Close()
|
_ = e.Close(context.Background())
|
||||||
})
|
})
|
||||||
|
|
||||||
const lockerExpiresAfter = 13
|
const lockerExpiresAfter = 13
|
||||||
|
@ -247,10 +247,10 @@ func TestLockForceRemoval(t *testing.T) {
|
||||||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||||
}
|
}
|
||||||
}).engine
|
}).engine
|
||||||
require.NoError(t, e.Open())
|
require.NoError(t, e.Open(context.Background()))
|
||||||
require.NoError(t, e.Init(context.Background()))
|
require.NoError(t, e.Init(context.Background()))
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
_ = e.Close()
|
_ = e.Close(context.Background())
|
||||||
})
|
})
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
@ -77,8 +78,8 @@ func (m *metricsWithID) DeleteShardMetrics() {
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow adding a shard.
|
// Returns any error encountered that did not allow adding a shard.
|
||||||
// Otherwise returns the ID of the added shard.
|
// Otherwise returns the ID of the added shard.
|
||||||
func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*shard.ID, error) {
|
||||||
sh, err := e.createShard(opts)
|
sh, err := e.createShard(ctx, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not create a shard: %w", err)
|
return nil, fmt.Errorf("could not create a shard: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -95,7 +96,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
||||||
return sh.ID(), nil
|
return sh.ID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||||
id, err := generateShardID()
|
id, err := generateShardID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||||
|
@ -111,7 +112,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
||||||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||||
)...)
|
)...)
|
||||||
|
|
||||||
if err := sh.UpdateID(); err != nil {
|
if err := sh.UpdateID(ctx); err != nil {
|
||||||
return nil, fmt.Errorf("could not update shard ID: %w", err)
|
return nil, fmt.Errorf("could not update shard ID: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -12,7 +13,7 @@ func TestRemoveShard(t *testing.T) {
|
||||||
te := testNewEngine(t).setShardsNum(t, numOfShards)
|
te := testNewEngine(t).setShardsNum(t, numOfShards)
|
||||||
e, ids := te.engine, te.shardIDs
|
e, ids := te.engine, te.shardIDs
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
e.Close()
|
e.Close(context.Background())
|
||||||
})
|
})
|
||||||
|
|
||||||
require.Equal(t, numOfShards, len(e.shardPools))
|
require.Equal(t, numOfShards, len(e.shardPools))
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package storagetest
|
package storagetest
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -9,7 +10,7 @@ import (
|
||||||
|
|
||||||
// Component represents single storage component.
|
// Component represents single storage component.
|
||||||
type Component interface {
|
type Component interface {
|
||||||
Open(bool) error
|
Open(context.Context, bool) error
|
||||||
SetMode(mode.Mode) error
|
SetMode(mode.Mode) error
|
||||||
Init() error
|
Init() error
|
||||||
Close() error
|
Close() error
|
||||||
|
@ -57,18 +58,18 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
|
||||||
t.Run("RW", func(t *testing.T) {
|
t.Run("RW", func(t *testing.T) {
|
||||||
// Use-case: irrecoverable error on some components, close everything.
|
// Use-case: irrecoverable error on some components, close everything.
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(false))
|
require.NoError(t, s.Open(context.Background(), false))
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
})
|
})
|
||||||
t.Run("RO", func(t *testing.T) {
|
t.Run("RO", func(t *testing.T) {
|
||||||
// Use-case: irrecoverable error on some components, close everything.
|
// Use-case: irrecoverable error on some components, close everything.
|
||||||
// Open in read-only must be done after the db is here.
|
// Open in read-only must be done after the db is here.
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(false))
|
require.NoError(t, s.Open(context.Background(), false))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
|
|
||||||
require.NoError(t, s.Open(true))
|
require.NoError(t, s.Open(context.Background(), true))
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -77,7 +78,7 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) {
|
||||||
func TestCloseTwice(t *testing.T, cons Constructor) {
|
func TestCloseTwice(t *testing.T, cons Constructor) {
|
||||||
// Use-case: move to maintenance mode twice, first time failed.
|
// Use-case: move to maintenance mode twice, first time failed.
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(false))
|
require.NoError(t, s.Open(context.Background(), false))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
require.NoError(t, s.Close()) // already closed, no-op
|
require.NoError(t, s.Close()) // already closed, no-op
|
||||||
|
@ -89,12 +90,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
||||||
// Use-case: metabase `Init` failed,
|
// Use-case: metabase `Init` failed,
|
||||||
// call `SetMode` on all not-yet-initialized components.
|
// call `SetMode` on all not-yet-initialized components.
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(false))
|
require.NoError(t, s.Open(context.Background(), false))
|
||||||
require.NoError(t, s.SetMode(m))
|
require.NoError(t, s.SetMode(m))
|
||||||
|
|
||||||
t.Run("after open in RO", func(t *testing.T) {
|
t.Run("after open in RO", func(t *testing.T) {
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
require.NoError(t, s.Open(true))
|
require.NoError(t, s.Open(context.Background(), true))
|
||||||
require.NoError(t, s.SetMode(m))
|
require.NoError(t, s.SetMode(m))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -103,7 +104,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
||||||
t.Run("after init", func(t *testing.T) {
|
t.Run("after init", func(t *testing.T) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
// Use-case: notmal node operation.
|
// Use-case: notmal node operation.
|
||||||
require.NoError(t, s.Open(false))
|
require.NoError(t, s.Open(context.Background(), false))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
require.NoError(t, s.SetMode(m))
|
require.NoError(t, s.SetMode(m))
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
|
@ -113,7 +114,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
||||||
func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
|
func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
|
||||||
// Use-case: normal node operation.
|
// Use-case: normal node operation.
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(false))
|
require.NoError(t, s.Open(context.Background(), false))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
require.NoError(t, s.SetMode(from))
|
require.NoError(t, s.SetMode(from))
|
||||||
require.NoError(t, s.SetMode(to))
|
require.NoError(t, s.SetMode(to))
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -21,7 +22,7 @@ var ErrDegradedMode = logicerr.New("metabase is in a degraded mode")
|
||||||
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
var ErrReadOnlyMode = logicerr.New("metabase is in a read-only mode")
|
||||||
|
|
||||||
// Open boltDB instance for metabase.
|
// Open boltDB instance for metabase.
|
||||||
func (db *DB) Open(readOnly bool) error {
|
func (db *DB) Open(_ context.Context, readOnly bool) error {
|
||||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta_test
|
package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -49,7 +50,7 @@ func newDB(t testing.TB, opts ...meta.Option) *meta.DB {
|
||||||
}, opts...)...,
|
}, opts...)...,
|
||||||
)
|
)
|
||||||
|
|
||||||
require.NoError(t, bdb.Open(false))
|
require.NoError(t, bdb.Open(context.Background(), false))
|
||||||
require.NoError(t, bdb.Init())
|
require.NoError(t, bdb.Init())
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -27,9 +28,9 @@ func (db *DB) SetMode(m mode.Mode) error {
|
||||||
case m.NoMetabase():
|
case m.NoMetabase():
|
||||||
db.boltDB = nil
|
db.boltDB = nil
|
||||||
case m.ReadOnly():
|
case m.ReadOnly():
|
||||||
err = db.Open(true)
|
err = db.Open(context.TODO(), true)
|
||||||
fyrchik marked this conversation as resolved
Outdated
dstepanov-yadro
commented
context.TODO too context.TODO too
elebedeva
commented
fixed fixed
|
|||||||
default:
|
default:
|
||||||
err = db.Open(false)
|
err = db.Open(context.TODO(), false)
|
||||||
}
|
}
|
||||||
if err == nil && !m.NoMetabase() && !m.ReadOnly() {
|
if err == nil && !m.NoMetabase() && !m.ReadOnly() {
|
||||||
err = db.Init()
|
err = db.Init()
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -42,13 +43,13 @@ func TestVersion(t *testing.T) {
|
||||||
}
|
}
|
||||||
t.Run("simple", func(t *testing.T) {
|
t.Run("simple", func(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
require.NoError(t, db.Open(false))
|
require.NoError(t, db.Open(context.Background(), false))
|
||||||
require.NoError(t, db.Init())
|
require.NoError(t, db.Init())
|
||||||
check(t, db)
|
check(t, db)
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
t.Run("reopen", func(t *testing.T) {
|
t.Run("reopen", func(t *testing.T) {
|
||||||
require.NoError(t, db.Open(false))
|
require.NoError(t, db.Open(context.Background(), false))
|
||||||
require.NoError(t, db.Init())
|
require.NoError(t, db.Init())
|
||||||
check(t, db)
|
check(t, db)
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
@ -56,29 +57,29 @@ func TestVersion(t *testing.T) {
|
||||||
})
|
})
|
||||||
t.Run("old data", func(t *testing.T) {
|
t.Run("old data", func(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
require.NoError(t, db.Open(false))
|
require.NoError(t, db.Open(context.Background(), false))
|
||||||
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
|
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
require.NoError(t, db.Open(false))
|
require.NoError(t, db.Open(context.Background(), false))
|
||||||
require.NoError(t, db.Init())
|
require.NoError(t, db.Init())
|
||||||
check(t, db)
|
check(t, db)
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
})
|
})
|
||||||
t.Run("invalid version", func(t *testing.T) {
|
t.Run("invalid version", func(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
require.NoError(t, db.Open(false))
|
require.NoError(t, db.Open(context.Background(), false))
|
||||||
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
return updateVersion(tx, version+1)
|
return updateVersion(tx, version+1)
|
||||||
}))
|
}))
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
require.NoError(t, db.Open(false))
|
require.NoError(t, db.Open(context.Background(), false))
|
||||||
require.Error(t, db.Init())
|
require.Error(t, db.Init())
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
t.Run("reset", func(t *testing.T) {
|
t.Run("reset", func(t *testing.T) {
|
||||||
require.NoError(t, db.Open(false))
|
require.NoError(t, db.Open(context.Background(), false))
|
||||||
require.NoError(t, db.Reset())
|
require.NoError(t, db.Reset())
|
||||||
check(t, db)
|
check(t, db)
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
|
@ -26,7 +26,7 @@ func BenchmarkCreate(b *testing.B) {
|
||||||
f := NewBoltForest(
|
f := NewBoltForest(
|
||||||
WithPath(filepath.Join(tmpDir, "test.db")),
|
WithPath(filepath.Join(tmpDir, "test.db")),
|
||||||
WithMaxBatchSize(runtime.GOMAXPROCS(0)))
|
WithMaxBatchSize(runtime.GOMAXPROCS(0)))
|
||||||
require.NoError(b, f.Open(false))
|
require.NoError(b, f.Open(context.Background(), false))
|
||||||
require.NoError(b, f.Init())
|
require.NoError(b, f.Init())
|
||||||
b.Cleanup(func() {
|
b.Cleanup(func() {
|
||||||
require.NoError(b, f.Close())
|
require.NoError(b, f.Close())
|
||||||
|
|
|
@ -99,7 +99,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
|
||||||
|
|
||||||
err := t.Close()
|
err := t.Close()
|
||||||
if err == nil && !m.NoMetabase() {
|
if err == nil && !m.NoMetabase() {
|
||||||
if err = t.Open(m.ReadOnly()); err == nil {
|
if err = t.Open(context.TODO(), m.ReadOnly()); err == nil {
|
||||||
err = t.Init()
|
err = t.Init()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -111,7 +111,7 @@ func (t *boltForest) SetMode(m mode.Mode) error {
|
||||||
t.metrics.SetMode(m)
|
t.metrics.SetMode(m)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (t *boltForest) Open(readOnly bool) error {
|
func (t *boltForest) Open(_ context.Context, readOnly bool) error {
|
||||||
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
|
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
|
return metaerr.Wrap(fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err))
|
||||||
|
|
|
@ -110,7 +110,7 @@ func (f *memoryForest) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *memoryForest) Open(bool) error {
|
func (f *memoryForest) Open(context.Context, bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (f *memoryForest) SetMode(mode.Mode) error {
|
func (f *memoryForest) SetMode(mode.Mode) error {
|
||||||
|
|
|
@ -24,7 +24,7 @@ var providers = []struct {
|
||||||
}{
|
}{
|
||||||
{"inmemory", func(t testing.TB, _ ...Option) Forest {
|
{"inmemory", func(t testing.TB, _ ...Option) Forest {
|
||||||
f := NewMemoryForest()
|
f := NewMemoryForest()
|
||||||
require.NoError(t, f.Open(false))
|
require.NoError(t, f.Open(context.Background(), false))
|
||||||
require.NoError(t, f.Init())
|
require.NoError(t, f.Init())
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, f.Close())
|
require.NoError(t, f.Close())
|
||||||
|
@ -37,7 +37,7 @@ var providers = []struct {
|
||||||
append([]Option{
|
append([]Option{
|
||||||
WithPath(filepath.Join(t.TempDir(), "test.db")),
|
WithPath(filepath.Join(t.TempDir(), "test.db")),
|
||||||
WithMaxBatchSize(1)}, opts...)...)
|
WithMaxBatchSize(1)}, opts...)...)
|
||||||
require.NoError(t, f.Open(false))
|
require.NoError(t, f.Open(context.Background(), false))
|
||||||
require.NoError(t, f.Init())
|
require.NoError(t, f.Init())
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
require.NoError(t, f.Close())
|
require.NoError(t, f.Close())
|
||||||
|
|
|
@ -58,7 +58,7 @@ type ForestStorage interface {
|
||||||
// DumpInfo returns information about the pilorama.
|
// DumpInfo returns information about the pilorama.
|
||||||
DumpInfo() Info
|
DumpInfo() Info
|
||||||
Init() error
|
Init() error
|
||||||
Open(bool) error
|
Open(context.Context, bool) error
|
||||||
Close() error
|
Close() error
|
||||||
SetMode(m mode.Mode) error
|
SetMode(m mode.Mode) error
|
||||||
SetParentID(id string)
|
SetParentID(id string)
|
||||||
|
|
|
@ -41,8 +41,10 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens all Shard's components.
|
// Open opens all Shard's components.
|
||||||
func (s *Shard) Open() error {
|
func (s *Shard) Open(ctx context.Context) error {
|
||||||
components := []interface{ Open(bool) error }{
|
components := []interface {
|
||||||
|
Open(context.Context, bool) error
|
||||||
|
}{
|
||||||
s.blobStor, s.metaBase,
|
s.blobStor, s.metaBase,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,12 +57,12 @@ func (s *Shard) Open() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, component := range components {
|
for i, component := range components {
|
||||||
if err := component.Open(false); err != nil {
|
if err := component.Open(ctx, false); err != nil {
|
||||||
if component == s.metaBase {
|
if component == s.metaBase {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What is this select for? We have an error, no need to check the context. For the What is this select for? We have an error, no need to check the context. For the `component == s.metaBase` check it is already checked in the loop.
elebedeva
commented
Removed unnecessary select Removed unnecessary select
|
|||||||
// We must first open all other components to avoid
|
// We must first open all other components to avoid
|
||||||
// opening non-existent DB in read-only mode.
|
// opening non-existent DB in read-only mode.
|
||||||
for j := i + 1; j < len(components); j++ {
|
for j := i + 1; j < len(components); j++ {
|
||||||
if err := components[j].Open(false); err != nil {
|
if err := components[j].Open(ctx, false); err != nil {
|
||||||
// Other components must be opened, fail.
|
// Other components must be opened, fail.
|
||||||
return fmt.Errorf("could not open %T: %w", components[j], err)
|
return fmt.Errorf("could not open %T: %w", components[j], err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,7 +87,7 @@ func TestShardOpen(t *testing.T) {
|
||||||
allowedMode.Store(int64(os.O_RDWR))
|
allowedMode.Store(int64(os.O_RDWR))
|
||||||
|
|
||||||
sh := newShard()
|
sh := newShard()
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
require.Equal(t, mode.ReadWrite, sh.GetMode())
|
require.Equal(t, mode.ReadWrite, sh.GetMode())
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close())
|
||||||
|
@ -96,7 +96,7 @@ func TestShardOpen(t *testing.T) {
|
||||||
allowedMode.Store(int64(os.O_RDONLY))
|
allowedMode.Store(int64(os.O_RDONLY))
|
||||||
|
|
||||||
sh = newShard()
|
sh = newShard()
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
||||||
require.Error(t, sh.SetMode(mode.ReadWrite))
|
require.Error(t, sh.SetMode(mode.ReadWrite))
|
||||||
|
@ -107,7 +107,7 @@ func TestShardOpen(t *testing.T) {
|
||||||
allowedMode.Store(math.MaxInt64)
|
allowedMode.Store(math.MaxInt64)
|
||||||
|
|
||||||
sh = newShard()
|
sh = newShard()
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
|
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close())
|
||||||
|
@ -135,7 +135,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
WithBlobStorOptions(blobOpts...),
|
WithBlobStorOptions(blobOpts...),
|
||||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
|
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
obj := objecttest.Object()
|
obj := objecttest.Object()
|
||||||
|
@ -158,7 +158,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
||||||
WithRefillMetabase(true))
|
WithRefillMetabase(true))
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
var getPrm GetPrm
|
var getPrm GetPrm
|
||||||
|
@ -197,7 +197,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
// open Blobstor
|
// open Blobstor
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
|
|
||||||
// initialize Blobstor
|
// initialize Blobstor
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
@ -365,7 +365,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
// open Blobstor
|
// open Blobstor
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
|
|
||||||
// initialize Blobstor
|
// initialize Blobstor
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
|
@ -76,7 +76,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
|
|
||||||
sh = New(opts...)
|
sh = New(opts...)
|
||||||
sh.gcCfg.testHookRemover = func(context.Context) gcRunResult { return gcRunResult{} }
|
sh.gcCfg.testHookRemover = func(context.Context) gcRunResult { return gcRunResult{} }
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package shard
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -27,8 +29,8 @@ func (s *Shard) ID() *ID {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||||
func (s *Shard) UpdateID() (err error) {
|
func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||||
if err = s.metaBase.Open(false); err != nil {
|
if err = s.metaBase.Open(ctx, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -58,7 +58,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sh = New(opts...)
|
sh = New(opts...)
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
|
|
|
@ -269,7 +269,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
||||||
meta.WithEpochState(epochState{})),
|
meta.WithEpochState(epochState{})),
|
||||||
WithMetricsWriter(mm),
|
WithMetricsWriter(mm),
|
||||||
)
|
)
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
|
|
|
@ -52,7 +52,7 @@ func TestShardReload(t *testing.T) {
|
||||||
pilorama.WithPath(filepath.Join(p, "pilorama")))}
|
pilorama.WithPath(filepath.Join(p, "pilorama")))}
|
||||||
|
|
||||||
sh := New(opts...)
|
sh := New(opts...)
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
objects := make([]objAddr, 5)
|
objects := make([]objAddr, 5)
|
||||||
|
|
|
@ -117,7 +117,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
|
||||||
|
|
||||||
sh = New(opts...)
|
sh = New(opts...)
|
||||||
|
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
if !o.dontRelease {
|
if !o.dontRelease {
|
||||||
|
|
|
@ -82,7 +82,7 @@ func benchmarkPutPar(b *testing.B, cache writecache.Cache, size uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
|
func benchmarkPutPrepare(b *testing.B, cache writecache.Cache) {
|
||||||
require.NoError(b, cache.Open(false), "opening")
|
require.NoError(b, cache.Open(context.Background(), false), "opening")
|
||||||
require.NoError(b, cache.Init(), "initializing")
|
require.NoError(b, cache.Init(), "initializing")
|
||||||
b.Cleanup(func() {
|
b.Cleanup(func() {
|
||||||
require.NoError(b, cache.Close(), "closing")
|
require.NoError(b, cache.Close(), "closing")
|
||||||
|
|
|
@ -38,7 +38,7 @@ type Cache interface {
|
||||||
Flush(context.Context, bool) error
|
Flush(context.Context, bool) error
|
||||||
|
|
||||||
Init() error
|
Init() error
|
||||||
Open(readOnly bool) error
|
Open(ctx context.Context, readOnly bool) error
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package writecachebadger
|
package writecachebadger
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -83,7 +84,7 @@ func (c *cache) DumpInfo() writecache.Info {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||||
func (c *cache) Open(readOnly bool) error {
|
func (c *cache) Open(_ context.Context, readOnly bool) error {
|
||||||
err := c.openStore(readOnly)
|
err := c.openStore(readOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package writecachebbolt
|
package writecachebbolt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -97,7 +98,7 @@ func (c *cache) DumpInfo() writecache.Info {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||||
func (c *cache) Open(readOnly bool) error {
|
func (c *cache) Open(_ context.Context, readOnly bool) error {
|
||||||
err := c.openStore(readOnly)
|
err := c.openStore(readOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
|
|
|
@ -110,7 +110,7 @@ func newCache[Option any](
|
||||||
mb := meta.New(
|
mb := meta.New(
|
||||||
meta.WithPath(filepath.Join(dir, "meta")),
|
meta.WithPath(filepath.Join(dir, "meta")),
|
||||||
meta.WithEpochState(dummyEpoch{}))
|
meta.WithEpochState(dummyEpoch{}))
|
||||||
require.NoError(t, mb.Open(false))
|
require.NoError(t, mb.Open(context.Background(), false))
|
||||||
require.NoError(t, mb.Init())
|
require.NoError(t, mb.Init())
|
||||||
|
|
||||||
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
|
@ -121,12 +121,12 @@ func newCache[Option any](
|
||||||
fstree.WithDirNameLen(1)),
|
fstree.WithDirNameLen(1)),
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
require.NoError(t, bs.Open(false))
|
require.NoError(t, bs.Open(context.Background(), false))
|
||||||
require.NoError(t, bs.Init())
|
require.NoError(t, bs.Init())
|
||||||
|
|
||||||
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
wc := createCacheFn(t, smallSize, mb, bs, opts...)
|
||||||
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
t.Cleanup(func() { require.NoError(t, wc.Close()) })
|
||||||
require.NoError(t, wc.Open(false))
|
require.NoError(t, wc.Open(context.Background(), false))
|
||||||
require.NoError(t, wc.Init())
|
require.NoError(t, wc.Init())
|
||||||
|
|
||||||
// First set mode for metabase and blobstor to prevent background flushes.
|
// First set mode for metabase and blobstor to prevent background flushes.
|
||||||
|
|
Loading…
Add table
Reference in a new issue
cmd.Context()
is what we usually use for this.Fixed.