WIP: node: Stop GC once termination signal received #385
2 changed files with 31 additions and 40 deletions
|
@ -53,11 +53,13 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
return DeleteRes{}, ErrDegradedMode
|
return DeleteRes{}, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
ln := len(prm.addr)
|
|
||||||
|
|
||||||
smalls := make(map[oid.Address][]byte, ln)
|
|
||||||
|
|
||||||
for i := range prm.addr {
|
for i := range prm.addr {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return DeleteRes{}, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
if s.hasWriteCache() {
|
if s.hasWriteCache() {
|
||||||
err := s.writeCache.Delete(ctx, prm.addr[i])
|
err := s.writeCache.Delete(ctx, prm.addr[i])
|
||||||
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
|
if err != nil && !IsErrNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
|
||||||
|
@ -68,49 +70,38 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
var sPrm meta.StorageIDPrm
|
var sPrm meta.StorageIDPrm
|
||||||
sPrm.SetAddress(prm.addr[i])
|
sPrm.SetAddress(prm.addr[i])
|
||||||
|
|
||||||
res, err := s.metaBase.StorageID(ctx, sPrm)
|
var sid []byte
|
||||||
|
resSID, err := s.metaBase.StorageID(ctx, sPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug(logs.ShardCantGetStorageIDFromMetabase,
|
s.log.Debug(logs.ShardCantGetStorageIDFromMetabase,
|
||||||
zap.Stringer("object", prm.addr[i]),
|
zap.Stringer("object", prm.addr[i]),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
} else {
|
||||||
continue
|
sid = resSID.StorageID()
|
||||||
}
|
}
|
||||||
|
|
||||||
if res.StorageID() != nil {
|
var delPrmMeta meta.DeletePrm
|
||||||
smalls[prm.addr[i]] = res.StorageID()
|
delPrmMeta.SetAddresses(prm.addr[i])
|
||||||
|
|
||||||
|
var resDelMeta meta.DeleteRes
|
||||||
|
resDelMeta, err = s.metaBase.Delete(ctx, delPrmMeta)
|
||||||
|
if err != nil {
|
||||||
|
return DeleteRes{}, err // stop on metabase error ?
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
var delPrm meta.DeletePrm
|
s.decObjectCounterBy(physical, resDelMeta.RawObjectsRemoved())
|
||||||
delPrm.SetAddresses(prm.addr...)
|
s.decObjectCounterBy(logical, resDelMeta.AvailableObjectsRemoved())
|
||||||
|
logicalRemovedPayload := resDelMeta.RemovedLogicalObjectSizes()[0]
|
||||||
res, err := s.metaBase.Delete(ctx, delPrm)
|
|
||||||
if err != nil {
|
|
||||||
return DeleteRes{}, err // stop on metabase error ?
|
|
||||||
}
|
|
||||||
|
|
||||||
var totalRemovedPayload uint64
|
|
||||||
|
|
||||||
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
|
||||||
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
|
||||||
for i := range prm.addr {
|
|
||||||
removedPayload := res.RemovedPhysicalObjectSizes()[i]
|
|
||||||
totalRemovedPayload += removedPayload
|
|
||||||
logicalRemovedPayload := res.RemovedLogicalObjectSizes()[i]
|
|
||||||
if logicalRemovedPayload > 0 {
|
if logicalRemovedPayload > 0 {
|
||||||
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
|
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(logicalRemovedPayload))
|
||||||
}
|
}
|
||||||
}
|
s.addToPayloadSize(-int64(resDelMeta.RemovedPhysicalObjectSizes()[0]))
|
||||||
s.addToPayloadSize(-int64(totalRemovedPayload))
|
|
||||||
|
|
||||||
for i := range prm.addr {
|
var delPrmBlob common.DeletePrm
|
||||||
var delPrm common.DeletePrm
|
delPrmBlob.Address = prm.addr[i]
|
||||||
delPrm.Address = prm.addr[i]
|
delPrmBlob.StorageID = sid
|
||||||
id := smalls[prm.addr[i]]
|
|
||||||
delPrm.StorageID = id
|
|
||||||
|
|
||||||
_, err = s.blobStor.Delete(ctx, delPrm)
|
_, err = s.blobStor.Delete(ctx, delPrmBlob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug(logs.ShardCantRemoveObjectFromBlobStor,
|
s.log.Debug(logs.ShardCantRemoveObjectFromBlobStor,
|
||||||
zap.Stringer("object_address", prm.addr[i]),
|
zap.Stringer("object_address", prm.addr[i]),
|
||||||
|
|
|
@ -76,7 +76,7 @@ type gc struct {
|
||||||
|
|
||||||
workerPool util.WorkerPool
|
workerPool util.WorkerPool
|
||||||
|
|
||||||
remover func()
|
remover func(context.Context)
|
||||||
|
|
||||||
eventChan chan Event
|
eventChan chan Event
|
||||||
mEventHandler map[eventType]*eventHandlers
|
mEventHandler map[eventType]*eventHandlers
|
||||||
|
@ -115,7 +115,7 @@ func (gc *gc) init(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
gc.wg.Add(2)
|
gc.wg.Add(2)
|
||||||
go gc.tickRemover()
|
go gc.tickRemover(ctx)
|
||||||
go gc.listenEvents(ctx)
|
go gc.listenEvents(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ func (gc *gc) listenEvents(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gc *gc) tickRemover() {
|
func (gc *gc) tickRemover(ctx context.Context) {
|
||||||
defer gc.wg.Done()
|
defer gc.wg.Done()
|
||||||
|
|
||||||
timer := time.NewTimer(gc.removerInterval)
|
timer := time.NewTimer(gc.removerInterval)
|
||||||
|
@ -178,7 +178,7 @@ func (gc *gc) tickRemover() {
|
||||||
gc.log.Debug(logs.ShardGCIsStopped)
|
gc.log.Debug(logs.ShardGCIsStopped)
|
||||||
return
|
return
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
gc.remover()
|
gc.remover(ctx)
|
||||||
timer.Reset(gc.removerInterval)
|
timer.Reset(gc.removerInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -196,7 +196,7 @@ func (gc *gc) stop() {
|
||||||
// iterates over metabase and deletes objects
|
// iterates over metabase and deletes objects
|
||||||
// with GC-marked graves.
|
// with GC-marked graves.
|
||||||
// Does nothing if shard is in "read-only" mode.
|
// Does nothing if shard is in "read-only" mode.
|
||||||
func (s *Shard) removeGarbage() {
|
func (s *Shard) removeGarbage(ctx context.Context) {
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
defer s.m.RUnlock()
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
@ -237,7 +237,7 @@ func (s *Shard) removeGarbage() {
|
||||||
deletePrm.SetAddresses(buf...)
|
deletePrm.SetAddresses(buf...)
|
||||||
|
|
||||||
// delete accumulated objects
|
// delete accumulated objects
|
||||||
_, err = s.delete(context.TODO(), deletePrm)
|
_, err = s.delete(ctx, deletePrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn(logs.ShardCouldNotDeleteTheObjects,
|
s.log.Warn(logs.ShardCouldNotDeleteTheObjects,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
|
Loading…
Reference in a new issue