[#1869] shard: Allow to reload metabase on SIGHUP
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
f769fc83fc
commit
c785e11b20
7 changed files with 252 additions and 13 deletions
|
@ -12,6 +12,8 @@ Changelog for NeoFS Node
|
|||
- `--force` flag to `neofs-cli control set-status` command (#1916)
|
||||
|
||||
### Changed
|
||||
- Path to a metabase can now be reloaded with a SIGHUP.
|
||||
|
||||
### Fixed
|
||||
- `writecache.max_object_size` is now correctly handled (#1925)
|
||||
- Correctly handle setting ONLINE netmap status after maintenance (#1922)
|
||||
|
|
|
@ -235,10 +235,16 @@ func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) {
|
|||
|
||||
// Reload reloads StorageEngine's configuration in runtime.
|
||||
func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
|
||||
type reloadInfo struct {
|
||||
sh *shard.Shard
|
||||
opts []shard.Option
|
||||
}
|
||||
|
||||
e.mtx.RLock()
|
||||
|
||||
var shardsToRemove []string // shards IDs
|
||||
var shardsToAdd []string // meta paths
|
||||
var shardsToAdd []string // shard config identifiers (blobstor paths concatenation)
|
||||
var shardsToReload []reloadInfo
|
||||
|
||||
// mark removed shards for removal
|
||||
for id, sh := range e.shards {
|
||||
|
@ -248,27 +254,36 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
|
|||
}
|
||||
}
|
||||
|
||||
// mark new shards for addition
|
||||
loop:
|
||||
for newID := range rcfg.shards {
|
||||
addShard := true
|
||||
for _, sh := range e.shards {
|
||||
// This calculation should be kept in sync with node
|
||||
// configuration parsing during SIGHUP.
|
||||
if newID == calculateShardID(sh.DumpInfo()) {
|
||||
addShard = false
|
||||
break
|
||||
shardsToReload = append(shardsToReload, reloadInfo{
|
||||
sh: sh.Shard,
|
||||
opts: rcfg.shards[newID],
|
||||
})
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
|
||||
if addShard {
|
||||
shardsToAdd = append(shardsToAdd, newID)
|
||||
}
|
||||
}
|
||||
|
||||
e.mtx.RUnlock()
|
||||
|
||||
e.removeShards(shardsToRemove...)
|
||||
|
||||
for _, p := range shardsToReload {
|
||||
err := p.sh.Reload(p.opts...)
|
||||
if err != nil {
|
||||
e.log.Error("could not reload a shard",
|
||||
zap.Stringer("shard id", p.sh.ID()),
|
||||
zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
for _, newID := range shardsToAdd {
|
||||
sh, err := e.createShard(rcfg.shards[newID])
|
||||
if err != nil {
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ErrDegradedMode is returned when metabase is in a degraded mode.
|
||||
var ErrDegradedMode = errors.New("metabase is in a degraded mode")
|
||||
|
||||
// Open boltDB instance for metabase.
|
||||
func (db *DB) Open(readOnly bool) error {
|
||||
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
|
||||
|
@ -24,6 +29,12 @@ func (db *DB) Open(readOnly bool) error {
|
|||
}
|
||||
db.boltOptions.ReadOnly = readOnly
|
||||
|
||||
return db.openBolt()
|
||||
}
|
||||
|
||||
func (db *DB) openBolt() error {
|
||||
var err error
|
||||
|
||||
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't open boltDB database: %w", err)
|
||||
|
@ -144,3 +155,36 @@ func (db *DB) Close() error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload reloads part of the configuration.
|
||||
// It returns true iff database was reopened.
|
||||
// If a config option is invalid, it logs an error and returns nil.
|
||||
// If there was a problem with applying new configuration, an error is returned.
|
||||
//
|
||||
// If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned.
|
||||
func (db *DB) Reload(opts ...Option) (bool, error) {
|
||||
var c cfg
|
||||
for i := range opts {
|
||||
opts[i](&c)
|
||||
}
|
||||
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
if c.info.Path != "" && filepath.Clean(db.info.Path) != filepath.Clean(c.info.Path) {
|
||||
if err := db.Close(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
db.mode = mode.Degraded
|
||||
db.info.Path = c.info.Path
|
||||
if err := db.openBolt(); err != nil {
|
||||
return false, fmt.Errorf("%w: %v", ErrDegradedMode, err)
|
||||
}
|
||||
|
||||
db.mode = mode.ReadWrite
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -265,3 +265,34 @@ func (s *Shard) Close() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload reloads configuration portions that are necessary.
|
||||
// If a config option is invalid, it logs an error and returns nil.
|
||||
// If there was a problem with applying new configuration, an error is returned.
|
||||
func (s *Shard) Reload(opts ...Option) error {
|
||||
// Do not use defaultCfg here missing options need not be reloaded.
|
||||
var c cfg
|
||||
for i := range opts {
|
||||
opts[i](&c)
|
||||
}
|
||||
|
||||
s.m.Lock()
|
||||
defer s.m.Unlock()
|
||||
|
||||
ok, err := s.metaBase.Reload(c.metaOpts...)
|
||||
if err != nil {
|
||||
if errors.Is(err, meta.ErrDegradedMode) {
|
||||
_ = s.setMode(mode.DegradedReadOnly)
|
||||
}
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
if c.refillMetabase {
|
||||
return s.refillMetabase()
|
||||
} else {
|
||||
return s.metaBase.Init()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -31,6 +31,11 @@ func (s epochState) CurrentEpoch() uint64 {
|
|||
return 0
|
||||
}
|
||||
|
||||
type objAddr struct {
|
||||
obj *objectSDK.Object
|
||||
addr oid.Address
|
||||
}
|
||||
|
||||
func TestShardOpen(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
metaPath := filepath.Join(dir, "meta")
|
||||
|
@ -164,11 +169,6 @@ func TestRefillMetabase(t *testing.T) {
|
|||
|
||||
const objNum = 5
|
||||
|
||||
type objAddr struct {
|
||||
obj *objectSDK.Object
|
||||
addr oid.Address
|
||||
}
|
||||
|
||||
mObjs := make(map[string]objAddr)
|
||||
locked := make([]oid.ID, 1, 2)
|
||||
locked[0] = oidtest.ID()
|
||||
|
|
|
@ -21,6 +21,10 @@ func (s *Shard) SetMode(m mode.Mode) error {
|
|||
s.m.Lock()
|
||||
defer s.m.Unlock()
|
||||
|
||||
return s.setMode(m)
|
||||
}
|
||||
|
||||
func (s *Shard) setMode(m mode.Mode) error {
|
||||
components := []interface{ SetMode(mode.Mode) error }{
|
||||
s.metaBase, s.blobStor,
|
||||
}
|
||||
|
|
143
pkg/local_object_storage/shard/reload_test.go
Normal file
143
pkg/local_object_storage/shard/reload_test.go
Normal file
|
@ -0,0 +1,143 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||
sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test"
|
||||
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/version"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestShardReload(t *testing.T) {
|
||||
p := t.Name()
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
l := &logger.Logger{Logger: zaptest.NewLogger(t)}
|
||||
blobOpts := []blobstor.Option{
|
||||
blobstor.WithLogger(l),
|
||||
blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(filepath.Join(p, "blob")),
|
||||
fstree.WithDepth(1)),
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
metaOpts := []meta.Option{
|
||||
meta.WithPath(filepath.Join(p, "meta")),
|
||||
meta.WithEpochState(epochState{})}
|
||||
|
||||
opts := []Option{
|
||||
WithLogger(l),
|
||||
WithBlobStorOptions(blobOpts...),
|
||||
WithMetaBaseOptions(metaOpts...),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(p, "pilorama")))}
|
||||
|
||||
sh := New(opts...)
|
||||
require.NoError(t, sh.Open())
|
||||
require.NoError(t, sh.Init())
|
||||
|
||||
objects := make([]objAddr, 5)
|
||||
for i := range objects {
|
||||
objects[i].obj = newObject()
|
||||
objects[i].addr = objectCore.AddressOf(objects[i].obj)
|
||||
require.NoError(t, putObject(sh, objects[i].obj))
|
||||
}
|
||||
|
||||
checkHasObjects := func(t *testing.T, exists bool) {
|
||||
for i := range objects {
|
||||
var prm ExistsPrm
|
||||
prm.SetAddress(objects[i].addr)
|
||||
|
||||
res, err := sh.Exists(prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, exists, res.Exists(), "object #%d is missing", i)
|
||||
}
|
||||
}
|
||||
|
||||
checkHasObjects(t, true)
|
||||
|
||||
t.Run("same config, no-op", func(t *testing.T) {
|
||||
require.NoError(t, sh.Reload(opts...))
|
||||
checkHasObjects(t, true)
|
||||
})
|
||||
|
||||
t.Run("open meta at new path", func(t *testing.T) {
|
||||
newShardOpts := func(metaPath string, resync bool) []Option {
|
||||
metaOpts := []meta.Option{meta.WithPath(metaPath), meta.WithEpochState(epochState{})}
|
||||
return append(opts, WithMetaBaseOptions(metaOpts...), WithRefillMetabase(resync))
|
||||
}
|
||||
|
||||
newOpts := newShardOpts(filepath.Join(p, "meta1"), false)
|
||||
require.NoError(t, sh.Reload(newOpts...))
|
||||
|
||||
checkHasObjects(t, false) // new path, but no resync
|
||||
|
||||
t.Run("can put objects", func(t *testing.T) {
|
||||
obj := newObject()
|
||||
require.NoError(t, putObject(sh, obj))
|
||||
objects = append(objects, objAddr{obj: obj, addr: objectCore.AddressOf(obj)})
|
||||
})
|
||||
|
||||
newOpts = newShardOpts(filepath.Join(p, "meta2"), true)
|
||||
require.NoError(t, sh.Reload(newOpts...))
|
||||
|
||||
checkHasObjects(t, true) // all objects are restored, including the new one
|
||||
|
||||
t.Run("reload failed", func(t *testing.T) {
|
||||
badPath := filepath.Join(p, "meta3")
|
||||
require.NoError(t, os.WriteFile(badPath, []byte{1}, 0))
|
||||
|
||||
newOpts = newShardOpts(badPath, true)
|
||||
require.Error(t, sh.Reload(newOpts...))
|
||||
|
||||
// Cleanup is done, no panic.
|
||||
obj := newObject()
|
||||
require.ErrorIs(t, putObject(sh, obj), ErrReadOnlyMode)
|
||||
|
||||
// Old objects are still accessible.
|
||||
checkHasObjects(t, true)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func putObject(sh *Shard, obj *objectSDK.Object) error {
|
||||
var prm PutPrm
|
||||
prm.SetObject(obj)
|
||||
|
||||
_, err := sh.Put(prm)
|
||||
return err
|
||||
}
|
||||
|
||||
func newObject() *objectSDK.Object {
|
||||
x := objectSDK.New()
|
||||
ver := version.Current()
|
||||
|
||||
x.SetID(oidtest.ID())
|
||||
x.SetSessionToken(sessiontest.Object())
|
||||
x.SetPayload([]byte{1, 2, 3})
|
||||
x.SetPayloadSize(3)
|
||||
x.SetOwnerID(usertest.ID())
|
||||
x.SetContainerID(cidtest.ID())
|
||||
x.SetType(objectSDK.TypeRegular)
|
||||
x.SetVersion(&ver)
|
||||
x.SetPayloadChecksum(checksumtest.Checksum())
|
||||
x.SetPayloadHomomorphicHash(checksumtest.Checksum())
|
||||
return x
|
||||
}
|
Loading…
Reference in a new issue