diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index dd1ad15e2..32a441637 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -124,14 +124,16 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) if shards[j].ID().String() == sid { continue } - ok := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object()) - if ok { - e.log.Debug("object is moved to another shard", - zap.String("from", sid), - zap.Stringer("to", shards[j].ID()), - zap.Stringer("addr", lst[i])) + putDone, exists := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object()) + if putDone || exists { + if putDone { + e.log.Debug("object is moved to another shard", + zap.String("from", sid), + zap.Stringer("to", shards[j].ID()), + zap.Stringer("addr", lst[i])) - res.count++ + res.count++ + } continue loop } } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 5617d8c55..fecba0568 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -106,6 +106,13 @@ func TestEvacuateShard(t *testing.T) { // Second case ensures that all objects are indeed moved and available. checkHasObjects(t) + // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. + res, err = e.Evacuate(prm) + require.NoError(t, err) + require.Equal(t, 0, res.count) + + checkHasObjects(t) + e.mtx.Lock() delete(e.shards, evacuateShardID) delete(e.shardPools, evacuateShardID) diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index db304678a..9c29a8c55 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -66,7 +66,8 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { pool := e.shardPools[sh.ID().String()] e.mtx.RUnlock() - finished = e.putToShard(sh.Shard, ind, pool, addr, prm.obj) + putDone, exists := e.putToShard(sh.Shard, ind, pool, addr, prm.obj) + finished = putDone || exists return finished }) @@ -77,8 +78,11 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return PutRes{}, err } -func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) bool { - var finished bool +// putToShard puts object to sh. +// First return value is true iff put has been successfully done. +// Second return value is true iff object already exists. +func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object) (bool, bool) { + var putSuccess, alreadyExists bool exitCh := make(chan struct{}) @@ -93,13 +97,14 @@ func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPoo if shard.IsErrObjectExpired(err) { // object is already found but // expired => do nothing with it - finished = true + alreadyExists = true } return // this is not ErrAlreadyRemoved error so we can go to the next shard } - if exists.Exists() { + alreadyExists = exists.Exists() + if alreadyExists { if ind != 0 { var toMoveItPrm shard.ToMoveItPrm toMoveItPrm.SetAddress(addr) @@ -113,8 +118,6 @@ func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPoo } } - finished = true - return } @@ -130,14 +133,14 @@ func (e *StorageEngine) putToShard(sh *shard.Shard, ind int, pool util.WorkerPoo return } - finished = true + putSuccess = true }); err != nil { close(exitCh) } <-exitCh - return finished + return putSuccess, alreadyExists } // Put writes provided object to local storage.