forked from TrueCloudLab/frostfs-node
[#1437] shard: Fix contextcheck linter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
9565ed40ad
commit
c0490fd38a
38 changed files with 165 additions and 160 deletions
|
@ -8,7 +8,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetMode sets the blobstor mode of operation.
|
// SetMode sets the blobstor mode of operation.
|
||||||
func (b *BlobStor) SetMode(m mode.Mode) error {
|
func (b *BlobStor) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
b.modeMtx.Lock()
|
b.modeMtx.Lock()
|
||||||
defer b.modeMtx.Unlock()
|
defer b.modeMtx.Unlock()
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ func (b *BlobStor) SetMode(m mode.Mode) error {
|
||||||
|
|
||||||
err := b.Close()
|
err := b.Close()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if err = b.openBlobStor(context.TODO(), m); err == nil {
|
if err = b.openBlobStor(ctx, m); err == nil {
|
||||||
err = b.Init()
|
err = b.Init()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ func (e *StorageEngine) open(ctx context.Context) error {
|
||||||
sh := e.shards[res.id]
|
sh := e.shards[res.id]
|
||||||
delete(e.shards, res.id)
|
delete(e.shards, res.id)
|
||||||
|
|
||||||
err := sh.Close()
|
err := sh.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
|
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
|
||||||
zap.String("id", res.id),
|
zap.String("id", res.id),
|
||||||
|
@ -108,7 +108,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
sh := e.shards[res.id]
|
sh := e.shards[res.id]
|
||||||
delete(e.shards, res.id)
|
delete(e.shards, res.id)
|
||||||
|
|
||||||
err := sh.Close()
|
err := sh.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
|
e.log.Error(ctx, logs.EngineCouldNotClosePartiallyInitializedShard,
|
||||||
zap.String("id", res.id),
|
zap.String("id", res.id),
|
||||||
|
@ -126,7 +126,7 @@ func (e *StorageEngine) Init(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
e.wg.Add(1)
|
e.wg.Add(1)
|
||||||
go e.setModeLoop()
|
go e.setModeLoop(ctx)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -153,7 +153,7 @@ func (e *StorageEngine) Close(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// closes all shards. Never returns an error, shard errors are logged.
|
// closes all shards. Never returns an error, shard errors are logged.
|
||||||
func (e *StorageEngine) close(releasePools bool) error {
|
func (e *StorageEngine) close(ctx context.Context, releasePools bool) error {
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.RUnlock()
|
||||||
|
|
||||||
|
@ -164,8 +164,8 @@ func (e *StorageEngine) close(releasePools bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for id, sh := range e.shards {
|
for id, sh := range e.shards {
|
||||||
if err := sh.Close(); err != nil {
|
if err := sh.Close(ctx); err != nil {
|
||||||
e.log.Debug(context.Background(), logs.EngineCouldNotCloseShard,
|
e.log.Debug(ctx, logs.EngineCouldNotCloseShard,
|
||||||
zap.String("id", id),
|
zap.String("id", id),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
@ -213,7 +213,7 @@ func (e *StorageEngine) setBlockExecErr(ctx context.Context, err error) error {
|
||||||
return e.open(ctx)
|
return e.open(ctx)
|
||||||
}
|
}
|
||||||
} else if prevErr == nil { // ok -> block
|
} else if prevErr == nil { // ok -> block
|
||||||
return e.close(errors.Is(err, errClosed))
|
return e.close(ctx, errors.Is(err, errClosed))
|
||||||
}
|
}
|
||||||
|
|
||||||
// otherwise do nothing
|
// otherwise do nothing
|
||||||
|
@ -306,7 +306,7 @@ loop:
|
||||||
|
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
e.removeShards(shardsToRemove...)
|
e.removeShards(ctx, shardsToRemove...)
|
||||||
|
|
||||||
for _, p := range shardsToReload {
|
for _, p := range shardsToReload {
|
||||||
err := p.sh.Reload(ctx, p.opts...)
|
err := p.sh.Reload(ctx, p.opts...)
|
||||||
|
@ -330,13 +330,13 @@ loop:
|
||||||
err = sh.Init(ctx)
|
err = sh.Init(ctx)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = sh.Close()
|
_ = sh.Close(ctx)
|
||||||
return fmt.Errorf("could not init %s shard: %w", idStr, err)
|
return fmt.Errorf("could not init %s shard: %w", idStr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = e.addShard(sh)
|
err = e.addShard(sh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = sh.Close()
|
_ = sh.Close(ctx)
|
||||||
return fmt.Errorf("could not add %s shard: %w", idStr, err)
|
return fmt.Errorf("could not add %s shard: %w", idStr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ type setModeRequest struct {
|
||||||
|
|
||||||
// setModeLoop listens setModeCh to perform degraded mode transition of a single shard.
|
// setModeLoop listens setModeCh to perform degraded mode transition of a single shard.
|
||||||
// Instead of creating a worker per single shard we use a single goroutine.
|
// Instead of creating a worker per single shard we use a single goroutine.
|
||||||
func (e *StorageEngine) setModeLoop() {
|
func (e *StorageEngine) setModeLoop(ctx context.Context) {
|
||||||
defer e.wg.Done()
|
defer e.wg.Done()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -75,7 +75,7 @@ func (e *StorageEngine) setModeLoop() {
|
||||||
if !ok {
|
if !ok {
|
||||||
inProgress[sid] = struct{}{}
|
inProgress[sid] = struct{}{}
|
||||||
go func() {
|
go func() {
|
||||||
e.moveToDegraded(r.sh, r.errorCount, r.isMeta)
|
e.moveToDegraded(ctx, r.sh, r.errorCount, r.isMeta)
|
||||||
|
|
||||||
mtx.Lock()
|
mtx.Lock()
|
||||||
delete(inProgress, sid)
|
delete(inProgress, sid)
|
||||||
|
@ -87,7 +87,7 @@ func (e *StorageEngine) setModeLoop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) {
|
func (e *StorageEngine) moveToDegraded(ctx context.Context, sh *shard.Shard, errCount uint32, isMeta bool) {
|
||||||
sid := sh.ID()
|
sid := sh.ID()
|
||||||
log := e.log.With(
|
log := e.log.With(
|
||||||
zap.Stringer("shard_id", sid),
|
zap.Stringer("shard_id", sid),
|
||||||
|
@ -97,7 +97,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.RUnlock()
|
||||||
|
|
||||||
if isMeta {
|
if isMeta {
|
||||||
err := sh.SetMode(mode.DegradedReadOnly)
|
err := sh.SetMode(ctx, mode.DegradedReadOnly)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.Info(context.Background(), logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
|
log.Info(context.Background(), logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
|
||||||
return
|
return
|
||||||
|
@ -106,7 +106,7 @@ func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sh.SetMode(mode.ReadOnly)
|
err := sh.SetMode(ctx, mode.ReadOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(context.Background(), logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
|
log.Error(context.Background(), logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
|
||||||
return
|
return
|
||||||
|
|
|
@ -158,10 +158,10 @@ func TestErrorReporting(t *testing.T) {
|
||||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
|
require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, false))
|
||||||
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
|
||||||
|
|
||||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
|
require.NoError(t, te.ng.SetShardMode(context.Background(), te.shards[0].id, mode.ReadWrite, true))
|
||||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||||
require.NoError(t, te.ng.Close(context.Background()))
|
require.NoError(t, te.ng.Close(context.Background()))
|
||||||
})
|
})
|
||||||
|
|
|
@ -146,7 +146,7 @@ func TestEvacuateShardObjects(t *testing.T) {
|
||||||
require.Equal(t, uint64(0), res.ObjectsEvacuated())
|
require.Equal(t, uint64(0), res.ObjectsEvacuated())
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
res, err := e.Evacuate(context.Background(), prm)
|
res, err := e.Evacuate(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -237,7 +237,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
||||||
|
|
||||||
evacuateShardID := ids[0].String()
|
evacuateShardID := ids[0].String()
|
||||||
|
|
||||||
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[evacuateShardID].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[0:1]
|
prm.ShardID = ids[0:1]
|
||||||
|
@ -260,8 +260,8 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[1:2]
|
prm.ShardID = ids[1:2]
|
||||||
|
@ -298,7 +298,7 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range ids {
|
for i := range ids {
|
||||||
require.NoError(t, e.shards[ids[i].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[i].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
|
@ -327,8 +327,8 @@ func TestEvacuateCancellation(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[1:2]
|
prm.ShardID = ids[1:2]
|
||||||
|
@ -357,8 +357,8 @@ func TestEvacuateCancellationByError(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[1:2]
|
prm.ShardID = ids[1:2]
|
||||||
|
@ -386,8 +386,8 @@ func TestEvacuateSingleProcess(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
blocker := make(chan interface{})
|
blocker := make(chan interface{})
|
||||||
running := make(chan interface{})
|
running := make(chan interface{})
|
||||||
|
@ -429,8 +429,8 @@ func TestEvacuateObjectsAsync(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
blocker := make(chan interface{})
|
blocker := make(chan interface{})
|
||||||
running := make(chan interface{})
|
running := make(chan interface{})
|
||||||
|
@ -515,7 +515,7 @@ func TestEvacuateTreesLocal(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids[0:1]
|
prm.ShardID = ids[0:1]
|
||||||
|
@ -594,8 +594,8 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
||||||
require.NoError(t, e.Close(context.Background()))
|
require.NoError(t, e.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[1].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
mutex := sync.Mutex{}
|
mutex := sync.Mutex{}
|
||||||
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
||||||
|
@ -753,7 +753,7 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
|
||||||
prm.Scope = EvacuateScopeObjects
|
prm.Scope = EvacuateScopeObjects
|
||||||
prm.RepOneOnly = true
|
prm.RepOneOnly = true
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
res, err := e.Evacuate(context.Background(), prm)
|
res, err := e.Evacuate(context.Background(), prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -810,7 +810,7 @@ func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
|
||||||
prm.RepOneOnly = true
|
prm.RepOneOnly = true
|
||||||
prm.ContainerWorkerCount = 10
|
prm.ContainerWorkerCount = 10
|
||||||
|
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
_, err := e.Evacuate(context.Background(), prm)
|
_, err := e.Evacuate(context.Background(), prm)
|
||||||
|
|
|
@ -121,7 +121,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
|
||||||
return sh.ID(), nil
|
return sh.ID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*shard.Shard, error) {
|
func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*shard.Shard, error) {
|
||||||
id, err := generateShardID()
|
id, err := generateShardID()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||||
|
@ -139,8 +139,8 @@ func (e *StorageEngine) createShard(_ context.Context, opts []shard.Option) (*sh
|
||||||
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
||||||
)...)
|
)...)
|
||||||
|
|
||||||
if err := sh.UpdateID(); err != nil {
|
if err := sh.UpdateID(ctx); err != nil {
|
||||||
e.log.Warn(context.Background(), logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
|
e.log.Warn(ctx, logs.FailedToUpdateShardID, zap.Stringer("shard_id", sh.ID()), zap.String("metabase_path", sh.DumpInfo().MetaBaseInfo.Path), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return sh, nil
|
return sh, nil
|
||||||
|
@ -203,7 +203,7 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
||||||
|
|
||||||
// removeShards removes specified shards. Skips non-existent shards.
|
// removeShards removes specified shards. Skips non-existent shards.
|
||||||
// Logs errors about shards that it could not Close after the removal.
|
// Logs errors about shards that it could not Close after the removal.
|
||||||
func (e *StorageEngine) removeShards(ids ...string) {
|
func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
|
||||||
if len(ids) == 0 {
|
if len(ids) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -228,22 +228,22 @@ func (e *StorageEngine) removeShards(ids ...string) {
|
||||||
delete(e.shardPools, id)
|
delete(e.shardPools, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.log.Info(context.Background(), logs.EngineShardHasBeenRemoved,
|
e.log.Info(ctx, logs.EngineShardHasBeenRemoved,
|
||||||
zap.String("id", id))
|
zap.String("id", id))
|
||||||
}
|
}
|
||||||
e.mtx.Unlock()
|
e.mtx.Unlock()
|
||||||
|
|
||||||
for _, sh := range ss {
|
for _, sh := range ss {
|
||||||
err := sh.SetMode(mode.Disabled)
|
err := sh.SetMode(ctx, mode.Disabled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled,
|
e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
|
||||||
zap.Stringer("id", sh.ID()),
|
zap.Stringer("id", sh.ID()),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
err = sh.Close()
|
err = sh.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard,
|
e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
|
||||||
zap.Stringer("id", sh.ID()),
|
zap.Stringer("id", sh.ID()),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
@ -310,7 +310,7 @@ func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (sto
|
||||||
// SetShardMode sets mode of the shard with provided identifier.
|
// SetShardMode sets mode of the shard with provided identifier.
|
||||||
//
|
//
|
||||||
// Returns an error if shard mode was not set, or shard was not found in storage engine.
|
// Returns an error if shard mode was not set, or shard was not found in storage engine.
|
||||||
func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
|
func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.RUnlock()
|
||||||
|
|
||||||
|
@ -320,7 +320,7 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
|
||||||
sh.errorCount.Store(0)
|
sh.errorCount.Store(0)
|
||||||
e.metrics.ClearErrorCounter(shID)
|
e.metrics.ClearErrorCounter(shID)
|
||||||
}
|
}
|
||||||
return sh.SetMode(m)
|
return sh.SetMode(ctx, m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,7 +346,7 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
|
func (e *StorageEngine) DetachShards(ctx context.Context, ids []*shard.ID) error {
|
||||||
if len(ids) == 0 {
|
if len(ids) == 0 {
|
||||||
return logicerr.New("ids must be non-empty")
|
return logicerr.New("ids must be non-empty")
|
||||||
}
|
}
|
||||||
|
@ -356,20 +356,20 @@ func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.closeShards(deletedShards)
|
return e.closeShards(ctx, deletedShards)
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeShards closes deleted shards. Tries to close all shards.
|
// closeShards closes deleted shards. Tries to close all shards.
|
||||||
// Returns single error with joined shard errors.
|
// Returns single error with joined shard errors.
|
||||||
func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
|
func (e *StorageEngine) closeShards(ctx context.Context, deletedShards []hashedShard) error {
|
||||||
var multiErr error
|
var multiErr error
|
||||||
var multiErrGuard sync.Mutex
|
var multiErrGuard sync.Mutex
|
||||||
var eg errgroup.Group
|
var eg errgroup.Group
|
||||||
for _, sh := range deletedShards {
|
for _, sh := range deletedShards {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
err := sh.SetMode(mode.Disabled)
|
err := sh.SetMode(ctx, mode.Disabled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(context.Background(), logs.EngineCouldNotChangeShardModeToDisabled,
|
e.log.Error(ctx, logs.EngineCouldNotChangeShardModeToDisabled,
|
||||||
zap.Stringer("id", sh.ID()),
|
zap.Stringer("id", sh.ID()),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
@ -378,9 +378,9 @@ func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
|
||||||
multiErrGuard.Unlock()
|
multiErrGuard.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
err = sh.Close()
|
err = sh.Close(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Error(context.Background(), logs.EngineCouldNotCloseRemovedShard,
|
e.log.Error(ctx, logs.EngineCouldNotCloseRemovedShard,
|
||||||
zap.Stringer("id", sh.ID()),
|
zap.Stringer("id", sh.ID()),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
|
@ -33,7 +33,7 @@ func TestRemoveShard(t *testing.T) {
|
||||||
|
|
||||||
for id, remove := range mSh {
|
for id, remove := range mSh {
|
||||||
if remove {
|
if remove {
|
||||||
e.removeShards(id)
|
e.removeShards(context.Background(), id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,11 +55,11 @@ func TestDisableShards(t *testing.T) {
|
||||||
e, ids := te.engine, te.shardIDs
|
e, ids := te.engine, te.shardIDs
|
||||||
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||||
|
|
||||||
require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical))
|
require.ErrorAs(t, e.DetachShards(context.Background(), ids), new(logicerr.Logical))
|
||||||
require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical))
|
require.ErrorAs(t, e.DetachShards(context.Background(), nil), new(logicerr.Logical))
|
||||||
require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical))
|
require.ErrorAs(t, e.DetachShards(context.Background(), []*shard.ID{}), new(logicerr.Logical))
|
||||||
|
|
||||||
require.NoError(t, e.DetachShards([]*shard.ID{ids[0]}))
|
require.NoError(t, e.DetachShards(context.Background(), []*shard.ID{ids[0]}))
|
||||||
|
|
||||||
require.Equal(t, 1, len(e.shards))
|
require.Equal(t, 1, len(e.shards))
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
// Component represents single storage component.
|
// Component represents single storage component.
|
||||||
type Component interface {
|
type Component interface {
|
||||||
Open(context.Context, mode.Mode) error
|
Open(context.Context, mode.Mode) error
|
||||||
SetMode(mode.Mode) error
|
SetMode(context.Context, mode.Mode) error
|
||||||
Init() error
|
Init() error
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
@ -91,12 +91,12 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
||||||
// call `SetMode` on all not-yet-initialized components.
|
// call `SetMode` on all not-yet-initialized components.
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, s.SetMode(m))
|
require.NoError(t, s.SetMode(context.Background(), m))
|
||||||
|
|
||||||
t.Run("after open in RO", func(t *testing.T) {
|
t.Run("after open in RO", func(t *testing.T) {
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
require.NoError(t, s.Open(context.Background(), mode.ReadOnly))
|
require.NoError(t, s.Open(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, s.SetMode(m))
|
require.NoError(t, s.SetMode(context.Background(), m))
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
|
@ -106,7 +106,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) {
|
||||||
// Use-case: notmal node operation.
|
// Use-case: notmal node operation.
|
||||||
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
require.NoError(t, s.SetMode(m))
|
require.NoError(t, s.SetMode(context.Background(), m))
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,7 @@ func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) {
|
||||||
s := cons(t)
|
s := cons(t)
|
||||||
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, s.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, s.Init())
|
require.NoError(t, s.Init())
|
||||||
require.NoError(t, s.SetMode(from))
|
require.NoError(t, s.SetMode(context.Background(), from))
|
||||||
require.NoError(t, s.SetMode(to))
|
require.NoError(t, s.SetMode(context.Background(), to))
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Open boltDB instance for metabase.
|
// Open boltDB instance for metabase.
|
||||||
func (db *DB) Open(_ context.Context, m mode.Mode) error {
|
func (db *DB) Open(ctx context.Context, m mode.Mode) error {
|
||||||
db.modeMtx.Lock()
|
db.modeMtx.Lock()
|
||||||
defer db.modeMtx.Unlock()
|
defer db.modeMtx.Unlock()
|
||||||
db.mode = m
|
db.mode = m
|
||||||
|
@ -48,10 +48,10 @@ func (db *DB) Open(_ context.Context, m mode.Mode) error {
|
||||||
if m.NoMetabase() {
|
if m.NoMetabase() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return db.openDB(m)
|
return db.openDB(ctx, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) openDB(mode mode.Mode) error {
|
func (db *DB) openDB(ctx context.Context, mode mode.Mode) error {
|
||||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
|
||||||
|
@ -65,10 +65,10 @@ func (db *DB) openDB(mode mode.Mode) error {
|
||||||
}
|
}
|
||||||
db.boltOptions.ReadOnly = mode.ReadOnly()
|
db.boltOptions.ReadOnly = mode.ReadOnly()
|
||||||
|
|
||||||
return metaerr.Wrap(db.openBolt())
|
return metaerr.Wrap(db.openBolt(ctx))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) openBolt() error {
|
func (db *DB) openBolt(ctx context.Context) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||||
|
@ -226,7 +226,7 @@ func (db *DB) close() error {
|
||||||
// If there was a problem with applying new configuration, an error is returned.
|
// If there was a problem with applying new configuration, an error is returned.
|
||||||
//
|
//
|
||||||
// If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned.
|
// If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned.
|
||||||
func (db *DB) Reload(opts ...Option) (bool, error) {
|
func (db *DB) Reload(ctx context.Context, opts ...Option) (bool, error) {
|
||||||
var c cfg
|
var c cfg
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](&c)
|
opts[i](&c)
|
||||||
|
@ -243,7 +243,7 @@ func (db *DB) Reload(opts ...Option) (bool, error) {
|
||||||
db.mode = mode.Disabled
|
db.mode = mode.Disabled
|
||||||
db.metrics.SetMode(mode.ComponentDisabled)
|
db.metrics.SetMode(mode.ComponentDisabled)
|
||||||
db.info.Path = c.info.Path
|
db.info.Path = c.info.Path
|
||||||
if err := db.openBolt(); err != nil {
|
if err := db.openBolt(ctx); err != nil {
|
||||||
return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err))
|
return false, metaerr.Wrap(fmt.Errorf("%w: %v", ErrDegradedMode, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -8,7 +9,7 @@ import (
|
||||||
|
|
||||||
// SetMode sets the metabase mode of operation.
|
// SetMode sets the metabase mode of operation.
|
||||||
// If the mode assumes no operation metabase, the database is closed.
|
// If the mode assumes no operation metabase, the database is closed.
|
||||||
func (db *DB) SetMode(m mode.Mode) error {
|
func (db *DB) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
db.modeMtx.Lock()
|
db.modeMtx.Lock()
|
||||||
defer db.modeMtx.Unlock()
|
defer db.modeMtx.Unlock()
|
||||||
|
|
||||||
|
@ -25,7 +26,7 @@ func (db *DB) SetMode(m mode.Mode) error {
|
||||||
if m.NoMetabase() {
|
if m.NoMetabase() {
|
||||||
db.boltDB = nil
|
db.boltDB = nil
|
||||||
} else {
|
} else {
|
||||||
err := db.openDB(m)
|
err := db.openDB(ctx, m)
|
||||||
if err == nil && !m.ReadOnly() {
|
if err == nil && !m.ReadOnly() {
|
||||||
err = db.Init()
|
err = db.Init()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
@ -21,7 +22,7 @@ var (
|
||||||
// If id is missing, returns nil, nil.
|
// If id is missing, returns nil, nil.
|
||||||
//
|
//
|
||||||
// GetShardID does not report any metrics.
|
// GetShardID does not report any metrics.
|
||||||
func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
|
func (db *DB) GetShardID(ctx context.Context, mode metamode.Mode) ([]byte, error) {
|
||||||
db.modeMtx.Lock()
|
db.modeMtx.Lock()
|
||||||
defer db.modeMtx.Unlock()
|
defer db.modeMtx.Unlock()
|
||||||
db.mode = mode
|
db.mode = mode
|
||||||
|
@ -30,7 +31,7 @@ func (db *DB) GetShardID(mode metamode.Mode) ([]byte, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.openDB(mode); err != nil {
|
if err := db.openDB(ctx, mode); err != nil {
|
||||||
return nil, fmt.Errorf("failed to open metabase: %w", err)
|
return nil, fmt.Errorf("failed to open metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,7 +60,7 @@ func (db *DB) readShardID() ([]byte, error) {
|
||||||
|
|
||||||
// SetShardID sets metabase operation mode
|
// SetShardID sets metabase operation mode
|
||||||
// and writes shard id to db.
|
// and writes shard id to db.
|
||||||
func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
|
func (db *DB) SetShardID(ctx context.Context, id []byte, mode metamode.Mode) error {
|
||||||
db.modeMtx.Lock()
|
db.modeMtx.Lock()
|
||||||
defer db.modeMtx.Unlock()
|
defer db.modeMtx.Unlock()
|
||||||
db.mode = mode
|
db.mode = mode
|
||||||
|
@ -68,7 +69,7 @@ func (db *DB) SetShardID(id []byte, mode metamode.Mode) error {
|
||||||
return ErrReadOnlyMode
|
return ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.openDB(mode); err != nil {
|
if err := db.openDB(ctx, mode); err != nil {
|
||||||
return fmt.Errorf("failed to open metabase: %w", err)
|
return fmt.Errorf("failed to open metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ func TestVersion(t *testing.T) {
|
||||||
})
|
})
|
||||||
t.Run("old data", func(t *testing.T) {
|
t.Run("old data", func(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
require.NoError(t, db.SetShardID([]byte{1, 2, 3, 4}, mode.ReadWrite))
|
require.NoError(t, db.SetShardID(context.Background(), []byte{1, 2, 3, 4}, mode.ReadWrite))
|
||||||
|
|
||||||
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, db.Init())
|
require.NoError(t, db.Init())
|
||||||
|
|
|
@ -91,7 +91,7 @@ func NewBoltForest(opts ...Option) ForestStorage {
|
||||||
return &b
|
return &b
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *boltForest) SetMode(m mode.Mode) error {
|
func (t *boltForest) SetMode(_ context.Context, m mode.Mode) error {
|
||||||
t.modeMtx.Lock()
|
t.modeMtx.Lock()
|
||||||
defer t.modeMtx.Unlock()
|
defer t.modeMtx.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -119,7 +119,7 @@ func (f *memoryForest) Open(context.Context, mode.Mode) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *memoryForest) SetMode(mode.Mode) error {
|
func (f *memoryForest) SetMode(context.Context, mode.Mode) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ type ForestStorage interface {
|
||||||
Init() error
|
Init() error
|
||||||
Open(context.Context, mode.Mode) error
|
Open(context.Context, mode.Mode) error
|
||||||
Close() error
|
Close() error
|
||||||
SetMode(m mode.Mode) error
|
SetMode(context.Context, mode.Mode) error
|
||||||
SetParentID(id string)
|
SetParentID(id string)
|
||||||
Forest
|
Forest
|
||||||
|
|
||||||
|
|
|
@ -20,23 +20,23 @@ import (
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
func (s *Shard) handleMetabaseFailure(ctx context.Context, stage string, err error) error {
|
||||||
s.log.Error(context.Background(), logs.ShardMetabaseFailureSwitchingMode,
|
s.log.Error(ctx, logs.ShardMetabaseFailureSwitchingMode,
|
||||||
zap.String("stage", stage),
|
zap.String("stage", stage),
|
||||||
zap.Stringer("mode", mode.ReadOnly),
|
zap.Stringer("mode", mode.ReadOnly),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
|
||||||
err = s.SetMode(mode.ReadOnly)
|
err = s.SetMode(ctx, mode.ReadOnly)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.log.Error(context.Background(), logs.ShardCantMoveShardToReadonlySwitchMode,
|
s.log.Error(ctx, logs.ShardCantMoveShardToReadonlySwitchMode,
|
||||||
zap.String("stage", stage),
|
zap.String("stage", stage),
|
||||||
zap.Stringer("mode", mode.DegradedReadOnly),
|
zap.Stringer("mode", mode.DegradedReadOnly),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
|
||||||
err = s.SetMode(mode.DegradedReadOnly)
|
err = s.SetMode(ctx, mode.DegradedReadOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
|
return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
|
||||||
}
|
}
|
||||||
|
@ -75,7 +75,7 @@ func (s *Shard) Open(ctx context.Context) error {
|
||||||
return fmt.Errorf("could not open %T: %w", components[j], err)
|
return fmt.Errorf("could not open %T: %w", components[j], err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = s.handleMetabaseFailure("open", err)
|
err = s.handleMetabaseFailure(ctx, "open", err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -101,7 +101,7 @@ func (x *metabaseSynchronizer) Init() error {
|
||||||
// Init initializes all Shard's components.
|
// Init initializes all Shard's components.
|
||||||
func (s *Shard) Init(ctx context.Context) error {
|
func (s *Shard) Init(ctx context.Context) error {
|
||||||
m := s.GetMode()
|
m := s.GetMode()
|
||||||
if err := s.initializeComponents(m); err != nil {
|
if err := s.initializeComponents(ctx, m); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,7 +138,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) initializeComponents(m mode.Mode) error {
|
func (s *Shard) initializeComponents(ctx context.Context, m mode.Mode) error {
|
||||||
type initializer interface {
|
type initializer interface {
|
||||||
Init() error
|
Init() error
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ func (s *Shard) initializeComponents(m mode.Mode) error {
|
||||||
return fmt.Errorf("metabase initialization: %w", err)
|
return fmt.Errorf("metabase initialization: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.handleMetabaseFailure("init", err)
|
err = s.handleMetabaseFailure(ctx, "init", err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -364,9 +364,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close releases all Shard's components.
|
// Close releases all Shard's components.
|
||||||
func (s *Shard) Close() error {
|
func (s *Shard) Close(ctx context.Context) error {
|
||||||
if s.rb != nil {
|
if s.rb != nil {
|
||||||
s.rb.Stop(s.log)
|
s.rb.Stop(ctx, s.log)
|
||||||
}
|
}
|
||||||
var components []interface{ Close() error }
|
var components []interface{ Close() error }
|
||||||
|
|
||||||
|
@ -386,7 +386,7 @@ func (s *Shard) Close() error {
|
||||||
for _, component := range components {
|
for _, component := range components {
|
||||||
if err := component.Close(); err != nil {
|
if err := component.Close(); err != nil {
|
||||||
lastErr = err
|
lastErr = err
|
||||||
s.log.Error(context.Background(), logs.ShardCouldNotCloseShardComponent, zap.Error(err))
|
s.log.Error(ctx, logs.ShardCouldNotCloseShardComponent, zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,18 +414,18 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
|
||||||
unlock := s.lockExclusive()
|
unlock := s.lockExclusive()
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
s.rb.Stop(s.log)
|
s.rb.Stop(ctx, s.log)
|
||||||
if !s.info.Mode.NoMetabase() {
|
if !s.info.Mode.NoMetabase() {
|
||||||
defer func() {
|
defer func() {
|
||||||
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
ok, err := s.metaBase.Reload(c.metaOpts...)
|
ok, err := s.metaBase.Reload(ctx, c.metaOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, meta.ErrDegradedMode) {
|
if errors.Is(err, meta.ErrDegradedMode) {
|
||||||
s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
|
s.log.Error(ctx, logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
|
||||||
_ = s.setMode(mode.DegradedReadOnly)
|
_ = s.setMode(ctx, mode.DegradedReadOnly)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -441,11 +441,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
|
s.log.Error(ctx, logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
|
||||||
_ = s.setMode(mode.DegradedReadOnly)
|
_ = s.setMode(ctx, mode.DegradedReadOnly)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return s.setMode(c.info.Mode)
|
return s.setMode(ctx, c.info.Mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) lockExclusive() func() {
|
func (s *Shard) lockExclusive() func() {
|
||||||
|
|
|
@ -86,7 +86,7 @@ func TestShardOpen(t *testing.T) {
|
||||||
require.NoError(t, sh.Open(context.Background()))
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
require.Equal(t, mode.ReadWrite, sh.GetMode())
|
require.Equal(t, mode.ReadWrite, sh.GetMode())
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close(context.Background()))
|
||||||
|
|
||||||
// Metabase can be opened in read-only => start in ReadOnly mode.
|
// Metabase can be opened in read-only => start in ReadOnly mode.
|
||||||
allowedMode.Store(int64(os.O_RDONLY))
|
allowedMode.Store(int64(os.O_RDONLY))
|
||||||
|
@ -95,9 +95,9 @@ func TestShardOpen(t *testing.T) {
|
||||||
require.NoError(t, sh.Open(context.Background()))
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
||||||
require.Error(t, sh.SetMode(mode.ReadWrite))
|
require.Error(t, sh.SetMode(context.Background(), mode.ReadWrite))
|
||||||
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
require.Equal(t, mode.ReadOnly, sh.GetMode())
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close(context.Background()))
|
||||||
|
|
||||||
// Metabase is corrupted => start in DegradedReadOnly mode.
|
// Metabase is corrupted => start in DegradedReadOnly mode.
|
||||||
allowedMode.Store(math.MaxInt64)
|
allowedMode.Store(math.MaxInt64)
|
||||||
|
@ -106,7 +106,7 @@ func TestShardOpen(t *testing.T) {
|
||||||
require.NoError(t, sh.Open(context.Background()))
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
|
require.Equal(t, mode.DegradedReadOnly, sh.GetMode())
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close(context.Background()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRefillMetabaseCorrupted(t *testing.T) {
|
func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
|
@ -146,7 +146,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close(context.Background()))
|
||||||
|
|
||||||
addr := object.AddressOf(obj)
|
addr := object.AddressOf(obj)
|
||||||
// This is copied from `fstree.treePath()` to avoid exporting function just for tests.
|
// This is copied from `fstree.treePath()` to avoid exporting function just for tests.
|
||||||
|
@ -170,7 +170,7 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
getPrm.SetAddress(addr)
|
getPrm.SetAddress(addr)
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
require.True(t, client.IsErrObjectNotFound(err))
|
require.True(t, client.IsErrObjectNotFound(err))
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close(context.Background()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRefillMetabase(t *testing.T) {
|
func TestRefillMetabase(t *testing.T) {
|
||||||
|
@ -358,7 +358,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
phyBefore := c.Phy
|
phyBefore := c.Phy
|
||||||
logicalBefore := c.Logic
|
logicalBefore := c.Logic
|
||||||
|
|
||||||
err = sh.Close()
|
err = sh.Close(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
sh = New(
|
sh = New(
|
||||||
|
@ -379,7 +379,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
// initialize Blobstor
|
// initialize Blobstor
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
defer sh.Close()
|
defer sh.Close(context.Background())
|
||||||
|
|
||||||
checkAllObjs(false)
|
checkAllObjs(false)
|
||||||
checkObj(object.AddressOf(tombObj), nil)
|
checkObj(object.AddressOf(tombObj), nil)
|
||||||
|
|
|
@ -37,7 +37,7 @@ func TestShard_Delete_BigObject(t *testing.T) {
|
||||||
|
|
||||||
func testShard(t *testing.T, hasWriteCache bool, payloadSize int) {
|
func testShard(t *testing.T, hasWriteCache bool, payloadSize int) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
sh = New(opts...)
|
sh = New(opts...)
|
||||||
require.NoError(t, sh.Open(context.Background()))
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
|
@ -34,7 +34,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
return util.NewPseudoWorkerPool() // synchronous event processing
|
return util.NewPseudoWorkerPool() // synchronous event processing
|
||||||
})},
|
})},
|
||||||
})
|
})
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
return util.NewPseudoWorkerPool() // synchronous event processing
|
return util.NewPseudoWorkerPool() // synchronous event processing
|
||||||
})},
|
})},
|
||||||
})
|
})
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
lock := testutil.GenerateObjectWithCID(cnr)
|
lock := testutil.GenerateObjectWithCID(cnr)
|
||||||
lock.SetType(objectSDK.TypeLock)
|
lock.SetType(objectSDK.TypeLock)
|
||||||
|
@ -190,7 +190,7 @@ func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool
|
||||||
additionalShardOptions: []Option{WithDisabledGC()},
|
additionalShardOptions: []Option{WithDisabledGC()},
|
||||||
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
||||||
})
|
})
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
obj := testutil.GenerateObjectWithSize(1024)
|
obj := testutil.GenerateObjectWithSize(1024)
|
||||||
|
|
||||||
|
@ -254,7 +254,7 @@ func TestGCDontDeleteObjectFromWritecache(t *testing.T) {
|
||||||
additionalShardOptions: []Option{WithDisabledGC()},
|
additionalShardOptions: []Option{WithDisabledGC()},
|
||||||
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
||||||
})
|
})
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
obj := testutil.GenerateObjectWithSize(1024)
|
obj := testutil.GenerateObjectWithSize(1024)
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ func TestShard_Get(t *testing.T) {
|
||||||
|
|
||||||
func testShardGet(t *testing.T, hasWriteCache bool) {
|
func testShardGet(t *testing.T, hasWriteCache bool) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
var putPrm PutPrm
|
var putPrm PutPrm
|
||||||
var getPrm GetPrm
|
var getPrm GetPrm
|
||||||
|
|
|
@ -28,7 +28,7 @@ func TestShard_Head(t *testing.T) {
|
||||||
|
|
||||||
func testShardHead(t *testing.T, hasWriteCache bool) {
|
func testShardHead(t *testing.T, hasWriteCache bool) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
var putPrm PutPrm
|
var putPrm PutPrm
|
||||||
var headPrm HeadPrm
|
var headPrm HeadPrm
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package shard
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -30,11 +31,11 @@ func (s *Shard) ID() *ID {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||||
func (s *Shard) UpdateID() (err error) {
|
func (s *Shard) UpdateID(ctx context.Context) (err error) {
|
||||||
var idFromMetabase []byte
|
var idFromMetabase []byte
|
||||||
modeDegraded := s.GetMode().NoMetabase()
|
modeDegraded := s.GetMode().NoMetabase()
|
||||||
if !modeDegraded {
|
if !modeDegraded {
|
||||||
if idFromMetabase, err = s.metaBase.GetShardID(mode.ReadOnly); err != nil {
|
if idFromMetabase, err = s.metaBase.GetShardID(ctx, mode.ReadOnly); err != nil {
|
||||||
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
err = fmt.Errorf("failed to read shard id from metabase: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -62,7 +63,7 @@ func (s *Shard) UpdateID() (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(idFromMetabase) == 0 && !modeDegraded {
|
if len(idFromMetabase) == 0 && !modeDegraded {
|
||||||
if setErr := s.metaBase.SetShardID(*s.info.ID, s.GetMode()); setErr != nil {
|
if setErr := s.metaBase.SetShardID(ctx, *s.info.ID, s.GetMode()); setErr != nil {
|
||||||
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
|
err = errors.Join(err, fmt.Errorf("failed to write shard id to metabase: %w", setErr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,7 @@ func TestShard_Inhume(t *testing.T) {
|
||||||
|
|
||||||
func testShardInhume(t *testing.T, hasWriteCache bool) {
|
func testShardInhume(t *testing.T, hasWriteCache bool) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
|
|
|
@ -18,14 +18,14 @@ func TestShard_List(t *testing.T) {
|
||||||
t.Run("without write cache", func(t *testing.T) {
|
t.Run("without write cache", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
sh := newShard(t, false)
|
sh := newShard(t, false)
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
testShardList(t, sh)
|
testShardList(t, sh)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("with write cache", func(t *testing.T) {
|
t.Run("with write cache", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
shWC := newShard(t, true)
|
shWC := newShard(t, true)
|
||||||
defer func() { require.NoError(t, shWC.Close()) }()
|
defer func() { require.NoError(t, shWC.Close(context.Background())) }()
|
||||||
testShardList(t, shWC)
|
testShardList(t, shWC)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
require.NoError(t, sh.Open(context.Background()))
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
@ -148,7 +148,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
|
|
||||||
func TestShard_IsLocked(t *testing.T) {
|
func TestShard_IsLocked(t *testing.T) {
|
||||||
sh := newShard(t, false)
|
sh := newShard(t, false)
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
|
@ -201,11 +201,11 @@ func TestCounters(t *testing.T) {
|
||||||
|
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
sh, mm := shardWithMetrics(t, dir)
|
sh, mm := shardWithMetrics(t, dir)
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
sh.SetMode(mode.ReadOnly)
|
sh.SetMode(context.Background(), mode.ReadOnly)
|
||||||
require.Equal(t, mode.ReadOnly, mm.mode)
|
require.Equal(t, mode.ReadOnly, mm.mode)
|
||||||
sh.SetMode(mode.ReadWrite)
|
sh.SetMode(context.Background(), mode.ReadWrite)
|
||||||
require.Equal(t, mode.ReadWrite, mm.mode)
|
require.Equal(t, mode.ReadWrite, mm.mode)
|
||||||
|
|
||||||
const objNumber = 10
|
const objNumber = 10
|
||||||
|
|
|
@ -20,19 +20,21 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow
|
// Returns any error encountered that did not allow
|
||||||
// setting shard mode.
|
// setting shard mode.
|
||||||
func (s *Shard) SetMode(m mode.Mode) error {
|
func (s *Shard) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
unlock := s.lockExclusive()
|
unlock := s.lockExclusive()
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
return s.setMode(m)
|
return s.setMode(ctx, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) setMode(m mode.Mode) error {
|
func (s *Shard) setMode(ctx context.Context, m mode.Mode) error {
|
||||||
s.log.Info(context.Background(), logs.ShardSettingShardMode,
|
s.log.Info(ctx, logs.ShardSettingShardMode,
|
||||||
zap.Stringer("old_mode", s.info.Mode),
|
zap.Stringer("old_mode", s.info.Mode),
|
||||||
zap.Stringer("new_mode", m))
|
zap.Stringer("new_mode", m))
|
||||||
|
|
||||||
components := []interface{ SetMode(mode.Mode) error }{
|
components := []interface {
|
||||||
|
SetMode(context.Context, mode.Mode) error
|
||||||
|
}{
|
||||||
s.metaBase, s.blobStor,
|
s.metaBase, s.blobStor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +62,7 @@ func (s *Shard) setMode(m mode.Mode) error {
|
||||||
|
|
||||||
if !m.Disabled() {
|
if !m.Disabled() {
|
||||||
for i := range components {
|
for i := range components {
|
||||||
if err := components[i].SetMode(m); err != nil {
|
if err := components[i].SetMode(ctx, m); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,7 +71,7 @@ func (s *Shard) setMode(m mode.Mode) error {
|
||||||
s.info.Mode = m
|
s.info.Mode = m
|
||||||
s.metricsWriter.SetMode(s.info.Mode)
|
s.metricsWriter.SetMode(s.info.Mode)
|
||||||
|
|
||||||
s.log.Info(context.Background(), logs.ShardShardModeSetSuccessfully,
|
s.log.Info(ctx, logs.ShardShardModeSetSuccessfully,
|
||||||
zap.Stringer("mode", s.info.Mode))
|
zap.Stringer("mode", s.info.Mode))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
|
|
@ -125,7 +125,7 @@ func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rebuilder) Stop(log *logger.Logger) {
|
func (r *rebuilder) Stop(ctx context.Context, log *logger.Logger) {
|
||||||
r.mtx.Lock()
|
r.mtx.Lock()
|
||||||
defer r.mtx.Unlock()
|
defer r.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -138,7 +138,7 @@ func (r *rebuilder) Stop(log *logger.Logger) {
|
||||||
r.wg.Wait()
|
r.wg.Wait()
|
||||||
r.cancel = nil
|
r.cancel = nil
|
||||||
r.done = nil
|
r.done = nil
|
||||||
log.Info(context.Background(), logs.BlobstoreRebuildStopped)
|
log.Info(ctx, logs.BlobstoreRebuildStopped)
|
||||||
}
|
}
|
||||||
|
|
||||||
var errMBIsNotAvailable = errors.New("metabase is not available")
|
var errMBIsNotAvailable = errors.New("metabase is not available")
|
||||||
|
|
|
@ -34,7 +34,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
|
||||||
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
|
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
|
||||||
})
|
})
|
||||||
|
|
||||||
defer func() { require.NoError(b, sh.Close()) }()
|
defer func() { require.NoError(b, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
var putPrm PutPrm
|
var putPrm PutPrm
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(b, sh.Close())
|
require.NoError(b, sh.Close(context.Background()))
|
||||||
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
|
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
|
||||||
|
|
||||||
require.NoError(b, sh.Open(context.Background()))
|
require.NoError(b, sh.Open(context.Background()))
|
||||||
|
@ -72,5 +72,5 @@ func benchRefillMetabase(b *testing.B, objectsCount int) {
|
||||||
|
|
||||||
require.NoError(b, sh.Init(context.Background()))
|
require.NoError(b, sh.Init(context.Background()))
|
||||||
|
|
||||||
require.NoError(b, sh.Close())
|
require.NoError(b, sh.Close(context.Background()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ func TestShardReload(t *testing.T) {
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close(context.Background()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
objects := make([]objAddr, 5)
|
objects := make([]objAddr, 5)
|
||||||
|
|
|
@ -52,10 +52,10 @@ func TestWriteCacheObjectLoss(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
require.NoError(t, errG.Wait())
|
require.NoError(t, errG.Wait())
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close(context.Background()))
|
||||||
|
|
||||||
sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts})
|
sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts})
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close(context.Background())) }()
|
||||||
|
|
||||||
var getPrm GetPrm
|
var getPrm GetPrm
|
||||||
|
|
||||||
|
|
|
@ -117,8 +117,8 @@ func runFlushTest[Option any](
|
||||||
defer func() { require.NoError(t, wc.Close()) }()
|
defer func() { require.NoError(t, wc.Close()) }()
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
||||||
|
|
||||||
require.NoError(t, wc.Flush(context.Background(), false, false))
|
require.NoError(t, wc.Flush(context.Background(), false, false))
|
||||||
|
|
||||||
|
@ -131,11 +131,11 @@ func runFlushTest[Option any](
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
|
|
||||||
// Blobstor is read-only, so we expect en error from `flush` here.
|
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||||
require.Error(t, wc.SetMode(mode.Degraded))
|
require.Error(t, wc.SetMode(context.Background(), mode.Degraded))
|
||||||
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
require.NoError(t, wc.SetMode(context.Background(), mode.Degraded))
|
||||||
|
|
||||||
check(t, mb, bs, objects)
|
check(t, mb, bs, objects)
|
||||||
})
|
})
|
||||||
|
@ -149,8 +149,8 @@ func runFlushTest[Option any](
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
f.InjectFn(t, wc)
|
f.InjectFn(t, wc)
|
||||||
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadWrite))
|
||||||
|
|
||||||
require.Equal(t, uint32(0), errCount.Load())
|
require.Equal(t, uint32(0), errCount.Load())
|
||||||
require.Error(t, wc.Flush(context.Background(), false, false))
|
require.Error(t, wc.Flush(context.Background(), false, false))
|
||||||
|
@ -191,8 +191,8 @@ func newCache[Option any](
|
||||||
require.NoError(t, wc.Init())
|
require.NoError(t, wc.Init())
|
||||||
|
|
||||||
// First set mode for metabase and blobstor to prevent background flushes.
|
// First set mode for metabase and blobstor to prevent background flushes.
|
||||||
require.NoError(t, mb.SetMode(mode.ReadOnly))
|
require.NoError(t, mb.SetMode(context.Background(), mode.ReadOnly))
|
||||||
require.NoError(t, bs.SetMode(mode.ReadOnly))
|
require.NoError(t, bs.SetMode(context.Background(), mode.ReadOnly))
|
||||||
|
|
||||||
return wc, bs, mb
|
return wc, bs, mb
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,8 @@ type setModePrm struct {
|
||||||
// SetMode sets write-cache mode of operation.
|
// SetMode sets write-cache mode of operation.
|
||||||
// When shard is put in read-only mode all objects in memory are flushed to disk
|
// When shard is put in read-only mode all objects in memory are flushed to disk
|
||||||
// and all background jobs are suspended.
|
// and all background jobs are suspended.
|
||||||
func (c *cache) SetMode(m mode.Mode) error {
|
func (c *cache) SetMode(ctx context.Context, m mode.Mode) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.SetMode",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("mode", m.String()),
|
attribute.String("mode", m.String()),
|
||||||
))
|
))
|
||||||
|
|
|
@ -38,7 +38,7 @@ type Cache interface {
|
||||||
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
|
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
|
||||||
Delete(context.Context, oid.Address) error
|
Delete(context.Context, oid.Address) error
|
||||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
||||||
SetMode(mode.Mode) error
|
SetMode(context.Context, mode.Mode) error
|
||||||
SetLogger(*logger.Logger)
|
SetLogger(*logger.Logger)
|
||||||
DumpInfo() Info
|
DumpInfo() Info
|
||||||
Flush(context.Context, bool, bool) error
|
Flush(context.Context, bool, bool) error
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
|
func (s *Server) DetachShards(ctx context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
|
||||||
err := s.isValidRequest(req)
|
err := s.isValidRequest(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||||
|
@ -19,7 +19,7 @@ func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsReques
|
||||||
|
|
||||||
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
|
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
|
||||||
|
|
||||||
if err := s.s.DetachShards(shardIDs); err != nil {
|
if err := s.s.DetachShards(ctx, shardIDs); err != nil {
|
||||||
if errors.As(err, new(logicerr.Logical)) {
|
if errors.As(err, new(logicerr.Logical)) {
|
||||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) {
|
func (s *Server) SetShardMode(ctx context.Context, req *control.SetShardModeRequest) (*control.SetShardModeResponse, error) {
|
||||||
// verify request
|
// verify request
|
||||||
err := s.isValidRequest(req)
|
err := s.isValidRequest(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -38,7 +38,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
||||||
err = s.s.SetShardMode(shardID, m, req.GetBody().GetResetErrorCounter())
|
err = s.s.SetShardMode(ctx, shardID, m, req.GetBody().GetResetErrorCounter())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue