forked from TrueCloudLab/frostfs-node
[#1731] engine: Move single shard PUT to a separate function
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
4208f7c0cf
commit
7377979e12
1 changed files with 66 additions and 58 deletions
|
@ -5,7 +5,9 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -57,9 +59,6 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
||||||
return PutRes{}, err
|
return PutRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var existPrm shard.ExistsPrm
|
|
||||||
existPrm.SetAddress(addr)
|
|
||||||
|
|
||||||
finished := false
|
finished := false
|
||||||
|
|
||||||
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
|
||||||
|
@ -67,61 +66,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
||||||
pool := e.shardPools[sh.ID().String()]
|
pool := e.shardPools[sh.ID().String()]
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
exitCh := make(chan struct{})
|
finished = e.putToShard(sh.Shard, ind, pool, addr, prm.obj)
|
||||||
|
|
||||||
if err := pool.Submit(func() {
|
|
||||||
defer close(exitCh)
|
|
||||||
|
|
||||||
exists, err := sh.Exists(existPrm)
|
|
||||||
if err != nil {
|
|
||||||
if shard.IsErrObjectExpired(err) {
|
|
||||||
// object is already found but
|
|
||||||
// expired => do nothing with it
|
|
||||||
finished = true
|
|
||||||
}
|
|
||||||
|
|
||||||
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
|
||||||
}
|
|
||||||
|
|
||||||
if exists.Exists() {
|
|
||||||
if ind != 0 {
|
|
||||||
var toMoveItPrm shard.ToMoveItPrm
|
|
||||||
toMoveItPrm.SetAddress(addr)
|
|
||||||
|
|
||||||
_, err = sh.ToMoveIt(toMoveItPrm)
|
|
||||||
if err != nil {
|
|
||||||
e.log.Warn("could not mark object for shard relocation",
|
|
||||||
zap.Stringer("shard", sh.ID()),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
finished = true
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
|
||||||
putPrm.SetObject(prm.obj)
|
|
||||||
|
|
||||||
_, err = sh.Put(putPrm)
|
|
||||||
if err != nil {
|
|
||||||
e.log.Warn("could not put object in shard",
|
|
||||||
zap.Stringer("shard", sh.ID()),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
finished = true
|
|
||||||
}); err != nil {
|
|
||||||
close(exitCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
<-exitCh
|
|
||||||
|
|
||||||
return finished
|
return finished
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -132,6 +77,69 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
||||||
return PutRes{}, err
|
return PutRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) bool {
|
||||||
|
var finished bool
|
||||||
|
|
||||||
|
exitCh := make(chan struct{})
|
||||||
|
|
||||||
|
if err := pool.Submit(func() {
|
||||||
|
defer close(exitCh)
|
||||||
|
|
||||||
|
var existPrm shard.ExistsPrm
|
||||||
|
existPrm.SetAddress(addr)
|
||||||
|
|
||||||
|
exists, err := sh.Exists(existPrm)
|
||||||
|
if err != nil {
|
||||||
|
if shard.IsErrObjectExpired(err) {
|
||||||
|
// object is already found but
|
||||||
|
// expired => do nothing with it
|
||||||
|
finished = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
||||||
|
}
|
||||||
|
|
||||||
|
if exists.Exists() {
|
||||||
|
if ind != 0 {
|
||||||
|
var toMoveItPrm shard.ToMoveItPrm
|
||||||
|
toMoveItPrm.SetAddress(addr)
|
||||||
|
|
||||||
|
_, err = sh.ToMoveIt(toMoveItPrm)
|
||||||
|
if err != nil {
|
||||||
|
e.log.Warn("could not mark object for shard relocation",
|
||||||
|
zap.Stringer("shard", sh.ID()),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
finished = true
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var putPrm shard.PutPrm
|
||||||
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
|
_, err = sh.Put(putPrm)
|
||||||
|
if err != nil {
|
||||||
|
e.log.Warn("could not put object in shard",
|
||||||
|
zap.Stringer("shard", sh.ID()),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
finished = true
|
||||||
|
}); err != nil {
|
||||||
|
close(exitCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-exitCh
|
||||||
|
|
||||||
|
return finished
|
||||||
|
}
|
||||||
|
|
||||||
// Put writes provided object to local storage.
|
// Put writes provided object to local storage.
|
||||||
func Put(storage *StorageEngine, obj *objectSDK.Object) error {
|
func Put(storage *StorageEngine, obj *objectSDK.Object) error {
|
||||||
var putPrm PutPrm
|
var putPrm PutPrm
|
||||||
|
|
Loading…
Reference in a new issue