[#1770] engine: Support configuration reload

Currently, it only supports changing the compound of the shards.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-09-27 00:39:34 +03:00 committed by fyrchik
parent 91b56ad3e8
commit fbd5bc8c38
5 changed files with 287 additions and 1 deletions

View file

@ -78,6 +78,10 @@ const notificationHandlerPoolSize = 10
// structs). // structs).
// It must not be used concurrently. // It must not be used concurrently.
type applicationConfiguration struct { type applicationConfiguration struct {
// _read indicated whether a config
// has already been read
_read bool
EngineCfg struct { EngineCfg struct {
errorThreshold uint32 errorThreshold uint32
shardPoolSize uint32 shardPoolSize uint32
@ -144,6 +148,14 @@ type subStorageCfg struct {
// readConfig fills applicationConfiguration with raw configuration values // readConfig fills applicationConfiguration with raw configuration values
// not modifying them. // not modifying them.
func (a *applicationConfiguration) readConfig(c *config.Config) error { func (a *applicationConfiguration) readConfig(c *config.Config) error {
if a._read {
// clear if it is rereading
*a = applicationConfiguration{}
} else {
// update the status
a._read = true
}
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)

View file

@ -198,3 +198,102 @@ func (e *StorageEngine) BlockExecution(err error) error {
func (e *StorageEngine) ResumeExecution() error { func (e *StorageEngine) ResumeExecution() error {
return e.setBlockExecErr(nil) return e.setBlockExecErr(nil)
} }
type ReConfiguration struct {
errorsThreshold uint32
shardPoolSize uint32
shards map[string][]shard.Option // meta path -> shard opts
}
// SetErrorsThreshold sets a size amount of errors after which
// shard is moved to read-only mode.
func (rCfg *ReConfiguration) SetErrorsThreshold(errorsThreshold uint32) {
rCfg.errorsThreshold = errorsThreshold
}
// SetShardPoolSize sets a size of worker pool for each shard
func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
rCfg.shardPoolSize = shardPoolSize
}
// AddShard adds a shard for the reconfiguration. Path to a metabase is used as
// an identifier of the shard in configuration.
func (rCfg *ReConfiguration) AddShard(metaPath string, opts []shard.Option) {
if rCfg.shards == nil {
rCfg.shards = make(map[string][]shard.Option)
}
if _, found := rCfg.shards[metaPath]; found {
return
}
rCfg.shards[metaPath] = opts
}
// Reload reloads StorageEngine's configuration in runtime.
func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
e.mtx.RLock()
var shardsToRemove []string // shards IDs
var shardsToAdd []string // meta paths
// mark removed shards for removal
for id, sh := range e.shards {
_, ok := rcfg.shards[sh.Shard.DumpInfo().MetaBaseInfo.Path]
if !ok {
shardsToRemove = append(shardsToRemove, id)
}
}
// mark new shards for addition
for newPath := range rcfg.shards {
addShard := true
for _, sh := range e.shards {
if newPath == sh.Shard.DumpInfo().MetaBaseInfo.Path {
addShard = false
break
}
}
if addShard {
shardsToAdd = append(shardsToAdd, newPath)
}
}
e.mtx.RUnlock()
err := e.removeShards(shardsToRemove...)
if err != nil {
return fmt.Errorf("could not remove shards: %w", err)
}
e.mtx.Lock()
defer e.mtx.Unlock()
for _, newPath := range shardsToAdd {
id, err := e.addShard(rcfg.shards[newPath]...)
if err != nil {
return fmt.Errorf("could not add new shard: %w", err)
}
idStr := id.String()
sh := e.shards[idStr]
err = sh.Open()
if err == nil {
err = sh.Init()
}
if err != nil {
delete(e.shards, idStr)
e.shardPools[idStr].Release()
delete(e.shardPools, idStr)
return fmt.Errorf("could not init %s shard: %w", idStr, err)
}
e.log.Info("added new shard", zap.String("id", idStr))
}
return nil
}

View file

@ -2,11 +2,16 @@ package engine
import ( import (
"errors" "errors"
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"testing" "testing"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -78,3 +83,90 @@ func TestPersistentShardID(t *testing.T) {
require.NoError(t, e.Close()) require.NoError(t, e.Close())
} }
func TestReload(t *testing.T) {
path := t.TempDir()
t.Run("add shards", func(t *testing.T) {
const shardNum = 4
addPath := filepath.Join(path, "add")
e, currShards := engineWithShards(t, addPath, shardNum)
var rcfg ReConfiguration
for _, p := range currShards {
rcfg.AddShard(p, nil)
}
rcfg.AddShard(currShards[0], nil) // same path
require.NoError(t, e.Reload(rcfg))
// no new paths => no new shards
require.Equal(t, shardNum, len(e.shards))
require.Equal(t, shardNum, len(e.shardPools))
newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))
// add new shard
rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions(
meta.WithPath(newMeta),
meta.WithEpochState(epochState{}),
)})
require.NoError(t, e.Reload(rcfg))
require.Equal(t, shardNum+1, len(e.shards))
require.Equal(t, shardNum+1, len(e.shardPools))
})
t.Run("remove shards", func(t *testing.T) {
const shardNum = 4
removePath := filepath.Join(path, "remove")
e, currShards := engineWithShards(t, removePath, shardNum)
var rcfg ReConfiguration
for i := 0; i < len(currShards)-1; i++ { // without one of the shards
rcfg.AddShard(currShards[i], nil)
}
require.NoError(t, e.Reload(rcfg))
// removed one
require.Equal(t, shardNum-1, len(e.shards))
require.Equal(t, shardNum-1, len(e.shardPools))
})
}
// engineWithShards creates engine with specified number of shards. Returns
// slice of paths to their metabase and the engine.
// TODO: #1776 unify engine construction in tests
func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) {
addPath := filepath.Join(path, "add")
currShards := make([]string, 0, num)
e := New()
for i := 0; i < num; i++ {
metaPath := filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))
currShards = append(currShards, metaPath)
_, err := e.AddShard(
shard.WithBlobStorOptions(
blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(i)), errSmallSize))),
shard.WithMetaBaseOptions(
meta.WithPath(metaPath),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
),
)
require.NoError(t, err)
}
require.Equal(t, num, len(e.shards))
require.Equal(t, num, len(e.shardPools))
require.NoError(t, e.Open())
require.NoError(t, e.Init())
return e, currShards
}

