WIP: node: Stop GC once termination signal received #385

Closed
acid-ant wants to merge 1 commit from acid-ant:bugfix/366-remove-garbage into master
2 changed files with 31 additions and 40 deletions

View file

@ -53,11 +53,13 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return DeleteRes{}, ErrDegradedMode return DeleteRes{}, ErrDegradedMode
} }
ln := len(prm.addr)
smalls := make(map[oid.Address][]byte, ln)
for i := range prm.addr { for i := range prm.addr {
select {
case <-ctx.Done():
return DeleteRes{}, ctx.Err()
default:
}
if s.hasWriteCache() { if s.hasWriteCache() {
err := s.writeCache.Delete(ctx, prm.addr[i]) err := s.writeCache.Delete(ctx, prm.addr[i])
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) { if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
@ -68,49 +70,38 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
var sPrm meta.StorageIDPrm var sPrm meta.StorageIDPrm
sPrm.SetAddress(prm.addr[i]) sPrm.SetAddress(prm.addr[i])
res, err := s.metaBase.StorageID(ctx, sPrm) var sid []byte
resSID, err := s.metaBase.StorageID(ctx, sPrm)
if err != nil { if err != nil {
s.log.Debug(logs.ShardCantGetStorageIDFromMetabase, s.log.Debug(logs.ShardCantGetStorageIDFromMetabase,
zap.Stringer("object", prm.addr[i]), zap.Stringer("object", prm.addr[i]),
zap.String("error", err.Error())) zap.String("error", err.Error()))
} else {
continue sid = resSID.StorageID()
} }
if res.StorageID() != nil { var delPrmMeta meta.DeletePrm
smalls[prm.addr[i]] = res.StorageID() delPrmMeta.SetAddresses(prm.addr[i])
var resDelMeta meta.DeleteRes
resDelMeta, err = s.metaBase.Delete(ctx, delPrmMeta)
if err != nil {
return DeleteRes{}, err // stop on metabase error ?
} }
}
var delPrm meta.DeletePrm s.decObjectCounterBy(physical, resDelMeta.RawObjectsRemoved())
delPrm.SetAddresses(prm.addr...) s.decObjectCounterBy(logical, resDelMeta.AvailableObjectsRemoved())
logicalRemovedPayload := resDelMeta.RemovedLogicalObjectSizes()[0]
res, err := s.metaBase.Delete(ctx, delPrm)
if err != nil {
return DeleteRes{}, err // stop on metabase error ?
}
var totalRemovedPayload uint64
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
for i := range prm.addr {
removedPayload := res.RemovedPhysicalObjectSizes()[i]
totalRemovedPayload += removedPayload
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
if logicalRemovedPayload > 0 { if logicalRemovedPayload > 0 {
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload)) s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
} }
} s.addToPayloadSize(-int64(resDelMeta.RemovedPhysicalObjectSizes()[0]))
s.addToPayloadSize(-int64(totalRemovedPayload))
for i := range prm.addr { var delPrmBlob common.DeletePrm
var delPrm common.DeletePrm delPrmBlob.Address = prm.addr[i]
delPrm.Address = prm.addr[i] delPrmBlob.StorageID = sid
id := smalls[prm.addr[i]]
delPrm.StorageID = id
_, err = s.blobStor.Delete(ctx, delPrm) _, err = s.blobStor.Delete(ctx, delPrmBlob)
if err != nil { if err != nil {
s.log.Debug(logs.ShardCantRemoveObjectFromBlobStor, s.log.Debug(logs.ShardCantRemoveObjectFromBlobStor,
zap.Stringer("object_address", prm.addr[i]), zap.Stringer("object_address", prm.addr[i]),

View file

@ -76,7 +76,7 @@ type gc struct {
workerPool util.WorkerPool workerPool util.WorkerPool
remover func() remover func(context.Context)
eventChan chan Event eventChan chan Event
mEventHandler map[eventType]*eventHandlers mEventHandler map[eventType]*eventHandlers
@ -115,7 +115,7 @@ func (gc *gc) init(ctx context.Context) {
} }
gc.wg.Add(2) gc.wg.Add(2)
go gc.tickRemover() go gc.tickRemover(ctx)
go gc.listenEvents(ctx) go gc.listenEvents(ctx)
} }
@ -160,7 +160,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
} }
} }
func (gc *gc) tickRemover() { func (gc *gc) tickRemover(ctx context.Context) {
defer gc.wg.Done() defer gc.wg.Done()
timer := time.NewTimer(gc.removerInterval) timer := time.NewTimer(gc.removerInterval)
@ -178,7 +178,7 @@ func (gc *gc) tickRemover() {
gc.log.Debug(logs.ShardGCIsStopped) gc.log.Debug(logs.ShardGCIsStopped)
return return
case <-timer.C: case <-timer.C:
gc.remover() gc.remover(ctx)
timer.Reset(gc.removerInterval) timer.Reset(gc.removerInterval)
} }
} }
@ -196,7 +196,7 @@ func (gc *gc) stop() {
// iterates over metabase and deletes objects // iterates over metabase and deletes objects
// with GC-marked graves. // with GC-marked graves.
// Does nothing if shard is in "read-only" mode. // Does nothing if shard is in "read-only" mode.
func (s *Shard) removeGarbage() { func (s *Shard) removeGarbage(ctx context.Context) {
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() defer s.m.RUnlock()
@ -237,7 +237,7 @@ func (s *Shard) removeGarbage() {
deletePrm.SetAddresses(buf...) deletePrm.SetAddresses(buf...)
// delete accumulated objects // delete accumulated objects
_, err = s.delete(context.TODO(), deletePrm) _, err = s.delete(ctx, deletePrm)
if err != nil { if err != nil {
s.log.Warn(logs.ShardCouldNotDeleteTheObjects, s.log.Warn(logs.ShardCouldNotDeleteTheObjects,
zap.String("error", err.Error()), zap.String("error", err.Error()),