[#1731] engine: Return the amount of actually moved objects in Evacuate

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-09-13 14:18:00 +03:00 committed by fyrchik
parent 8fc88487db
commit 3df98ce7ba
3 changed files with 28 additions and 16 deletions

View file

@ -124,14 +124,16 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
if shards[j].ID().String() == sid { if shards[j].ID().String() == sid {
continue continue
} }
ok := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object()) putDone, exists := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object())
if ok { if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard", e.log.Debug("object is moved to another shard",
zap.String("from", sid), zap.String("from", sid),
zap.Stringer("to", shards[j].ID()), zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", lst[i])) zap.Stringer("addr", lst[i]))
res.count++ res.count++
}
continue loop continue loop
} }
} }

View file

@ -106,6 +106,13 @@ func TestEvacuateShard(t *testing.T) {
// Second case ensures that all objects are indeed moved and available. // Second case ensures that all objects are indeed moved and available.
checkHasObjects(t) checkHasObjects(t)
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(prm)
require.NoError(t, err)
require.Equal(t, 0, res.count)
checkHasObjects(t)
e.mtx.Lock() e.mtx.Lock()
delete(e.shards, evacuateShardID) delete(e.shards, evacuateShardID)
delete(e.shardPools, evacuateShardID) delete(e.shardPools, evacuateShardID)

View file

@ -66,7 +66,8 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
pool := e.shardPools[sh.ID().String()] pool := e.shardPools[sh.ID().String()]
e.mtx.RUnlock() e.mtx.RUnlock()
finished = e.putToShard(sh.Shard, ind, pool, addr, prm.obj) putDone, exists := e.putToShard(sh.Shard, ind, pool, addr, prm.obj)
finished = putDone || exists
return finished return finished
}) })
@ -77,8 +78,11 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
return PutRes{}, err return PutRes{}, err
} }
func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) bool { // putToShard puts object to sh.
var finished bool // First return value is true iff put has been successfully done.
// Second return value is true iff object already exists.
func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) {
var putSuccess, alreadyExists bool
exitCh := make(chan struct{}) exitCh := make(chan struct{})
@ -93,13 +97,14 @@ func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPoo
if shard.IsErrObjectExpired(err) { if shard.IsErrObjectExpired(err) {
// object is already found but // object is already found but
// expired => do nothing with it // expired => do nothing with it
finished = true alreadyExists = true
} }
return // this is not ErrAlreadyRemoved error so we can go to the next shard return // this is not ErrAlreadyRemoved error so we can go to the next shard
} }
if exists.Exists() { alreadyExists = exists.Exists()
if alreadyExists {
if ind != 0 { if ind != 0 {
var toMoveItPrm shard.ToMoveItPrm var toMoveItPrm shard.ToMoveItPrm
toMoveItPrm.SetAddress(addr) toMoveItPrm.SetAddress(addr)
@ -113,8 +118,6 @@ func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPoo
} }
} }
finished = true
return return
} }
@ -130,14 +133,14 @@ func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPoo
return return
} }
finished = true putSuccess = true
}); err != nil { }); err != nil {
close(exitCh) close(exitCh)
} }
<-exitCh <-exitCh
return finished return putSuccess, alreadyExists
} }
// Put writes provided object to local storage. // Put writes provided object to local storage.