WIP: node: Stop GC once termination signal received #385
2 changed files with 31 additions and 40 deletions
|
@ -53,11 +53,13 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
|||
return DeleteRes{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
ln := len(prm.addr)
|
||||
|
||||
smalls := make(map[oid.Address][]byte, ln)
|
||||
|
||||
for i := range prm.addr {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return DeleteRes{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if s.hasWriteCache() {
|
||||
err := s.writeCache.Delete(ctx, prm.addr[i])
|
||||
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
|
||||
|
@ -68,49 +70,38 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
|||
var sPrm meta.StorageIDPrm
|
||||
sPrm.SetAddress(prm.addr[i])
|
||||
|
||||
res, err := s.metaBase.StorageID(ctx, sPrm)
|
||||
var sid []byte
|
||||
resSID, err := s.metaBase.StorageID(ctx, sPrm)
|
||||
if err != nil {
|
||||
s.log.Debug(logs.ShardCantGetStorageIDFromMetabase,
|
||||
zap.Stringer("object", prm.addr[i]),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
continue
|
||||
} else {
|
||||
sid = resSID.StorageID()
|
||||
}
|
||||
|
||||
if res.StorageID() != nil {
|
||||
smalls[prm.addr[i]] = res.StorageID()
|
||||
var delPrmMeta meta.DeletePrm
|
||||
delPrmMeta.SetAddresses(prm.addr[i])
|
||||
|
||||
var resDelMeta meta.DeleteRes
|
||||
resDelMeta, err = s.metaBase.Delete(ctx, delPrmMeta)
|
||||
if err != nil {
|
||||
return DeleteRes{}, err // stop on metabase error ?
|
||||
}
|
||||
}
|
||||
|
||||
var delPrm meta.DeletePrm
|
||||
delPrm.SetAddresses(prm.addr...)
|
||||
|
||||
res, err := s.metaBase.Delete(ctx, delPrm)
|
||||
if err != nil {
|
||||
return DeleteRes{}, err // stop on metabase error ?
|
||||
}
|
||||
|
||||
var totalRemovedPayload uint64
|
||||
|
||||
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
||||
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
||||
for i := range prm.addr {
|
||||
removedPayload := res.RemovedPhysicalObjectSizes()[i]
|
||||
totalRemovedPayload += removedPayload
|
||||
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
|
||||
s.decObjectCounterBy(physical, resDelMeta.RawObjectsRemoved())
|
||||
s.decObjectCounterBy(logical, resDelMeta.AvailableObjectsRemoved())
|
||||
logicalRemovedPayload := resDelMeta.RemovedLogicalObjectSizes()[0]
|
||||
if logicalRemovedPayload > 0 {
|
||||
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
|
||||
}
|
||||
}
|
||||
s.addToPayloadSize(-int64(totalRemovedPayload))
|
||||
s.addToPayloadSize(-int64(resDelMeta.RemovedPhysicalObjectSizes()[0]))
|
||||
|
||||
for i := range prm.addr {
|
||||
var delPrm common.DeletePrm
|
||||
delPrm.Address = prm.addr[i]
|
||||
id := smalls[prm.addr[i]]
|
||||
delPrm.StorageID = id
|
||||
var delPrmBlob common.DeletePrm
|
||||
delPrmBlob.Address = prm.addr[i]
|
||||
delPrmBlob.StorageID = sid
|
||||
|
||||
_, err = s.blobStor.Delete(ctx, delPrm)
|
||||
_, err = s.blobStor.Delete(ctx, delPrmBlob)
|
||||
if err != nil {
|
||||
s.log.Debug(logs.ShardCantRemoveObjectFromBlobStor,
|
||||
zap.Stringer("object_address", prm.addr[i]),
|
||||
|
|
|
@ -76,7 +76,7 @@ type gc struct {
|
|||
|
||||
workerPool util.WorkerPool
|
||||
|
||||
remover func()
|
||||
remover func(context.Context)
|
||||
|
||||
eventChan chan Event
|
||||
mEventHandler map[eventType]*eventHandlers
|
||||
|
@ -115,7 +115,7 @@ func (gc *gc) init(ctx context.Context) {
|
|||
}
|
||||
|
||||
gc.wg.Add(2)
|
||||
go gc.tickRemover()
|
||||
go gc.tickRemover(ctx)
|
||||
go gc.listenEvents(ctx)
|
||||
}
|
||||
|
||||
|
@ -160,7 +160,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (gc *gc) tickRemover() {
|
||||
func (gc *gc) tickRemover(ctx context.Context) {
|
||||
defer gc.wg.Done()
|
||||
|
||||
timer := time.NewTimer(gc.removerInterval)
|
||||
|
@ -178,7 +178,7 @@ func (gc *gc) tickRemover() {
|
|||
gc.log.Debug(logs.ShardGCIsStopped)
|
||||
return
|
||||
case <-timer.C:
|
||||
gc.remover()
|
||||
gc.remover(ctx)
|
||||
timer.Reset(gc.removerInterval)
|
||||
}
|
||||
}
|
||||
|
@ -196,7 +196,7 @@ func (gc *gc) stop() {
|
|||
// iterates over metabase and deletes objects
|
||||
// with GC-marked graves.
|
||||
// Does nothing if shard is in "read-only" mode.
|
||||
func (s *Shard) removeGarbage() {
|
||||
func (s *Shard) removeGarbage(ctx context.Context) {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
|
@ -237,7 +237,7 @@ func (s *Shard) removeGarbage() {
|
|||
deletePrm.SetAddresses(buf...)
|
||||
|
||||
// delete accumulated objects
|
||||
_, err = s.delete(context.TODO(), deletePrm)
|
||||
_, err = s.delete(ctx, deletePrm)
|
||||
if err != nil {
|
||||
s.log.Warn(logs.ShardCouldNotDeleteTheObjects,
|
||||
zap.String("error", err.Error()),
|
||||
|
|
Loading…
Reference in a new issue