forked from TrueCloudLab/frostfs-node
[#1559] local_object_storage: Move shard.Mode
to a separate package
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
d8ba954aff
commit
339864b720
26 changed files with 134 additions and 133 deletions
|
@ -10,7 +10,7 @@ import (
|
|||
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
|
||||
piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama"
|
||||
configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -34,7 +34,7 @@ func TestEngineSection(t *testing.T) {
|
|||
|
||||
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
|
||||
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
||||
require.EqualValues(t, shard.ModeReadWrite, shardconfig.From(empty).Mode())
|
||||
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
@ -95,7 +95,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, 2*time.Minute, gc.RemoverSleepInterval())
|
||||
|
||||
require.Equal(t, false, sc.RefillMetabase())
|
||||
require.Equal(t, shard.ModeReadOnly, sc.Mode())
|
||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||
case 1:
|
||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||
require.Equal(t, fs.FileMode(0644), pl.Perm())
|
||||
|
@ -133,7 +133,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, 5*time.Minute, gc.RemoverSleepInterval())
|
||||
|
||||
require.Equal(t, true, sc.RefillMetabase())
|
||||
require.Equal(t, shard.ModeReadWrite, sc.Mode())
|
||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||
}
|
||||
})
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
metabaseconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/metabase"
|
||||
piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama"
|
||||
writecacheconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/writecache"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
)
|
||||
|
||||
// Config is a wrapper over the config section
|
||||
|
@ -75,7 +75,7 @@ func (x *Config) RefillMetabase() bool {
|
|||
//
|
||||
// Panics if read the value is not one of predefined
|
||||
// shard modes.
|
||||
func (x *Config) Mode() (m shard.Mode) {
|
||||
func (x *Config) Mode() (m mode.Mode) {
|
||||
s := config.StringSafe(
|
||||
(*config.Config)(x),
|
||||
"mode",
|
||||
|
@ -83,11 +83,11 @@ func (x *Config) Mode() (m shard.Mode) {
|
|||
|
||||
switch s {
|
||||
case "read-write", "":
|
||||
m = shard.ModeReadWrite
|
||||
m = mode.ReadWrite
|
||||
case "read-only":
|
||||
m = shard.ModeReadOnly
|
||||
m = mode.ReadOnly
|
||||
case "degraded":
|
||||
m = shard.ModeDegraded
|
||||
m = mode.Degraded
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown shard mode: %s", s))
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -58,7 +58,7 @@ func TestPersistentShardID(t *testing.T) {
|
|||
|
||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
require.NoError(t, e.Close())
|
||||
|
||||
e, _, newID := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/atomic"
|
||||
|
@ -50,7 +51,7 @@ func (e *StorageEngine) reportShardError(
|
|||
return
|
||||
}
|
||||
|
||||
err = sh.SetMode(shard.ModeDegraded)
|
||||
err = sh.SetMode(mode.Degraded)
|
||||
if err != nil {
|
||||
e.log.Error("failed to move shard in degraded mode",
|
||||
zap.Uint32("error count", errCount),
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -79,16 +80,16 @@ func TestErrorReporting(t *testing.T) {
|
|||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
|
||||
for i := uint32(1); i < 3; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
}
|
||||
})
|
||||
t.Run("with error threshold", func(t *testing.T) {
|
||||
|
@ -109,30 +110,30 @@ func TestErrorReporting(t *testing.T) {
|
|||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
|
||||
for i := uint32(1); i < errThreshold; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
for i := uint32(0); i < 2; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], errThreshold+i, shard.ModeDegraded)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], errThreshold+i, mode.Degraded)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], shard.ModeReadWrite, false))
|
||||
checkShardState(t, e, id[0], errThreshold+1, shard.ModeReadWrite)
|
||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, false))
|
||||
checkShardState(t, e, id[0], errThreshold+1, mode.ReadWrite)
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], shard.ModeReadWrite, true))
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, true))
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -166,7 +167,7 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
require.NoError(t, e.Close())
|
||||
|
||||
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.RootPath
|
||||
|
@ -192,11 +193,11 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
|
||||
}
|
||||
|
||||
checkShardState(t, e, id[0], 4, shard.ModeDegraded)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[0], 4, mode.Degraded)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) {
|
||||
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
|
||||
e.mtx.RLock()
|
||||
sh := e.shards[id.String()]
|
||||
e.mtx.RUnlock()
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"github.com/nspcc-dev/hrw"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/atomic"
|
||||
|
@ -129,7 +130,7 @@ func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (sto
|
|||
// SetShardMode sets mode of the shard with provided identifier.
|
||||
//
|
||||
// Returns an error if shard mode was not set, or shard was not found in storage engine.
|
||||
func (e *StorageEngine) SetShardMode(id *shard.ID, m shard.Mode, resetErrorCounter bool) error {
|
||||
func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounter bool) error {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
|
@ -27,7 +28,7 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
|
|||
// Delete removes data from the shard's writeCache, metaBase and
|
||||
// blobStor.
|
||||
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return DeleteRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"os"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
)
|
||||
|
||||
|
@ -55,7 +56,7 @@ func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
|
|||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode != ModeReadOnly {
|
||||
if s.info.Mode != mode.ReadOnly {
|
||||
return DumpRes{}, ErrMustBeReadOnly
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -62,7 +63,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
|
|||
require.ErrorIs(t, err, shard.ErrMustBeReadOnly)
|
||||
})
|
||||
|
||||
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
||||
outEmpty := out + ".empty"
|
||||
var dumpPrm shard.DumpPrm
|
||||
dumpPrm.WithPath(outEmpty)
|
||||
|
@ -70,7 +71,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
|
|||
res, err := sh.Dump(dumpPrm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, res.Count())
|
||||
require.NoError(t, sh.SetMode(shard.ModeReadWrite))
|
||||
require.NoError(t, sh.SetMode(mode.ReadWrite))
|
||||
|
||||
// Approximate object header size.
|
||||
const headerSize = 400
|
||||
|
@ -100,7 +101,7 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
||||
|
||||
t.Run("invalid path", func(t *testing.T) {
|
||||
var dumpPrm shard.DumpPrm
|
||||
|
@ -198,13 +199,13 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
|
|||
var prm shard.RestorePrm
|
||||
prm.WithPath(out)
|
||||
t.Run("must allow write", func(t *testing.T) {
|
||||
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
||||
|
||||
_, err := sh.Restore(prm)
|
||||
require.ErrorIs(t, err, shard.ErrReadOnlyMode)
|
||||
})
|
||||
|
||||
require.NoError(t, sh.SetMode(shard.ModeReadWrite))
|
||||
require.NoError(t, sh.SetMode(mode.ReadWrite))
|
||||
|
||||
checkRestore(t, sh, prm, objects)
|
||||
})
|
||||
|
@ -230,7 +231,7 @@ func TestStream(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, sh1.SetMode(shard.ModeReadOnly))
|
||||
require.NoError(t, sh1.SetMode(mode.ReadOnly))
|
||||
|
||||
r, w := io.Pipe()
|
||||
finish := make(chan struct{})
|
||||
|
@ -352,7 +353,7 @@ func TestDumpIgnoreErrors(t *testing.T) {
|
|||
|
||||
bsOpts = append(bsOpts, blobstor.WithBlobovniczaShallowWidth(3))
|
||||
sh = newCustomShard(t, dir, true, wcOpts, bsOpts)
|
||||
require.NoError(t, sh.SetMode(shard.ModeReadOnly))
|
||||
require.NoError(t, sh.SetMode(mode.ReadOnly))
|
||||
|
||||
{
|
||||
// 2. Invalid object in blobovnicza.
|
||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
|||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -42,7 +43,7 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
|||
if err != nil {
|
||||
// If the shard is in degraded mode, try to consult blobstor directly.
|
||||
// Otherwise, just return an error.
|
||||
if s.GetMode() == ModeDegraded {
|
||||
if s.GetMode() == mode.Degraded {
|
||||
var p blobstor.ExistsPrm
|
||||
p.SetAddress(prm.addr)
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -179,7 +180,7 @@ func (gc *gc) stop() {
|
|||
// with GC-marked graves.
|
||||
// Does nothing if shard is in "read-only" mode.
|
||||
func (s *Shard) removeGarbage() {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -121,7 +122,7 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, big, small stor
|
|||
mPrm.SetAddress(addr)
|
||||
|
||||
mRes, err := s.metaBase.Exists(mPrm)
|
||||
if err != nil && s.GetMode() != ModeDegraded {
|
||||
if err != nil && s.GetMode() != mode.Degraded {
|
||||
return res, false, err
|
||||
}
|
||||
exists = mRes.Exists()
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
)
|
||||
|
||||
|
@ -13,7 +14,7 @@ type Info struct {
|
|||
ID *ID
|
||||
|
||||
// Shard mode.
|
||||
Mode Mode
|
||||
Mode mode.Mode
|
||||
|
||||
// Information about the metabase.
|
||||
MetaBaseInfo meta.Info
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -61,7 +62,7 @@ var ErrLockObjectRemoval = meta.ErrLockObjectRemoval
|
|||
//
|
||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||
func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return InhumeRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
)
|
||||
|
@ -14,7 +15,7 @@ import (
|
|||
//
|
||||
// Locked list should be unique. Panics if it is empty.
|
||||
func (s *Shard) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
|
|
@ -3,62 +3,23 @@ package shard
|
|||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
)
|
||||
|
||||
// Mode represents enumeration of Shard work modes.
|
||||
type Mode uint32
|
||||
|
||||
// ErrReadOnlyMode is returned when it is impossible to apply operation
|
||||
// that changes shard's memory due to the "read-only" shard's mode.
|
||||
var ErrReadOnlyMode = errors.New("shard is in read-only mode")
|
||||
|
||||
const (
|
||||
// ModeReadWrite is a Mode value for shard that is available
|
||||
// for read and write operations. Default shard mode.
|
||||
ModeReadWrite Mode = iota
|
||||
|
||||
// ModeReadOnly is a Mode value for shard that does not
|
||||
// accept write operation but is readable.
|
||||
ModeReadOnly
|
||||
|
||||
// ModeDegraded is a Mode value for shard that is set automatically
|
||||
// after a certain number of errors is encountered. It is the same as
|
||||
// `ModeReadOnly` but also enables fallback algorithms for getting object
|
||||
// in case metabase is corrupted.
|
||||
ModeDegraded
|
||||
)
|
||||
|
||||
func (m Mode) String() string {
|
||||
switch m {
|
||||
default:
|
||||
return "UNDEFINED"
|
||||
case ModeReadWrite:
|
||||
return "READ_WRITE"
|
||||
case ModeReadOnly:
|
||||
return "READ_ONLY"
|
||||
case ModeDegraded:
|
||||
return "DEGRADED"
|
||||
}
|
||||
}
|
||||
|
||||
// SetMode sets mode of the shard.
|
||||
//
|
||||
// Returns any error encountered that did not allow
|
||||
// setting shard mode.
|
||||
func (s *Shard) SetMode(m Mode) error {
|
||||
func (s *Shard) SetMode(m mode.Mode) error {
|
||||
s.m.Lock()
|
||||
defer s.m.Unlock()
|
||||
|
||||
if s.hasWriteCache() {
|
||||
switch m {
|
||||
case ModeReadOnly:
|
||||
s.writeCache.SetMode(writecache.ModeReadOnly)
|
||||
case ModeDegraded:
|
||||
s.writeCache.SetMode(writecache.ModeDegraded)
|
||||
case ModeReadWrite:
|
||||
s.writeCache.SetMode(writecache.ModeReadWrite)
|
||||
}
|
||||
s.writeCache.SetMode(m)
|
||||
}
|
||||
|
||||
s.info.Mode = m
|
||||
|
@ -67,7 +28,7 @@ func (s *Shard) SetMode(m Mode) error {
|
|||
}
|
||||
|
||||
// GetMode returns mode of the shard.
|
||||
func (s *Shard) GetMode() Mode {
|
||||
func (s *Shard) GetMode() mode.Mode {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
|
|
33
pkg/local_object_storage/shard/mode/mode.go
Normal file
33
pkg/local_object_storage/shard/mode/mode.go
Normal file
|
@ -0,0 +1,33 @@
|
|||
package mode
|
||||
|
||||
// Mode represents enumeration of Shard work modes.
|
||||
type Mode uint32
|
||||
|
||||
const (
|
||||
// ReadWrite is a Mode value for shard that is available
|
||||
// for read and write operations. Default shard mode.
|
||||
ReadWrite Mode = iota
|
||||
|
||||
// ReadOnly is a Mode value for shard that does not
|
||||
// accept write operation but is readable.
|
||||
ReadOnly
|
||||
|
||||
// Degraded is a Mode value for shard that is set automatically
|
||||
// after a certain number of errors is encountered. It is the same as
|
||||
// `mode.ReadOnly` but also enables fallback algorithms for getting object
|
||||
// in case metabase is corrupted.
|
||||
Degraded
|
||||
)
|
||||
|
||||
func (m Mode) String() string {
|
||||
switch m {
|
||||
default:
|
||||
return "UNDEFINED"
|
||||
case ReadWrite:
|
||||
return "READ_WRITE"
|
||||
case ReadOnly:
|
||||
return "READ_ONLY"
|
||||
case Degraded:
|
||||
return "DEGRADED"
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@ package shard
|
|||
|
||||
import (
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -23,7 +24,7 @@ func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
|
|||
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
||||
// another shard.
|
||||
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return ToMoveItRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -29,7 +30,7 @@ func (p *PutPrm) SetObject(obj *object.Object) {
|
|||
//
|
||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return PutRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
)
|
||||
|
||||
|
@ -61,7 +62,7 @@ func (s *Shard) Restore(prm RestorePrm) (RestoreRes, error) {
|
|||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode != ModeReadWrite {
|
||||
if s.info.Mode != mode.ReadWrite {
|
||||
return RestoreRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
|
@ -235,9 +236,9 @@ func WithRefillMetabase(v bool) Option {
|
|||
}
|
||||
|
||||
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
||||
// - ModeReadWrite;
|
||||
// - ModeReadOnly.
|
||||
func WithMode(v Mode) Option {
|
||||
// - mode.ReadWrite;
|
||||
// - mode.ReadOnly.
|
||||
func WithMode(v mode.Mode) Option {
|
||||
return func(c *cfg) {
|
||||
c.info.Mode = v
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
|
@ -17,7 +18,7 @@ func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Mo
|
|||
if s.pilorama == nil {
|
||||
return nil, ErrPiloramaDisabled
|
||||
}
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return nil, ErrReadOnlyMode
|
||||
}
|
||||
return s.pilorama.TreeMove(d, treeID, m)
|
||||
|
@ -28,7 +29,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
|
|||
if s.pilorama == nil {
|
||||
return nil, ErrPiloramaDisabled
|
||||
}
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return nil, ErrReadOnlyMode
|
||||
}
|
||||
return s.pilorama.TreeAddByPath(d, treeID, attr, path, meta)
|
||||
|
@ -39,7 +40,7 @@ func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.M
|
|||
if s.pilorama == nil {
|
||||
return ErrPiloramaDisabled
|
||||
}
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
if s.GetMode() != mode.ReadWrite {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
return s.pilorama.TreeApply(d, treeID, m)
|
||||
|
|
|
@ -3,20 +3,8 @@ package writecache
|
|||
import (
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Mode represents write-cache mode of operation.
|
||||
type Mode uint32
|
||||
|
||||
const (
|
||||
// ModeReadWrite is a default mode allowing objects to be flushed.
|
||||
ModeReadWrite Mode = iota
|
||||
|
||||
// ModeReadOnly is a mode in which write-cache doesn't flush anything to a metabase.
|
||||
ModeReadOnly
|
||||
|
||||
// ModeDegraded is similar to a shard's degraded mode.
|
||||
ModeDegraded
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
)
|
||||
|
||||
// ErrReadOnly is returned when Put/Write is performed in a read-only mode.
|
||||
|
@ -25,7 +13,7 @@ var ErrReadOnly = errors.New("write-cache is in read-only mode")
|
|||
// SetMode sets write-cache mode of operation.
|
||||
// When shard is put in read-only mode all objects in memory are flushed to disk
|
||||
// and all background jobs are suspended.
|
||||
func (c *cache) SetMode(m Mode) {
|
||||
func (c *cache) SetMode(m mode.Mode) {
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
if c.mode == m {
|
||||
|
@ -33,7 +21,7 @@ func (c *cache) SetMode(m Mode) {
|
|||
}
|
||||
|
||||
c.mode = m
|
||||
if m == ModeReadWrite {
|
||||
if m == mode.ReadWrite {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -57,5 +45,5 @@ func (c *cache) SetMode(m Mode) {
|
|||
// readOnly returns true if current mode is read-only.
|
||||
// `c.modeMtx` must be taken.
|
||||
func (c *cache) readOnly() bool {
|
||||
return c.mode != ModeReadWrite
|
||||
return c.mode != mode.ReadWrite
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
|
@ -23,7 +24,7 @@ type Cache interface {
|
|||
Delete(oid.Address) error
|
||||
Iterate(IterationPrm) error
|
||||
Put(*object.Object) error
|
||||
SetMode(Mode)
|
||||
SetMode(mode.Mode)
|
||||
SetLogger(*zap.Logger)
|
||||
DumpInfo() Info
|
||||
|
||||
|
@ -39,7 +40,7 @@ type cache struct {
|
|||
mtx sync.RWMutex
|
||||
mem []objectInfo
|
||||
|
||||
mode Mode
|
||||
mode mode.Mode
|
||||
modeMtx sync.RWMutex
|
||||
|
||||
// compressFlags maps address of a big object to boolean value indicating
|
||||
|
@ -90,7 +91,7 @@ func New(opts ...Option) Cache {
|
|||
metaCh: make(chan *object.Object),
|
||||
closeCh: make(chan struct{}),
|
||||
evictCh: make(chan []byte),
|
||||
mode: ModeReadWrite,
|
||||
mode: mode.ReadWrite,
|
||||
|
||||
compressFlags: make(map[string]struct{}),
|
||||
options: options{
|
||||
|
@ -152,7 +153,7 @@ func (c *cache) Init() error {
|
|||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||
func (c *cache) Close() error {
|
||||
// Finish all in-progress operations.
|
||||
c.SetMode(ModeReadOnly)
|
||||
c.SetMode(mode.ReadOnly)
|
||||
|
||||
close(c.closeCh)
|
||||
c.objCounters.FlushAndClose()
|
||||
|
|
|
@ -3,7 +3,7 @@ package control
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
@ -34,20 +34,20 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) (
|
|||
si.SetWriteCachePath(sh.WriteCacheInfo.Path)
|
||||
si.SetPiloramaPath(sh.PiloramaInfo.Path)
|
||||
|
||||
var mode control.ShardMode
|
||||
var m control.ShardMode
|
||||
|
||||
switch sh.Mode {
|
||||
case shard.ModeReadWrite:
|
||||
mode = control.ShardMode_READ_WRITE
|
||||
case shard.ModeReadOnly:
|
||||
mode = control.ShardMode_READ_ONLY
|
||||
case shard.ModeDegraded:
|
||||
mode = control.ShardMode_DEGRADED
|
||||
case mode.ReadWrite:
|
||||
m = control.ShardMode_READ_WRITE
|
||||
case mode.ReadOnly:
|
||||
m = control.ShardMode_READ_ONLY
|
||||
case mode.Degraded:
|
||||
m = control.ShardMode_DEGRADED
|
||||
default:
|
||||
mode = control.ShardMode_SHARD_MODE_UNDEFINED
|
||||
m = control.ShardMode_SHARD_MODE_UNDEFINED
|
||||
}
|
||||
|
||||
si.SetMode(mode)
|
||||
si.SetMode(m)
|
||||
si.SetErrorCount(sh.ErrorCount)
|
||||
|
||||
shardInfos = append(shardInfos, si)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
@ -18,7 +19,7 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
|
|||
}
|
||||
|
||||
var (
|
||||
mode shard.Mode
|
||||
m mode.Mode
|
||||
|
||||
requestedMode = req.GetBody().GetMode()
|
||||
requestedShard = shard.NewIDFromBytes(req.Body.GetShard_ID())
|
||||
|
@ -26,16 +27,16 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques
|
|||
|
||||
switch requestedMode {
|
||||
case control.ShardMode_READ_WRITE:
|
||||
mode = shard.ModeReadWrite
|
||||
m = mode.ReadWrite
|
||||
case control.ShardMode_READ_ONLY:
|
||||
mode = shard.ModeReadOnly
|
||||
m = mode.ReadOnly
|
||||
case control.ShardMode_DEGRADED:
|
||||
mode = shard.ModeDegraded
|
||||
m = mode.Degraded
|
||||
default:
|
||||
return nil, status.Error(codes.Internal, fmt.Sprintf("unknown shard mode: %s", requestedMode))
|
||||
}
|
||||
|
||||
err = s.s.SetShardMode(requestedShard, mode, false)
|
||||
err = s.s.SetShardMode(requestedShard, m, false)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue