WIP: node: Stop GC once termination signal received #385

Closed
acid-ant wants to merge 1 commit from acid-ant:bugfix/366-remove-garbage into master
2 changed files with 31 additions and 40 deletions

View file

@ -53,11 +53,13 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return DeleteRes{}, ErrDegradedMode
}
ln := len(prm.addr)
smalls := make(map[oid.Address][]byte, ln)
for i := range prm.addr {
select {
case <-ctx.Done():
return DeleteRes{}, ctx.Err()
default:
}
if s.hasWriteCache() {
err := s.writeCache.Delete(ctx, prm.addr[i])
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
@ -68,49 +70,38 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(prm.addr[i])
res, err := s.metaBase.StorageID(ctx, sPrm)
var sid []byte
resSID, err := s.metaBase.StorageID(ctx, sPrm)
if err != nil {
s.log.Debug(logs.ShardCantGetStorageIDFromMetabase,
zap.Stringer("object", prm.addr[i]),
zap.String("error", err.Error()))
continue
} else {
sid = resSID.StorageID()
}
if res.StorageID() != nil {
smalls[prm.addr[i]] = res.StorageID()
}
}
var delPrmMeta meta.DeletePrm
delPrmMeta.SetAddresses(prm.addr[i])
var delPrm meta.DeletePrm
delPrm.SetAddresses(prm.addr...)
res, err := s.metaBase.Delete(ctx, delPrm)
var resDelMeta meta.DeleteRes
resDelMeta, err = s.metaBase.Delete(ctx, delPrmMeta)
if err != nil {
return DeleteRes{}, err // stop on metabase error ?
}
var totalRemovedPayload uint64
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
for i := range prm.addr {
removedPayload := res.RemovedPhysicalObjectSizes()[i]
totalRemovedPayload += removedPayload
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
s.decObjectCounterBy(physical, resDelMeta.RawObjectsRemoved())
s.decObjectCounterBy(logical, resDelMeta.AvailableObjectsRemoved())
logicalRemovedPayload := resDelMeta.RemovedLogicalObjectSizes()[0]
if logicalRemovedPayload > 0 {
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
}
}
s.addToPayloadSize(-int64(totalRemovedPayload))
s.addToPayloadSize(-int64(resDelMeta.RemovedPhysicalObjectSizes()[0]))
for i := range prm.addr {
var delPrm common.DeletePrm
delPrm.Address = prm.addr[i]
id := smalls[prm.addr[i]]
delPrm.StorageID = id
var delPrmBlob common.DeletePrm
delPrmBlob.Address = prm.addr[i]
delPrmBlob.StorageID = sid
_, err = s.blobStor.Delete(ctx, delPrm)
_, err = s.blobStor.Delete(ctx, delPrmBlob)
if err != nil {
s.log.Debug(logs.ShardCantRemoveObjectFromBlobStor,
zap.Stringer("object_address", prm.addr[i]),

View file

@ -76,7 +76,7 @@ type gc struct {
workerPool util.WorkerPool
remover func()
remover func(context.Context)
eventChan chan Event
mEventHandler map[eventType]*eventHandlers
@ -115,7 +115,7 @@ func (gc *gc) init(ctx context.Context) {
}
gc.wg.Add(2)
go gc.tickRemover()
go gc.tickRemover(ctx)
go gc.listenEvents(ctx)
}
@ -160,7 +160,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
}
}
func (gc *gc) tickRemover() {
func (gc *gc) tickRemover(ctx context.Context) {
defer gc.wg.Done()
timer := time.NewTimer(gc.removerInterval)
@ -178,7 +178,7 @@ func (gc *gc) tickRemover() {
gc.log.Debug(logs.ShardGCIsStopped)
return
case <-timer.C:
gc.remover()
gc.remover(ctx)
timer.Reset(gc.removerInterval)
}
}
@ -196,7 +196,7 @@ func (gc *gc) stop() {
// iterates over metabase and deletes objects
// with GC-marked graves.
// Does nothing if shard is in "read-only" mode.
func (s *Shard) removeGarbage() {
func (s *Shard) removeGarbage(ctx context.Context) {
s.m.RLock()
defer s.m.RUnlock()
@ -237,7 +237,7 @@ func (s *Shard) removeGarbage() {
deletePrm.SetAddresses(buf...)
// delete accumulated objects
_, err = s.delete(context.TODO(), deletePrm)
_, err = s.delete(ctx, deletePrm)
if err != nil {
s.log.Warn(logs.ShardCouldNotDeleteTheObjects,
zap.String("error", err.Error()),