[#1840] neofs-node: Use blobstor paths to identify shard
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
4b005d3178
commit
2d43892fc9
3 changed files with 52 additions and 25 deletions
|
@ -8,6 +8,8 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
atomicstd "sync/atomic"
|
atomicstd "sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
@ -135,6 +137,18 @@ type shardCfg struct {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// id returns persistent id of a shard. It is different from the ID used in runtime
|
||||||
|
// and is primarily used to identify shards in the configuration.
|
||||||
|
func (c *shardCfg) id() string {
|
||||||
|
// This calculation should be kept in sync with
|
||||||
|
// pkg/local_object_storage/engine/control.go file.
|
||||||
|
var sb strings.Builder
|
||||||
|
for i := range c.subStorages {
|
||||||
|
sb.WriteString(filepath.Clean(c.subStorages[i].path))
|
||||||
|
}
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
||||||
type subStorageCfg struct {
|
type subStorageCfg struct {
|
||||||
// common for all storages
|
// common for all storages
|
||||||
typ string
|
typ string
|
||||||
|
@ -594,13 +608,13 @@ func (c *cfg) engineOpts() []engine.Option {
|
||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
|
|
||||||
type shardOptsWithMetaPath struct {
|
type shardOptsWithID struct {
|
||||||
metaPath string
|
configID string
|
||||||
shOpts []shard.Option
|
shOpts []shard.Option
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) shardOpts() []shardOptsWithMetaPath {
|
func (c *cfg) shardOpts() []shardOptsWithID {
|
||||||
shards := make([]shardOptsWithMetaPath, 0, len(c.EngineCfg.shards))
|
shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))
|
||||||
|
|
||||||
for _, shCfg := range c.EngineCfg.shards {
|
for _, shCfg := range c.EngineCfg.shards {
|
||||||
var writeCacheOpts []writecache.Option
|
var writeCacheOpts []writecache.Option
|
||||||
|
@ -663,8 +677,8 @@ func (c *cfg) shardOpts() []shardOptsWithMetaPath {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var sh shardOptsWithMetaPath
|
var sh shardOptsWithID
|
||||||
sh.metaPath = shCfg.metaCfg.path
|
sh.configID = shCfg.id()
|
||||||
sh.shOpts = []shard.Option{
|
sh.shOpts = []shard.Option{
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||||
|
@ -831,8 +845,8 @@ func (c *cfg) configWatcher(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var rcfg engine.ReConfiguration
|
var rcfg engine.ReConfiguration
|
||||||
for _, optsWithMeta := range c.shardOpts() {
|
for _, optsWithID := range c.shardOpts() {
|
||||||
rcfg.AddShard(optsWithMeta.metaPath, optsWithMeta.shOpts)
|
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
|
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(rcfg)
|
||||||
|
|
|
@ -3,6 +3,8 @@ package engine
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
@ -217,18 +219,18 @@ func (rCfg *ReConfiguration) SetShardPoolSize(shardPoolSize uint32) {
|
||||||
rCfg.shardPoolSize = shardPoolSize
|
rCfg.shardPoolSize = shardPoolSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddShard adds a shard for the reconfiguration. Path to a metabase is used as
|
// AddShard adds a shard for the reconfiguration.
|
||||||
// an identifier of the shard in configuration.
|
// Shard identifier is calculated from paths used in blobstor.
|
||||||
func (rCfg *ReConfiguration) AddShard(metaPath string, opts []shard.Option) {
|
func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {
|
||||||
if rCfg.shards == nil {
|
if rCfg.shards == nil {
|
||||||
rCfg.shards = make(map[string][]shard.Option)
|
rCfg.shards = make(map[string][]shard.Option)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, found := rCfg.shards[metaPath]; found {
|
if _, found := rCfg.shards[id]; found {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rCfg.shards[metaPath] = opts
|
rCfg.shards[id] = opts
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reload reloads StorageEngine's configuration in runtime.
|
// Reload reloads StorageEngine's configuration in runtime.
|
||||||
|
@ -240,24 +242,26 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
|
||||||
|
|
||||||
// mark removed shards for removal
|
// mark removed shards for removal
|
||||||
for id, sh := range e.shards {
|
for id, sh := range e.shards {
|
||||||
_, ok := rcfg.shards[sh.Shard.DumpInfo().MetaBaseInfo.Path]
|
_, ok := rcfg.shards[calculateShardID(sh.DumpInfo())]
|
||||||
if !ok {
|
if !ok {
|
||||||
shardsToRemove = append(shardsToRemove, id)
|
shardsToRemove = append(shardsToRemove, id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark new shards for addition
|
// mark new shards for addition
|
||||||
for newPath := range rcfg.shards {
|
for newID := range rcfg.shards {
|
||||||
addShard := true
|
addShard := true
|
||||||
for _, sh := range e.shards {
|
for _, sh := range e.shards {
|
||||||
if newPath == sh.Shard.DumpInfo().MetaBaseInfo.Path {
|
// This calculation should be kept in sync with node
|
||||||
|
// configuration parsing during SIGHUP.
|
||||||
|
if newID == calculateShardID(sh.DumpInfo()) {
|
||||||
addShard = false
|
addShard = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if addShard {
|
if addShard {
|
||||||
shardsToAdd = append(shardsToAdd, newPath)
|
shardsToAdd = append(shardsToAdd, newID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,10 +269,10 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
|
||||||
|
|
||||||
e.removeShards(shardsToRemove...)
|
e.removeShards(shardsToRemove...)
|
||||||
|
|
||||||
for _, newPath := range shardsToAdd {
|
for _, newID := range shardsToAdd {
|
||||||
sh, err := e.createShard(rcfg.shards[newPath])
|
sh, err := e.createShard(rcfg.shards[newID])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newPath, err)
|
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
idStr := sh.ID().String()
|
idStr := sh.ID().String()
|
||||||
|
@ -293,3 +297,13 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func calculateShardID(info shard.Info) string {
|
||||||
|
// This calculation should be kept in sync with node
|
||||||
|
// configuration parsing during SIGHUP.
|
||||||
|
var sb strings.Builder
|
||||||
|
for _, sub := range info.BlobStorInfo.SubStorages {
|
||||||
|
sb.WriteString(filepath.Clean(sub.Path))
|
||||||
|
}
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
|
@ -147,19 +147,18 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
|
||||||
|
|
||||||
e := New()
|
e := New()
|
||||||
for i := 0; i < num; i++ {
|
for i := 0; i < num; i++ {
|
||||||
metaPath := filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))
|
id, err := e.AddShard(
|
||||||
currShards = append(currShards, metaPath)
|
|
||||||
|
|
||||||
_, err := e.AddShard(
|
|
||||||
shard.WithBlobStorOptions(
|
shard.WithBlobStorOptions(
|
||||||
blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(i)), errSmallSize))),
|
blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(i)), errSmallSize))),
|
||||||
shard.WithMetaBaseOptions(
|
shard.WithMetaBaseOptions(
|
||||||
meta.WithPath(metaPath),
|
meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))),
|
||||||
meta.WithPermissions(0700),
|
meta.WithPermissions(0700),
|
||||||
meta.WithEpochState(epochState{}),
|
meta.WithEpochState(epochState{}),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
currShards = append(currShards, calculateShardID(e.shards[id.String()].DumpInfo()))
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, num, len(e.shards))
|
require.Equal(t, num, len(e.shards))
|
||||||
|
|
Loading…
Reference in a new issue