From fbd5bc8c38d9b49e1474b61f58b002c161b1739a Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 27 Sep 2022 00:39:34 +0300 Subject: [PATCH] [#1770] engine: Support configuration reload Currently, it only supports changing the set of shards. Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 12 +++ pkg/local_object_storage/engine/control.go | 99 +++++++++++++++++++ .../engine/control_test.go | 92 +++++++++++++++++ pkg/local_object_storage/engine/shards.go | 39 +++++++- .../engine/shards_test.go | 46 +++++++++ 5 files changed, 287 insertions(+), 1 deletion(-) create mode 100644 pkg/local_object_storage/engine/shards_test.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index d786cddc4..b705f4e94 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -78,6 +78,10 @@ const notificationHandlerPoolSize = 10 // structs). // It must not be used concurrently. type applicationConfiguration struct { + // _read indicates whether the config + // has already been read + _read bool + EngineCfg struct { errorThreshold uint32 shardPoolSize uint32 @@ -144,6 +148,14 @@ type subStorageCfg struct { // readConfig fills applicationConfiguration with raw configuration values // not modifying them. 
func (a *applicationConfiguration) readConfig(c *config.Config) error { + if a._read { + // clear if it is rereading; NOTE(review): this reset also zeroes _read, so unless it is set again further down, every other reread skips the clearing - confirm + *a = applicationConfiguration{} + } else { + // update the status + a._read = true + } + a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 2d8e4c049..d5228d243 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -198,3 +198,102 @@ func (e *StorageEngine) BlockExecution(err error) error { func (e *StorageEngine) ResumeExecution() error { return e.setBlockExecErr(nil) } + +type ReConfiguration struct { + errorsThreshold uint32 + shardPoolSize uint32 + + shards map[string][]shard.Option // meta path -> shard opts +} + +// SetErrorsThreshold sets the number of errors after which a +// shard is moved to read-only mode. +func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) { + rCfg.errorsThreshold = errorsThreshold +} + +// SetShardPoolSize sets the size of the worker pool for each shard. +func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) { + rCfg.shardPoolSize = shardPoolSize +} + +// AddShard adds a shard for the reconfiguration. Path to a metabase is used as +// an identifier of the shard in configuration. A path that was already added is ignored (the options passed first are kept). +func (rCfg *ReConfiguration) AddShard(metaPath string, opts []shard.Option) { + if rCfg.shards == nil { + rCfg.shards = make(map[string][]shard.Option) + } + + if _, found := rCfg.shards[metaPath]; found { + return + } + + rCfg.shards[metaPath] = opts +} + +// Reload reloads StorageEngine's configuration at runtime. Only the set of shards is applied: shards whose metabase path is missing from rcfg are removed, and shards with new metabase paths are added, opened and initialized; the errors threshold and shard pool size of rcfg are not read here. NOTE(review): a new shard that fails Open/Init is dropped from the engine without being closed - confirm whether Close is needed. 
+func (e *StorageEngine) Reload(rcfg ReConfiguration) error { + e.mtx.RLock() + + var shardsToRemove []string // IDs of shards to close and drop + var shardsToAdd []string // metabase paths of shards to create + + // mark shards absent from the new configuration for removal + for id, sh := range e.shards { + _, ok := rcfg.shards[sh.Shard.DumpInfo().MetaBaseInfo.Path] + if !ok { + shardsToRemove = append(shardsToRemove, id) + } + } + + // mark paths not used by any current shard for addition + for newPath := range rcfg.shards { + addShard := true + for _, sh := range e.shards { + if newPath == sh.Shard.DumpInfo().MetaBaseInfo.Path { + addShard = false + break + } + } + + if addShard { + shardsToAdd = append(shardsToAdd, newPath) + } + } + + e.mtx.RUnlock() + + err := e.removeShards(shardsToRemove...) + if err != nil { + return fmt.Errorf("could not remove shards: %w", err) + } + + e.mtx.Lock() + defer e.mtx.Unlock() + + for _, newPath := range shardsToAdd { + id, err := e.addShard(rcfg.shards[newPath]...) + if err != nil { + return fmt.Errorf("could not add new shard: %w", err) + } + + idStr := id.String() + sh := e.shards[idStr] + + err = sh.Open() + if err == nil { + err = sh.Init() + } + if err != nil { + delete(e.shards, idStr) + e.shardPools[idStr].Release() + delete(e.shardPools, idStr) + + return fmt.Errorf("could not init %s shard: %w", idStr, err) + } + + e.log.Info("added new shard", zap.String("id", idStr)) + } + + return nil +} diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 12c17578d..59750424b 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -2,11 +2,16 @@ package engine import ( "errors" + "fmt" "os" "path/filepath" + "strconv" "testing" "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" 
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/stretchr/testify/require" @@ -78,3 +83,90 @@ func TestPersistentShardID(t *testing.T) { require.NoError(t, e.Close()) } + +func TestReload(t *testing.T) { + path := t.TempDir() + + t.Run("add shards", func(t *testing.T) { + const shardNum = 4 + addPath := filepath.Join(path, "add") + + e, currShards := engineWithShards(t, addPath, shardNum) + + var rcfg ReConfiguration + for _, p := range currShards { + rcfg.AddShard(p, nil) + } + + rcfg.AddShard(currShards[0], nil) // same path again, must be ignored + require.NoError(t, e.Reload(rcfg)) + + // no new paths => no new shards + require.Equal(t, shardNum, len(e.shards)) + require.Equal(t, shardNum, len(e.shardPools)) + + newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum)) + + // add new shard + rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions( + meta.WithPath(newMeta), + meta.WithEpochState(epochState{}), + )}) + require.NoError(t, e.Reload(rcfg)) + + require.Equal(t, shardNum+1, len(e.shards)) + require.Equal(t, shardNum+1, len(e.shardPools)) + }) + + t.Run("remove shards", func(t *testing.T) { + const shardNum = 4 + removePath := filepath.Join(path, "remove") + + e, currShards := engineWithShards(t, removePath, shardNum) + + var rcfg ReConfiguration + for i := 0; i < len(currShards)-1; i++ { // without one of the shards + rcfg.AddShard(currShards[i], nil) + } + + require.NoError(t, e.Reload(rcfg)) + + // removed one + require.Equal(t, shardNum-1, len(e.shards)) + require.Equal(t, shardNum-1, len(e.shardPools)) + }) +} + +// engineWithShards creates an engine with the specified number of shards. Returns +// the engine and a slice of paths to the shards' metabases. All shard data is placed under the "add" subdirectory of path. 
+// TODO: #1776 unify engine construction in tests +func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) { + addPath := filepath.Join(path, "add") + + currShards := make([]string, 0, num) + + e := New() + for i := 0; i < num; i++ { + metaPath := filepath.Join(addPath, fmt.Sprintf("%d.metabase", i)) + currShards = append(currShards, metaPath) + + _, err := e.AddShard( + shard.WithBlobStorOptions( + blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(i)), errSmallSize))), + shard.WithMetaBaseOptions( + meta.WithPath(metaPath), + meta.WithPermissions(0700), + meta.WithEpochState(epochState{}), + ), + ) + require.NoError(t, err) + } + + require.Equal(t, num, len(e.shards)) + require.Equal(t, num, len(e.shardPools)) + + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + + return e, currShards +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index f89a68633..4c14ba652 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -11,6 +11,7 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/panjf2000/ants/v2" "go.uber.org/atomic" + "go.uber.org/zap" ) var errShardNotFound = errors.New("shard not found") @@ -46,6 +47,10 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { e.mtx.Lock() defer e.mtx.Unlock() + return e.addShard(opts...) +} + +func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) { pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) if err != nil { return nil, err @@ -73,7 +78,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { )...) 
if err := sh.UpdateID(); err != nil { - return nil, fmt.Errorf("could not open shard: %w", err) + return nil, fmt.Errorf("could not update shard ID: %w", err) } strID := sh.ID().String() @@ -91,6 +96,38 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { return sh.ID(), nil } +// removeShards closes and removes the specified shards while holding e.mtx for writing. Skips non-existent shards. +// Stops at the first shard that fails to close and returns that error; shards removed before it stay removed. +func (e *StorageEngine) removeShards(ids ...string) error { + e.mtx.Lock() + defer e.mtx.Unlock() + + for _, id := range ids { + sh, found := e.shards[id] + if !found { + continue + } + + err := sh.Close() + if err != nil { + return fmt.Errorf("could not close removed shard: %w", err) + } + + delete(e.shards, id) + + pool, ok := e.shardPools[id] + if ok { + pool.Release() + delete(e.shardPools, id) + } + + e.log.Info("shard has been removed", + zap.String("id", id)) + } + + return nil +} + func generateShardID() (*shard.ID, error) { uid, err := uuid.NewRandom() if err != nil { diff --git a/pkg/local_object_storage/engine/shards_test.go b/pkg/local_object_storage/engine/shards_test.go new file mode 100644 index 000000000..1449ea9de --- /dev/null +++ b/pkg/local_object_storage/engine/shards_test.go @@ -0,0 +1,46 @@ +package engine + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRemoveShard(t *testing.T) { + const numOfShards = 6 + + e := testNewEngineWithShardNum(t, numOfShards) + t.Cleanup(func() { + e.Close() + os.RemoveAll(t.Name()) + }) + + require.Equal(t, numOfShards, len(e.shardPools)) + require.Equal(t, numOfShards, len(e.shards)) + + removedNum := numOfShards / 2 + + mSh := make(map[string]bool, numOfShards) + for i, sh := range e.DumpInfo().Shards { + if i == removedNum { + break + } + + mSh[sh.ID.String()] = true + } + + for id, remove := range mSh { + if remove { + require.NoError(t, e.removeShards(id)) + } + } + + require.Equal(t, numOfShards-removedNum, len(e.shardPools)) + 
require.Equal(t, numOfShards-removedNum, len(e.shards)) + + for id, removed := range mSh { + _, ok := e.shards[id] + require.True(t, ok != removed) + } +}