From c785e11b20ad86ed3e818a1e7d5fe19d24e533bd Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Sun, 16 Oct 2022 14:39:47 +0300 Subject: [PATCH] [#1869] shard: Allow to reload metabase on SIGHUP Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 2 + pkg/local_object_storage/engine/control.go | 31 +++- pkg/local_object_storage/metabase/control.go | 44 ++++++ pkg/local_object_storage/shard/control.go | 31 ++++ .../shard/control_test.go | 10 +- pkg/local_object_storage/shard/mode.go | 4 + pkg/local_object_storage/shard/reload_test.go | 143 ++++++++++++++++++ 7 files changed, 252 insertions(+), 13 deletions(-) create mode 100644 pkg/local_object_storage/shard/reload_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d2226c877..e47683d31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Changelog for NeoFS Node - `--force` flag to `neofs-cli control set-status` command (#1916) ### Changed +- Path to a metabase can now be reloaded with a SIGHUP. + ### Fixed - `writecache.max_object_size` is now correctly handled (#1925) - Correctly handle setting ONLINE netmap status after maintenance (#1922) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 8fd2ed253..6470e0d96 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -235,10 +235,16 @@ func (rCfg *ReConfiguration) AddShard(id string, opts []shard.Option) { // Reload reloads StorageEngine's configuration in runtime. func (e *StorageEngine) Reload(rcfg ReConfiguration) error { + type reloadInfo struct { + sh *shard.Shard + opts []shard.Option + } + e.mtx.RLock() var shardsToRemove []string // shards IDs - var shardsToAdd []string // meta paths + var shardsToAdd []string // shard config identifiers (blobstor paths concatenation) + var shardsToReload []reloadInfo // mark removed shards for removal for id, sh := range e.shards { @@ -248,27 +254,36 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error { } } - // mark new shards for addition +loop: for newID := range rcfg.shards { - addShard := true for _, sh := range e.shards { // This calculation should be kept in sync with node // configuration parsing during SIGHUP. if newID == calculateShardID(sh.DumpInfo()) { - addShard = false - break + shardsToReload = append(shardsToReload, reloadInfo{ + sh: sh.Shard, + opts: rcfg.shards[newID], + }) + continue loop } } - if addShard { - shardsToAdd = append(shardsToAdd, newID) - } + shardsToAdd = append(shardsToAdd, newID) } e.mtx.RUnlock() e.removeShards(shardsToRemove...) + for _, p := range shardsToReload { + err := p.sh.Reload(p.opts...) + if err != nil { + e.log.Error("could not reload a shard", + zap.Stringer("shard id", p.sh.ID()), + zap.Error(err)) + } + } + for _, newID := range shardsToAdd { sh, err := e.createShard(rcfg.shards[newID]) if err != nil { diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index 93b9a4458..dc3e6e25e 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -1,14 +1,19 @@ package meta import ( + "errors" "fmt" "path/filepath" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/util" "go.etcd.io/bbolt" "go.uber.org/zap" ) +// ErrDegradedMode is returned when metabase is in a degraded mode. +var ErrDegradedMode = errors.New("metabase is in a degraded mode") + // Open boltDB instance for metabase. func (db *DB) Open(readOnly bool) error { err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) @@ -24,6 +29,12 @@ func (db *DB) Open(readOnly bool) error { } db.boltOptions.ReadOnly = readOnly + return db.openBolt() +} + +func (db *DB) openBolt() error { + var err error + db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) if err != nil { return fmt.Errorf("can't open boltDB database: %w", err) @@ -144,3 +155,36 @@ func (db *DB) Close() error { } return nil } + +// Reload reloads part of the configuration. +// It returns true iff database was reopened. +// If a config option is invalid, it logs an error and returns nil. +// If there was a problem with applying new configuration, an error is returned. +// +// If a metabase was couldn't be reopened because of an error, ErrDegradedMode is returned. +func (db *DB) Reload(opts ...Option) (bool, error) { + var c cfg + for i := range opts { + opts[i](&c) + } + + db.modeMtx.Lock() + defer db.modeMtx.Unlock() + + if c.info.Path != "" && filepath.Clean(db.info.Path) != filepath.Clean(c.info.Path) { + if err := db.Close(); err != nil { + return false, err + } + + db.mode = mode.Degraded + db.info.Path = c.info.Path + if err := db.openBolt(); err != nil { + return false, fmt.Errorf("%w: %v", ErrDegradedMode, err) + } + + db.mode = mode.ReadWrite + return true, nil + } + + return false, nil +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 30b227b2a..7abe8f4d2 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -265,3 +265,34 @@ func (s *Shard) Close() error { return nil } + +// Reload reloads configuration portions that are necessary. +// If a config option is invalid, it logs an error and returns nil. +// If there was a problem with applying new configuration, an error is returned. +func (s *Shard) Reload(opts ...Option) error { + // Do not use defaultCfg here missing options need not be reloaded. + var c cfg + for i := range opts { + opts[i](&c) + } + + s.m.Lock() + defer s.m.Unlock() + + ok, err := s.metaBase.Reload(c.metaOpts...) + if err != nil { + if errors.Is(err, meta.ErrDegradedMode) { + _ = s.setMode(mode.DegradedReadOnly) + } + return err + } + if ok { + if c.refillMetabase { + return s.refillMetabase() + } else { + return s.metaBase.Init() + } + } + + return nil +} diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index d08494415..0a5de6fd1 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -31,6 +31,11 @@ func (s epochState) CurrentEpoch() uint64 { return 0 } +type objAddr struct { + obj *objectSDK.Object + addr oid.Address +} + func TestShardOpen(t *testing.T) { dir := t.TempDir() metaPath := filepath.Join(dir, "meta") @@ -164,11 +169,6 @@ func TestRefillMetabase(t *testing.T) { const objNum = 5 - type objAddr struct { - obj *objectSDK.Object - addr oid.Address - } - mObjs := make(map[string]objAddr) locked := make([]oid.ID, 1, 2) locked[0] = oidtest.ID() diff --git a/pkg/local_object_storage/shard/mode.go b/pkg/local_object_storage/shard/mode.go index 2cd070137..09dab4155 100644 --- a/pkg/local_object_storage/shard/mode.go +++ b/pkg/local_object_storage/shard/mode.go @@ -21,6 +21,10 @@ func (s *Shard) SetMode(m mode.Mode) error { s.m.Lock() defer s.m.Unlock() + return s.setMode(m) +} + +func (s *Shard) setMode(m mode.Mode) error { components := []interface{ SetMode(mode.Mode) error }{ s.metaBase, s.blobStor, } diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go new file mode 100644 index 000000000..2c5ad30b7 --- /dev/null +++ b/pkg/local_object_storage/shard/reload_test.go @@ -0,0 +1,143 @@ +package shard + +import ( + "os" + "path/filepath" + "testing" + + objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test" + usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" + "github.com/nspcc-dev/neofs-sdk-go/version" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestShardReload(t *testing.T) { + p := t.Name() + defer os.RemoveAll(p) + + l := &logger.Logger{Logger: zaptest.NewLogger(t)} + blobOpts := []blobstor.Option{ + blobstor.WithLogger(l), + blobstor.WithStorages([]blobstor.SubStorage{ + { + Storage: fstree.New( + fstree.WithPath(filepath.Join(p, "blob")), + fstree.WithDepth(1)), + }, + }), + } + + metaOpts := []meta.Option{ + meta.WithPath(filepath.Join(p, "meta")), + meta.WithEpochState(epochState{})} + + opts := []Option{ + WithLogger(l), + WithBlobStorOptions(blobOpts...), + WithMetaBaseOptions(metaOpts...), + WithPiloramaOptions( + pilorama.WithPath(filepath.Join(p, "pilorama")))} + + sh := New(opts...) + require.NoError(t, sh.Open()) + require.NoError(t, sh.Init()) + + objects := make([]objAddr, 5) + for i := range objects { + objects[i].obj = newObject() + objects[i].addr = objectCore.AddressOf(objects[i].obj) + require.NoError(t, putObject(sh, objects[i].obj)) + } + + checkHasObjects := func(t *testing.T, exists bool) { + for i := range objects { + var prm ExistsPrm + prm.SetAddress(objects[i].addr) + + res, err := sh.Exists(prm) + require.NoError(t, err) + require.Equal(t, exists, res.Exists(), "object #%d is missing", i) + } + } + + checkHasObjects(t, true) + + t.Run("same config, no-op", func(t *testing.T) { + require.NoError(t, sh.Reload(opts...)) + checkHasObjects(t, true) + }) + + t.Run("open meta at new path", func(t *testing.T) { + newShardOpts := func(metaPath string, resync bool) []Option { + metaOpts := []meta.Option{meta.WithPath(metaPath), meta.WithEpochState(epochState{})} + return append(opts, WithMetaBaseOptions(metaOpts...), WithRefillMetabase(resync)) + } + + newOpts := newShardOpts(filepath.Join(p, "meta1"), false) + require.NoError(t, sh.Reload(newOpts...)) + + checkHasObjects(t, false) // new path, but no resync + + t.Run("can put objects", func(t *testing.T) { + obj := newObject() + require.NoError(t, putObject(sh, obj)) + objects = append(objects, objAddr{obj: obj, addr: objectCore.AddressOf(obj)}) + }) + + newOpts = newShardOpts(filepath.Join(p, "meta2"), true) + require.NoError(t, sh.Reload(newOpts...)) + + checkHasObjects(t, true) // all objects are restored, including the new one + + t.Run("reload failed", func(t *testing.T) { + badPath := filepath.Join(p, "meta3") + require.NoError(t, os.WriteFile(badPath, []byte{1}, 0)) + + newOpts = newShardOpts(badPath, true) + require.Error(t, sh.Reload(newOpts...)) + + // Cleanup is done, no panic. + obj := newObject() + require.ErrorIs(t, putObject(sh, obj), ErrReadOnlyMode) + + // Old objects are still accessible. + checkHasObjects(t, true) + }) + }) +} + +func putObject(sh *Shard, obj *objectSDK.Object) error { + var prm PutPrm + prm.SetObject(obj) + + _, err := sh.Put(prm) + return err +} + +func newObject() *objectSDK.Object { + x := objectSDK.New() + ver := version.Current() + + x.SetID(oidtest.ID()) + x.SetSessionToken(sessiontest.Object()) + x.SetPayload([]byte{1, 2, 3}) + x.SetPayloadSize(3) + x.SetOwnerID(usertest.ID()) + x.SetContainerID(cidtest.ID()) + x.SetType(objectSDK.TypeRegular) + x.SetVersion(&ver) + x.SetPayloadChecksum(checksumtest.Checksum()) + x.SetPayloadHomomorphicHash(checksumtest.Checksum()) + return x +}