diff --git a/pkg/local_object_storage/engine/remove_copies.go b/pkg/local_object_storage/engine/remove_copies.go
index c50c0844c..25872333d 100644
--- a/pkg/local_object_storage/engine/remove_copies.go
+++ b/pkg/local_object_storage/engine/remove_copies.go
@@ -128,7 +128,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
 			var deletePrm shard.DeletePrm
 			deletePrm.SetAddresses(addr)
 
-			_, err = shards[i].Delete(deletePrm)
+			_, err = shards[i].Delete(ctx, deletePrm)
 			if err != nil {
 				return err
 			}
diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go
index d727d27a5..34617e1ee 100644
--- a/pkg/local_object_storage/shard/control.go
+++ b/pkg/local_object_storage/shard/control.go
@@ -296,8 +296,8 @@ func (s *Shard) Reload(opts ...Option) error {
 		opts[i](&c)
 	}
 
-	s.m.Lock()
-	defer s.m.Unlock()
+	unlock := s.lockExclusive()
+	defer unlock()
 
 	ok, err := s.metaBase.Reload(c.metaOpts...)
 	if err != nil {
@@ -327,3 +327,17 @@ func (s *Shard) Reload(opts ...Option) error {
 	s.log.Info("trying to restore read-write mode")
 	return s.setMode(mode.ReadWrite)
 }
+
+// lockExclusive cancels any background GC run before acquiring the
+// exclusive shard lock and returns the function that releases it.
+func (s *Shard) lockExclusive() func() {
+	s.setModeRequested.Store(true)
+	val := s.gcCancel.Load()
+	if val != nil {
+		cancelGC := val.(context.CancelFunc)
+		cancelGC()
+	}
+	s.m.Lock()
+	s.setModeRequested.Store(false)
+	return s.m.Unlock
+}
diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go
index 6ae3bf7dd..3fc7b627e 100644
--- a/pkg/local_object_storage/shard/delete.go
+++ b/pkg/local_object_storage/shard/delete.go
@@ -1,6 +1,7 @@
 package shard
 
 import (
+	"context"
 	"errors"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -27,84 +28,92 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
 
 // Delete removes data from the shard's writeCache, metaBase and
 // blobStor.
-func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
+func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
 	s.m.RLock()
 	defer s.m.RUnlock()
 
-	return s.delete(prm)
+	return s.delete(ctx, prm)
 }
 
-func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
+func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
 	if s.info.Mode.ReadOnly() {
 		return DeleteRes{}, ErrReadOnlyMode
 	} else if s.info.Mode.NoMetabase() {
 		return DeleteRes{}, ErrDegradedMode
 	}
 
-	ln := len(prm.addr)
-
-	smalls := make(map[oid.Address][]byte, ln)
-
-	for i := range prm.addr {
-		if s.hasWriteCache() {
-			err := s.writeCache.Delete(prm.addr[i])
-			if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
-				s.log.Warn("can't delete object from write cache", zap.String("error", err.Error()))
-			}
+	for _, addr := range prm.addr {
+		select {
+		case <-ctx.Done():
+			return DeleteRes{}, ctx.Err()
+		default:
 		}
 
-		var sPrm meta.StorageIDPrm
-		sPrm.SetAddress(prm.addr[i])
+		s.deleteObjectFromWriteCacheSafe(addr)
 
-		res, err := s.metaBase.StorageID(sPrm)
-		if err != nil {
-			s.log.Debug("can't get storage ID from metabase",
-				zap.Stringer("object", prm.addr[i]),
-				zap.String("error", err.Error()))
+		s.deleteFromBlobstorSafe(addr)
 
-			continue
-		}
-
-		if res.StorageID() != nil {
-			smalls[prm.addr[i]] = res.StorageID()
+		if err := s.deleteFromMetabase(addr); err != nil {
+			return DeleteRes{}, err // stop deleting the batch on metabase errors
 		}
 	}
 
+	return DeleteRes{}, nil
+}
+
+// deleteObjectFromWriteCacheSafe removes the object from the write cache, only logging failures.
+func (s *Shard) deleteObjectFromWriteCacheSafe(addr oid.Address) {
+	if s.hasWriteCache() {
+		err := s.writeCache.Delete(addr)
+		if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
+			s.log.Warn("can't delete object from write cache", zap.String("error", err.Error()))
+		}
+	}
+}
+
+// deleteFromMetabase removes the object from the metabase and updates the object counters and size metrics.
+func (s *Shard) deleteFromMetabase(addr oid.Address) error {
 	var delPrm meta.DeletePrm
-	delPrm.SetAddresses(prm.addr...)
+	delPrm.SetAddresses(addr)
 
 	res, err := s.metaBase.Delete(delPrm)
 	if err != nil {
-		return DeleteRes{}, err // stop on metabase error ?
+		return err
 	}
 
-	var totalRemovedPayload uint64
-
 	s.decObjectCounterBy(physical, res.RawObjectsRemoved())
 	s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
-	for i := range prm.addr {
-		removedPayload := res.RemovedPhysicalObjectSizes()[i]
-		totalRemovedPayload += removedPayload
-		logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
-		if logicalRemovedPayload > 0 {
-			s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
-		}
+	removedPayload := res.RemovedPhysicalObjectSizes()[0]
+	logicalRemovedPayload := res.RemovedLogicalObjectSizes()[0]
+	if logicalRemovedPayload > 0 {
+		s.addToContainerSize(addr.Container().EncodeToString(), -int64(logicalRemovedPayload))
 	}
-	s.addToPayloadSize(-int64(totalRemovedPayload))
+	s.addToPayloadSize(-int64(removedPayload))
 
-	for i := range prm.addr {
-		var delPrm common.DeletePrm
-		delPrm.Address = prm.addr[i]
-		id := smalls[prm.addr[i]]
-		delPrm.StorageID = id
-
-		_, err = s.blobStor.Delete(delPrm)
-		if err != nil {
-			s.log.Debug("can't remove object from blobStor",
-				zap.Stringer("object_address", prm.addr[i]),
-				zap.String("error", err.Error()))
-		}
-	}
-
-	return DeleteRes{}, nil
+	return nil
+}
+
+// deleteFromBlobstorSafe removes the object from the blobstor, only logging failures.
+func (s *Shard) deleteFromBlobstorSafe(addr oid.Address) {
+	var sPrm meta.StorageIDPrm
+	sPrm.SetAddress(addr)
+
+	res, err := s.metaBase.StorageID(sPrm)
+	if err != nil {
+		s.log.Debug("can't get storage ID from metabase",
+			zap.Stringer("object", addr),
+			zap.String("error", err.Error()))
+	}
+	storageID := res.StorageID()
+
+	var delPrm common.DeletePrm
+	delPrm.Address = addr
+	delPrm.StorageID = storageID
+
+	_, err = s.blobStor.Delete(delPrm)
+	if err != nil {
+		s.log.Debug("can't remove object from blobStor",
+			zap.Stringer("object_address", addr),
+			zap.String("error", err.Error()))
+	}
 }
diff --git a/pkg/local_object_storage/shard/delete_test.go b/pkg/local_object_storage/shard/delete_test.go
index c37dfa285..7ecfb59aa 100644
--- a/pkg/local_object_storage/shard/delete_test.go
+++ b/pkg/local_object_storage/shard/delete_test.go
@@ -49,7 +49,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
 	_, err = testGet(t, sh, getPrm, hasWriteCache)
 	require.NoError(t, err)
 
-	_, err = sh.Delete(delPrm)
+	_, err = sh.Delete(context.Background(), delPrm)
 	require.NoError(t, err)
 
 	_, err = sh.Get(context.Background(), getPrm)
@@ -73,7 +73,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
 	_, err = sh.Get(context.Background(), getPrm)
 	require.NoError(t, err)
 
-	_, err = sh.Delete(delPrm)
+	_, err = sh.Delete(context.Background(), delPrm)
 	require.NoError(t, err)
 
 	_, err = sh.Get(context.Background(), getPrm)
diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go
index efa61c85c..254a7ced7 100644
--- a/pkg/local_object_storage/shard/gc.go
+++ b/pkg/local_object_storage/shard/gc.go
@@ -196,6 +196,14 @@ func (gc *gc) stop() {
 // with GC-marked graves.
 // Does nothing if shard is in "read-only" mode.
 func (s *Shard) removeGarbage() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	s.gcCancel.Store(cancel)
+	if s.setModeRequested.Load() {
+		return
+	}
+
 	s.m.RLock()
 	defer s.m.RUnlock()
 
@@ -207,6 +215,12 @@ func (s *Shard) removeGarbage() {
 
 	var iterPrm meta.GarbageIterationPrm
 	iterPrm.SetHandler(func(g meta.GarbageObject) error {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
 		buf = append(buf, g.Address())
 
 		if len(buf) == s.rmBatchSize {
@@ -233,7 +247,7 @@ func (s *Shard) removeGarbage() {
 	deletePrm.SetAddresses(buf...)
 
 	// delete accumulated objects
-	_, err = s.delete(deletePrm)
+	_, err = s.delete(ctx, deletePrm)
 	if err != nil {
 		s.log.Warn("could not delete the objects",
 			zap.String("error", err.Error()),
diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go
index 18e97e259..d39a98224 100644
--- a/pkg/local_object_storage/shard/metrics_test.go
+++ b/pkg/local_object_storage/shard/metrics_test.go
@@ -168,7 +168,7 @@ func TestCounters(t *testing.T) {
 		deletedNumber := int(phy / 4)
 		prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
 
-		_, err := sh.Delete(prm)
+		_, err := sh.Delete(context.Background(), prm)
 		require.NoError(t, err)
 
 		require.Equal(t, phy-uint64(deletedNumber), mm.objCounters[physical])
diff --git a/pkg/local_object_storage/shard/mode.go b/pkg/local_object_storage/shard/mode.go
index 17ed3f3c8..dd4e08f2b 100644
--- a/pkg/local_object_storage/shard/mode.go
+++ b/pkg/local_object_storage/shard/mode.go
@@ -18,8 +18,8 @@ var ErrDegradedMode = logicerr.New("shard is in degraded mode")
 // Returns any error encountered that did not allow
 // setting shard mode.
 func (s *Shard) SetMode(m mode.Mode) error {
-	s.m.Lock()
-	defer s.m.Unlock()
+	unlock := s.lockExclusive()
+	defer unlock()
 
 	return s.setMode(m)
 }
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index 6d1fba141..b77df62d0 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -3,6 +3,7 @@ package shard
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -31,6 +32,9 @@ type Shard struct {
 	metaBase *meta.DB
 
 	tsSource TombstoneSource
+
+	gcCancel         atomic.Value
+	setModeRequested atomic.Bool
 }
 
 // Option represents Shard's constructor option.
@@ -209,12 +213,12 @@ func WithWriteCache(use bool) Option {
 	}
 }
 
 // hasWriteCache returns bool if write cache exists on shards.
-func (s Shard) hasWriteCache() bool {
+func (s *Shard) hasWriteCache() bool {
 	return s.cfg.useWriteCache
 }
 
 // needRefillMetabase returns true if metabase is needed to be refilled.
-func (s Shard) needRefillMetabase() bool {
+func (s *Shard) needRefillMetabase() bool {
 	return s.cfg.refillMetabase
 }
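
Note on the `Shard.delete` refactoring above: each address is now processed individually, with the write cache and blobstor treated as best-effort (failures are only logged), the metabase treated as authoritative (an error aborts the rest of the batch), and the context polled once per address so a pending mode change can interrupt a long batch. The following is a minimal, runnable sketch of that control flow; the `store` type, `deleteBatch` and the string keys are simplified stand-ins, not frostfs-node APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// store is a toy stand-in for the write cache, blobstor and metabase.
type store map[string]struct{}

func (s store) delete(key string) error {
	if _, ok := s[key]; !ok {
		return errNotFound
	}
	delete(s, key)
	return nil
}

// deleteBatch mirrors the structure of the refactored Shard.delete:
// per-address context check, best-effort cache/blobstor deletes,
// fail-fast metabase delete.
func deleteBatch(ctx context.Context, cache, blob, metabase store, keys []string) error {
	for _, key := range keys {
		select {
		case <-ctx.Done():
			return ctx.Err() // e.g. cancelled by a pending mode change
		default:
		}

		// Best effort: tolerate "not found", only log other failures.
		if err := cache.delete(key); err != nil && !errors.Is(err, errNotFound) {
			fmt.Println("warn: write cache:", err)
		}
		if err := blob.delete(key); err != nil {
			fmt.Println("debug: blobstor:", err)
		}
		// Authoritative: a metabase error stops the remaining batch.
		if err := metabase.delete(key); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	metabase := store{"a": {}, "b": {}}
	err := deleteBatch(context.Background(), store{}, store{"a": {}}, metabase, []string{"a", "b"})
	fmt.Println("err:", err, "objects left in metabase:", len(metabase))
}
```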
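
The GC-interruption handshake spread across control.go, gc.go and shard.go can also be read in isolation: the GC run publishes its `context.CancelFunc` through an `atomic.Value`, and any caller that needs the exclusive lock first raises `setModeRequested` and cancels the published context, so it never waits behind a full garbage-collection pass. Below is a self-contained sketch of that pattern under simplified names (`shard`, `removeGarbage`, the timings are illustrative); it demonstrates the mechanism and is not the actual frostfs-node code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type shard struct {
	m                sync.RWMutex
	gcCancel         atomic.Value // holds a context.CancelFunc
	setModeRequested atomic.Bool
}

// lockExclusive cancels the in-flight GC run before blocking on the
// write lock, so a mode change is not stalled by a long GC iteration.
func (s *shard) lockExclusive() func() {
	s.setModeRequested.Store(true)
	if val := s.gcCancel.Load(); val != nil {
		val.(context.CancelFunc)()
	}
	s.m.Lock()
	s.setModeRequested.Store(false)
	return s.m.Unlock
}

// removeGarbage mimics the GC side: publish a cancel func, bail out if
// a mode change is already waiting, then work under the read lock,
// checking the context between batches.
func (s *shard) removeGarbage() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	s.gcCancel.Store(cancel)
	if s.setModeRequested.Load() {
		return // a mode change is pending; skip this run
	}

	s.m.RLock()
	defer s.m.RUnlock()

	for i := 0; i < 100; i++ {
		select {
		case <-ctx.Done():
			fmt.Println("gc: interrupted by mode change")
			return
		default:
		}
		time.Sleep(10 * time.Millisecond) // stand-in for deleting one batch
	}
}

func main() {
	s := &shard{}
	go s.removeGarbage()
	time.Sleep(50 * time.Millisecond)

	unlock := s.lockExclusive() // interrupts GC instead of waiting for it
	fmt.Println("mode change acquired the exclusive lock")
	unlock()
}
```

The `setModeRequested` flag closes the race in which GC publishes a fresh cancel func after `lockExclusive` has already cancelled the previous one: such a run observes the flag before taking the read lock and exits instead, so the exclusive-lock caller either cancels the running GC or prevents the next one from starting.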