From 7377979e12c368762d52c3e19dfcdeb5111612d0 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 12 Sep 2022 14:48:06 +0300 Subject: [PATCH] [#1731] engine: Move single shard PUT to a separate function Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/put.go | 124 +++++++++++++------------ 1 file changed, 66 insertions(+), 58 deletions(-) diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index f6f98f55..db304678 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -5,7 +5,9 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) @@ -57,9 +59,6 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return PutRes{}, err } - var existPrm shard.ExistsPrm - existPrm.SetAddress(addr) - finished := false e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) { @@ -67,61 +66,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { pool := e.shardPools[sh.ID().String()] e.mtx.RUnlock() - exitCh := make(chan struct{}) - - if err := pool.Submit(func() { - defer close(exitCh) - - exists, err := sh.Exists(existPrm) - if err != nil { - if shard.IsErrObjectExpired(err) { - // object is already found but - // expired => do nothing with it - finished = true - } - - return // this is not ErrAlreadyRemoved error so we can go to the next shard - } - - if exists.Exists() { - if ind != 0 { - var toMoveItPrm shard.ToMoveItPrm - toMoveItPrm.SetAddress(addr) - - _, err = sh.ToMoveIt(toMoveItPrm) - if err != nil { - e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - } - } - - finished = true - - return - } - - var putPrm shard.PutPrm - putPrm.SetObject(prm.obj) - - _, err = sh.Put(putPrm) - if err != nil { - e.log.Warn("could not put object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - - return - } - - finished = true - }); err != nil { - close(exitCh) - } - - <-exitCh - + finished = e.putToShard(sh.Shard, ind, pool, addr, prm.obj) return finished }) @@ -132,6 +77,69 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return PutRes{}, err } +func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) bool { + var finished bool + + exitCh := make(chan struct{}) + + if err := pool.Submit(func() { + defer close(exitCh) + + var existPrm shard.ExistsPrm + existPrm.SetAddress(addr) + + exists, err := sh.Exists(existPrm) + if err != nil { + if shard.IsErrObjectExpired(err) { + // object is already found but + // expired => do nothing with it + finished = true + } + + return // this is not ErrAlreadyRemoved error so we can go to the next shard + } + + if exists.Exists() { + if ind != 0 { + var toMoveItPrm shard.ToMoveItPrm + toMoveItPrm.SetAddress(addr) + + _, err = sh.ToMoveIt(toMoveItPrm) + if err != nil { + e.log.Warn("could not mark object for shard relocation", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error()), + ) + } + } + + finished = true + + return + } + + var putPrm shard.PutPrm + putPrm.SetObject(obj) + + _, err = sh.Put(putPrm) + if err != nil { + e.log.Warn("could not put object in shard", + zap.Stringer("shard", sh.ID()), + zap.String("error", err.Error())) + + return + } + + finished = true + }); err != nil { + close(exitCh) + } + + <-exitCh + + return finished +} + // Put writes provided object to local storage. func Put(storage *StorageEngine, obj *objectSDK.Object) error { var putPrm PutPrm