View file

@ -11,6 +11,7 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap"
) )
var errShardNotFound = errors.New("shard not found") var errShardNotFound = errors.New("shard not found")
@ -46,6 +47,10 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
e.mtx.Lock() e.mtx.Lock()
defer e.mtx.Unlock() defer e.mtx.Unlock()
return e.addShard(opts...)
}
func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) {
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil { if err != nil {
return nil, err return nil, err
@ -73,7 +78,7 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
)...) )...)
if err := sh.UpdateID(); err != nil { if err := sh.UpdateID(); err != nil {
return nil, fmt.Errorf("could not open shard: %w", err) return nil, fmt.Errorf("could not update shard ID: %w", err)
} }
strID := sh.ID().String() strID := sh.ID().String()
@ -91,6 +96,38 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
return sh.ID(), nil return sh.ID(), nil
} }
// removeShards removes specified shards. Skips non-existent shards.
// Returns any error encountered that did not allow remove the shards.
func (e *StorageEngine) removeShards(ids ...string) error {
e.mtx.Lock()
defer e.mtx.Unlock()
for _, id := range ids {
sh, found := e.shards[id]
if !found {
continue
}
err := sh.Close()
if err != nil {
return fmt.Errorf("could not close removed shard: %w", err)
}
delete(e.shards, id)
pool, ok := e.shardPools[id]
if ok {
pool.Release()
delete(e.shardPools, id)
}
e.log.Info("shard has been removed",
zap.String("id", id))
}
return nil
}
func generateShardID() (*shard.ID, error) { func generateShardID() (*shard.ID, error) {
uid, err := uuid.NewRandom() uid, err := uuid.NewRandom()
if err != nil { if err != nil {

View file

@ -0,0 +1,46 @@
package engine
import (
"os"
"testing"
"github.com/stretchr/testify/require"
)
func TestRemoveShard(t *testing.T) {
const numOfShards = 6
e := testNewEngineWithShardNum(t, numOfShards)
t.Cleanup(func() {
e.Close()
os.RemoveAll(t.Name())
})
require.Equal(t, numOfShards, len(e.shardPools))
require.Equal(t, numOfShards, len(e.shards))
removedNum := numOfShards / 2
mSh := make(map[string]bool, numOfShards)
for i, sh := range e.DumpInfo().Shards {
if i == removedNum {
break
}
mSh[sh.ID.String()] = true
}
for id, remove := range mSh {
if remove {
require.NoError(t, e.removeShards(id))
}
}
require.Equal(t, numOfShards-removedNum, len(e.shardPools))
require.Equal(t, numOfShards-removedNum, len(e.shards))
for id, removed := range mSh {
_, ok := e.shards[id]
require.True(t, ok != removed)
}
}