diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index d5228d24..00053d2f 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -268,30 +268,29 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error { return fmt.Errorf("could not remove shards: %w", err) } - e.mtx.Lock() - defer e.mtx.Unlock() - for _, newPath := range shardsToAdd { - id, err := e.addShard(rcfg.shards[newPath]...) + sh, err := e.createShard(rcfg.shards[newPath]) if err != nil { - return fmt.Errorf("could not add new shard: %w", err) + return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newPath, err) } - idStr := id.String() - sh := e.shards[idStr] + idStr := sh.ID().String() err = sh.Open() if err == nil { err = sh.Init() } if err != nil { - delete(e.shards, idStr) - e.shardPools[idStr].Release() - delete(e.shardPools, idStr) - + _ = sh.Close() return fmt.Errorf("could not init %s shard: %w", idStr, err) } + err = e.addShard(sh) + if err != nil { + _ = sh.Close() + return fmt.Errorf("could not add %s shard: %w", idStr, err) + } + e.log.Info("added new shard", zap.String("id", idStr)) } diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 4c14ba65..91a1b2af 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -44,23 +44,27 @@ func (m metricsWithID) DecObjectCounter(objectType string) { // Returns any error encountered that did not allow adding a shard. // Otherwise returns the ID of the added shard. func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { - e.mtx.Lock() - defer e.mtx.Unlock() - - return e.addShard(opts...) -} - -func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) { - pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) + sh, err := e.createShard(opts) if err != nil { - return nil, err + return nil, fmt.Errorf("could not create a shard: %w", err) } + err = e.addShard(sh) + if err != nil { + return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err) + } + + return sh.ID(), nil +} + +func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { id, err := generateShardID() if err != nil { return nil, fmt.Errorf("could not generate shard ID: %w", err) } + e.mtx.RLock() + if e.metrics != nil { opts = append(opts, shard.WithMetricsWriter( metricsWithID{ @@ -70,6 +74,8 @@ func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) { )) } + e.mtx.RUnlock() + sh := shard.New(append(opts, shard.WithID(id), shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), @@ -81,9 +87,21 @@ func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) { return nil, fmt.Errorf("could not update shard ID: %w", err) } + return sh, err +} + +func (e *StorageEngine) addShard(sh *shard.Shard) error { + e.mtx.Lock() + defer e.mtx.Unlock() + + pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) + if err != nil { + return fmt.Errorf("could not create pool: %w", err) + } + strID := sh.ID().String() if _, ok := e.shards[strID]; ok { - return nil, fmt.Errorf("shard with id %s was already added", strID) + return fmt.Errorf("shard with id %s was already added", strID) } e.shards[strID] = shardWrapper{ @@ -93,7 +111,7 @@ func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) { e.shardPools[strID] = pool - return sh.ID(), nil + return nil } // removeShards removes specified shards. Skips non-existent shards.