From b064fb24d8780c01d55bb7f716523ecee02c368d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 14 Sep 2022 17:18:11 +0300 Subject: [PATCH] [#1616] engine: Do not use batches in delete Use a simple loop at the callsite. This way we remove as much as we can. Also, `Delete` metrics is more meaningful now. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/delete.go | 66 ++++++++++---------- pkg/local_object_storage/engine/lock_test.go | 2 +- pkg/services/control/server/gc.go | 19 ++++-- 3 files changed, 46 insertions(+), 41 deletions(-) diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 6fff7b661..3cba830c4 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -11,7 +11,7 @@ import ( // DeletePrm groups the parameters of Delete operation. type DeletePrm struct { - addr []oid.Address + addr oid.Address forceRemoval bool } @@ -19,12 +19,12 @@ type DeletePrm struct { // DeleteRes groups the resulting values of Delete operation. type DeleteRes struct{} -// WithAddresses is a Delete option to set the addresses of the objects to delete. +// WithAddress is a Delete option to set the addresses of the objects to delete. // // Option is required. -func (p *DeletePrm) WithAddresses(addr ...oid.Address) { +func (p *DeletePrm) WithAddress(addr oid.Address) { if p != nil { - p.addr = append(p.addr, addr...) + p.addr = addr } } @@ -66,43 +66,41 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { err apistatus.ObjectLocked } - for i := range prm.addr { - e.iterateOverSortedShards(prm.addr[i], func(_ int, sh hashedShard) (stop bool) { - var existsPrm shard.ExistsPrm - existsPrm.SetAddress(prm.addr[i]) + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + var existsPrm shard.ExistsPrm + existsPrm.SetAddress(prm.addr) - resExists, err := sh.Exists(existsPrm) - if err != nil { - _, ok := err.(*objectSDK.SplitInfoError) - if ok || shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { - return true - } - if !shard.IsErrNotFound(err) { - e.reportShardError(sh, "could not check object existence", err) - } - return false - } else if !resExists.Exists() { - return false + resExists, err := sh.Exists(existsPrm) + if err != nil { + _, ok := err.(*objectSDK.SplitInfoError) + if ok || shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { + return true } - - var shPrm shard.InhumePrm - shPrm.MarkAsGarbage(prm.addr[i]) - if prm.forceRemoval { - shPrm.ForceRemoval() + if !shard.IsErrNotFound(err) { + e.reportShardError(sh, "could not check object existence", err) } + return false + } else if !resExists.Exists() { + return false + } - _, err = sh.Inhume(shPrm) - if err != nil { - e.reportShardError(sh, "could not inhume object in shard", err) + var shPrm shard.InhumePrm + shPrm.MarkAsGarbage(prm.addr) + if prm.forceRemoval { + shPrm.ForceRemoval() + } - locked.is = errors.As(err, &locked.err) + _, err = sh.Inhume(shPrm) + if err != nil { + e.reportShardError(sh, "could not inhume object in shard", err) - return locked.is - } + locked.is = errors.As(err, &locked.err) - return true - }) - } + return locked.is + } + + return true + }) if locked.is { return DeleteRes{}, locked.err diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index ba1582a46..f24433a7f 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -299,7 +299,7 @@ func TestLockForceRemoval(t *testing.T) { // 4. var deletePrm DeletePrm - deletePrm.WithAddresses(objectcore.AddressOf(lock)) + deletePrm.WithAddress(objectcore.AddressOf(lock)) deletePrm.WithForceRemoval() _, err = e.Delete(deletePrm) diff --git a/pkg/services/control/server/gc.go b/pkg/services/control/server/gc.go index 0982cc2db..de082a39e 100644 --- a/pkg/services/control/server/gc.go +++ b/pkg/services/control/server/gc.go @@ -35,13 +35,20 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest) } } - var prm engine.DeletePrm - prm.WithAddresses(addrList...) - prm.WithForceRemoval() + var firstErr error + for i := range addrList { + var prm engine.DeletePrm + prm.WithForceRemoval() + prm.WithAddress(addrList[i]) - _, err := s.s.Delete(prm) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + _, err := s.s.Delete(prm) + if err != nil && firstErr == nil { + firstErr = err + } + } + + if firstErr != nil { + return nil, status.Error(codes.Internal, firstErr.Error()) } // create and fill response