[#1770] engine: Do not lock on shard init

Init can take a lot of time. Because the mutex is taken, all new operations
are blocked.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-09-28 00:43:38 +03:00 committed by fyrchik
parent fbd5bc8c38
commit 887afeaddb
2 changed files with 39 additions and 22 deletions

View file

@ -268,30 +268,29 @@ func (e *StorageEngine) Reload(rcfg ReConfiguration) error {
return fmt.Errorf("could not remove shards: %w", err)
}
e.mtx.Lock()
defer e.mtx.Unlock()
for _, newPath := range shardsToAdd {
id, err := e.addShard(rcfg.shards[newPath]...)
sh, err := e.createShard(rcfg.shards[newPath])
if err != nil {
return fmt.Errorf("could not add new shard: %w", err)
return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newPath, err)
}
idStr := id.String()
sh := e.shards[idStr]
idStr := sh.ID().String()
err = sh.Open()
if err == nil {
err = sh.Init()
}
if err != nil {
delete(e.shards, idStr)
e.shardPools[idStr].Release()
delete(e.shardPools, idStr)
_ = sh.Close()
return fmt.Errorf("could not init %s shard: %w", idStr, err)
}
err = e.addShard(sh)
if err != nil {
_ = sh.Close()
return fmt.Errorf("could not add %s shard: %w", idStr, err)
}
e.log.Info("added new shard", zap.String("id", idStr))
}

View file

@ -44,23 +44,27 @@ func (m metricsWithID) DecObjectCounter(objectType string) {
// Returns any error encountered that did not allow adding a shard.
// Otherwise returns the ID of the added shard.
func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
e.mtx.Lock()
defer e.mtx.Unlock()
return e.addShard(opts...)
}
func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) {
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
sh, err := e.createShard(opts)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not create a shard: %w", err)
}
err = e.addShard(sh)
if err != nil {
return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err)
}
return sh.ID(), nil
}
func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
id, err := generateShardID()
if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err)
}
e.mtx.RLock()
if e.metrics != nil {
opts = append(opts, shard.WithMetricsWriter(
metricsWithID{
@ -70,6 +74,8 @@ func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) {
))
}
e.mtx.RUnlock()
sh := shard.New(append(opts,
shard.WithID(id),
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
@ -81,9 +87,21 @@ func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) {
return nil, fmt.Errorf("could not update shard ID: %w", err)
}
return sh, err
}
func (e *StorageEngine) addShard(sh *shard.Shard) error {
e.mtx.Lock()
defer e.mtx.Unlock()
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil {
return fmt.Errorf("could not create pool: %w", err)
}
strID := sh.ID().String()
if _, ok := e.shards[strID]; ok {
return nil, fmt.Errorf("shard with id %s was already added", strID)
return fmt.Errorf("shard with id %s was already added", strID)
}
e.shards[strID] = shardWrapper{
@ -93,7 +111,7 @@ func (e *StorageEngine) addShard(opts ...shard.Option) (*shard.ID, error) {
e.shardPools[strID] = pool
return sh.ID(), nil
return nil
}
// removeShards removes specified shards. Skips non-existent shards.