From 19ad349b27a89896943367817c075930e946d70c Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 1 Mar 2022 11:59:05 +0300 Subject: [PATCH] [#1204] shard: Save ID in the metabase `AddShard` must return shard id, so we temporarily open metabase there. Signed-off-by: Evgenii Stratonikov --- .../engine/control_test.go | 29 +++++++++++++++ pkg/local_object_storage/engine/shards.go | 31 ++++++++++------ pkg/local_object_storage/metabase/shard_id.go | 36 +++++++++++++++++++ pkg/local_object_storage/shard/control.go | 1 - pkg/local_object_storage/shard/id.go | 22 ++++++++++++ pkg/local_object_storage/shard/shard.go | 2 +- 6 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 pkg/local_object_storage/metabase/shard_id.go diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 32ed34125..4d4a1a40e 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -3,8 +3,10 @@ package engine import ( "errors" "os" + "path/filepath" "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/stretchr/testify/require" ) @@ -48,3 +50,30 @@ func TestExecBlocks(t *testing.T) { // try to resume require.Error(t, e.ResumeExecution()) } + +func TestPersistentShardID(t *testing.T) { + dir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + + e, _, id := newEngineWithErrorThreshold(t, dir, 1) + + checkShardState(t, e, id[0], 0, shard.ModeReadWrite) + require.NoError(t, e.Close()) + + e, _, newID := newEngineWithErrorThreshold(t, dir, 1) + require.Equal(t, id, newID) + require.NoError(t, e.Close()) + + p1 := e.shards[id[0].String()].Shard.DumpInfo().MetaBaseInfo.Path + p2 := e.shards[id[1].String()].Shard.DumpInfo().MetaBaseInfo.Path + tmp := filepath.Join(dir, "tmp") + require.NoError(t, os.Rename(p1, tmp)) + require.NoError(t, os.Rename(p2, p1)) + require.NoError(t, os.Rename(tmp, p2)) + + e, _, newID = newEngineWithErrorThreshold(t, dir, 1) + require.Equal(t, id[1], newID[0]) + require.Equal(t, id[0], newID[1]) + require.NoError(t, e.Close()) + +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 22bcdf206..86ff4d77c 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -24,29 +24,38 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { e.mtx.Lock() defer e.mtx.Unlock() - id, err := generateShardID() - if err != nil { - return nil, fmt.Errorf("could not generate shard ID: %w", err) - } - pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) if err != nil { return nil, err } - strID := id.String() + id, err := generateShardID() + if err != nil { + return nil, fmt.Errorf("could not generate shard ID: %w", err) + } + + sh := shard.New(append(opts, + shard.WithID(id), + shard.WithExpiredObjectsCallback(e.processExpiredTombstones), + )...) + + if err := sh.UpdateID(); err != nil { + return nil, fmt.Errorf("could not open shard: %w", err) + } + + strID := sh.ID().String() + if _, ok := e.shards[strID]; ok { + return nil, fmt.Errorf("shard with id %s was already added", strID) + } e.shards[strID] = shardWrapper{ errorCount: atomic.NewUint32(0), - Shard: shard.New(append(opts, - shard.WithID(id), - shard.WithExpiredObjectsCallback(e.processExpiredTombstones), - )...), + Shard: sh, } e.shardPools[strID] = pool - return id, nil + return sh.ID(), nil } func generateShardID() (*shard.ID, error) { diff --git a/pkg/local_object_storage/metabase/shard_id.go b/pkg/local_object_storage/metabase/shard_id.go new file mode 100644 index 000000000..93313032c --- /dev/null +++ b/pkg/local_object_storage/metabase/shard_id.go @@ -0,0 +1,36 @@ +package meta + +import ( + "github.com/nspcc-dev/neo-go/pkg/util/slice" + "go.etcd.io/bbolt" +) + +var ( + shardInfoBucket = []byte(invalidBase58String + "i") + shardIDKey = []byte("id") +) + +// ReadShardID reads shard id from db. +// If id is missing, returns nil, nil. +func (db *DB) ReadShardID() ([]byte, error) { + var id []byte + err := db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(shardInfoBucket) + if b != nil { + id = slice.Copy(b.Get(shardIDKey)) + } + return nil + }) + return id, err +} + +// WriteShardID writes shard it to db. +func (db *DB) WriteShardID(id []byte) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists(shardInfoBucket) + if err != nil { + return err + } + return b.Put(shardIDKey, id) + }) +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 2636477cc..26ac0cd3c 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -27,7 +27,6 @@ func (s *Shard) Open() error { return fmt.Errorf("could not open %T: %w", component, err) } } - return nil } diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go index 9121a8ec5..a7fdd56a9 100644 --- a/pkg/local_object_storage/shard/id.go +++ b/pkg/local_object_storage/shard/id.go @@ -23,3 +23,25 @@ func (id ID) String() string { func (s *Shard) ID() *ID { return s.info.ID } + +// UpdateID reads shard ID saved in the metabase and updates it if it is missing. +func (s *Shard) UpdateID() (err error) { + if err = s.metaBase.Open(); err != nil { + return err + } + defer func() { + cErr := s.metaBase.Close() + if err == nil { + err = cErr + } + }() + id, err := s.metaBase.ReadShardID() + if err != nil { + return err + } + if len(id) != 0 { + s.info.ID = NewIDFromBytes(id) + return nil + } + return s.metaBase.WriteShardID(*s.info.ID) +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index e0263e19c..5d888cc40 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -96,7 +96,7 @@ func New(opts ...Option) *Shard { return s } -// WithID returns option to set shard identifier. +// WithID returns option to set the default shard identifier. func WithID(id *ID) Option { return func(c *cfg) { c.info.ID = id