forked from TrueCloudLab/frostfs-node
[#145] shard-gc: Delete expired objects after locks

GC used to delete expired locks and objects sequentially. Expired locks and objects are now deleted concurrently in batches. Added config parameters that control the number of concurrent workers and the batch size.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent 6c4a1699ef
commit 5059dcc19d

10 changed files with 196 additions and 32 deletions
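For context, the change wires the new workers/batch parameters into an errgroup-based producer/consumer loop: one goroutine iterates expired entries and fills batches, and each full batch is handed to a worker goroutine. The snippet below is a simplified, self-contained sketch of that pattern, not the shard code itself — the int items and the processBatch helper are stand-ins for illustration; only errgroup.WithContext, SetLimit, and Go are real golang.org/x/sync/errgroup calls.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processBatch stands in for s.handleExpiredObjects / s.expiredLocksCallback.
func processBatch(ctx context.Context, batch []int) error {
	fmt.Println("processing", len(batch), "items")
	return ctx.Err()
}

func main() {
	const (
		// Must be at least 2: the producer goroutine holds one slot,
		// so a limit of 1 would deadlock (cf. minExpiredWorkers = 2).
		workersCount = 3 // cf. expired_collector_workers_count
		batchSize    = 4 // cf. expired_collector_batch_size
	)

	errGroup, egCtx := errgroup.WithContext(context.Background())
	errGroup.SetLimit(workersCount) // at most workersCount goroutines run at once

	// Producer: stands in for iterating expired objects from the metabase.
	errGroup.Go(func() error {
		batch := make([]int, 0, batchSize)
		for i := 0; i < 10; i++ {
			batch = append(batch, i)
			if len(batch) == batchSize {
				expired := batch // capture the full batch for the worker
				errGroup.Go(func() error {
					return processBatch(egCtx, expired)
				})
				batch = make([]int, 0, batchSize)
			}
		}
		// Flush the final, partially filled batch.
		if len(batch) > 0 {
			expired := batch
			errGroup.Go(func() error {
				return processBatch(egCtx, expired)
			})
		}
		return nil
	})

	if err := errGroup.Wait(); err != nil {
		fmt.Println("collection failed:", err)
	}
}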
@@ -119,6 +119,8 @@ type shardCfg struct {
 	gcCfg struct {
 		removerBatchSize             int
 		removerSleepInterval         time.Duration
+		expiredCollectorBatchSize    int
+		expiredCollectorWorkersCount int
 	}

 	writecacheCfg struct {
@@ -287,6 +289,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {

 	sh.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
 	sh.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
+	sh.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
+	sh.gcCfg.expiredCollectorWorkersCount = gcCfg.ExpiredCollectorWorkersCount()

 	a.EngineCfg.shards = append(a.EngineCfg.shards, sh)

@@ -753,6 +757,8 @@ func (c *cfg) shardOpts() []shardOptsWithID {
 		shard.WithWriteCacheOptions(writeCacheOpts...),
 		shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
 		shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
+		shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
+		shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount),
 		shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
 			pool, err := ants.NewPool(sz)
 			fatalOnErr(err)
@@ -10,6 +10,7 @@ import (
 	shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
 	blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
 	fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
+	gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
 	piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
 	configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@@ -103,6 +104,8 @@ func TestEngineSection(t *testing.T) {

 			require.EqualValues(t, 150, gc.RemoverBatchSize())
 			require.Equal(t, 2*time.Minute, gc.RemoverSleepInterval())
+			require.Equal(t, 1500, gc.ExpiredCollectorBatchSize())
+			require.Equal(t, 15, gc.ExpiredCollectorWorkersCount())

 			require.Equal(t, false, sc.RefillMetabase())
 			require.Equal(t, mode.ReadOnly, sc.Mode())
@@ -149,6 +152,8 @@ func TestEngineSection(t *testing.T) {

 			require.EqualValues(t, 200, gc.RemoverBatchSize())
 			require.Equal(t, 5*time.Minute, gc.RemoverSleepInterval())
+			require.Equal(t, gcconfig.ExpiredCollectorBatchSizeDefault, gc.ExpiredCollectorBatchSize())
+			require.Equal(t, gcconfig.ExpiredCollectorWorkersCountDefault, gc.ExpiredCollectorWorkersCount())

 			require.Equal(t, true, sc.RefillMetabase())
 			require.Equal(t, mode.ReadWrite, sc.Mode())
@@ -16,6 +16,12 @@ const (

 	// RemoverSleepIntervalDefault is a default sleep interval of Shard GC's remover.
 	RemoverSleepIntervalDefault = time.Minute
+
+	// ExpiredCollectorWorkersCountDefault is a default workers count of Shard GC expired object collector.
+	ExpiredCollectorWorkersCountDefault = 5
+
+	// ExpiredCollectorBatchSizeDefault is a default batch size of Shard GC expired object collector.
+	ExpiredCollectorBatchSizeDefault = 500
 )

 // From wraps config section into Config.
@@ -56,3 +62,37 @@ func (x *Config) RemoverSleepInterval() time.Duration {

 	return RemoverSleepIntervalDefault
 }
+
+// ExpiredCollectorWorkersCount returns the value of "expired_collector_workers_count"
+// config parameter.
+//
+// Returns ExpiredCollectorWorkersCountDefault if the value is not a positive number.
+func (x *Config) ExpiredCollectorWorkersCount() int {
+	s := config.IntSafe(
+		(*config.Config)(x),
+		"expired_collector_workers_count",
+	)
+
+	if s > 0 {
+		return int(s)
+	}
+
+	return ExpiredCollectorWorkersCountDefault
+}
+
+// ExpiredCollectorBatchSize returns the value of "expired_collector_batch_size"
+// config parameter.
+//
+// Returns ExpiredCollectorBatchSizeDefault if the value is not a positive number.
+func (x *Config) ExpiredCollectorBatchSize() int {
+	s := config.IntSafe(
+		(*config.Config)(x),
+		"expired_collector_batch_size",
+	)
+
+	if s > 0 {
+		return int(s)
+	}
+
+	return ExpiredCollectorBatchSizeDefault
+}
@@ -135,6 +135,10 @@ FROSTFS_STORAGE_SHARD_0_PILORAMA_MAX_BATCH_SIZE=200
 FROSTFS_STORAGE_SHARD_0_GC_REMOVER_BATCH_SIZE=150
 #### Sleep interval between data remover tacts
 FROSTFS_STORAGE_SHARD_0_GC_REMOVER_SLEEP_INTERVAL=2m
+#### Limit of objects to be marked expired by the garbage collector
+FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_BATCH_SIZE=1500
+#### Limit of concurrent workers collecting expired objects by the garbage collector
+FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKERS_COUNT=15

 ## 1 shard
 ### Flag to refill Metabase from BlobStor
@@ -187,7 +187,9 @@
       },
       "gc": {
         "remover_batch_size": 150,
-        "remover_sleep_interval": "2m"
+        "remover_sleep_interval": "2m",
+        "expired_collector_batch_size": 1500,
+        "expired_collector_workers_count": 15
       }
     },
     "1": {
@@ -192,6 +192,8 @@ storage:
     gc:
       remover_batch_size: 150 # number of objects to be removed by the garbage collector
       remover_sleep_interval: 2m # frequency of the garbage collector invocation
+      expired_collector_batch_size: 1500 # number of objects to be marked expired by the garbage collector
+      expired_collector_workers_count: 15 # number of concurrent workers collecting expired objects by the garbage collector

     1:
       writecache:
go.mod (5 changed lines)
@@ -33,14 +33,14 @@ require (
 	go.etcd.io/bbolt v1.3.6
 	go.uber.org/atomic v1.10.0
 	go.uber.org/zap v1.24.0
+	golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
+	golang.org/x/sync v0.1.0
 	golang.org/x/term v0.3.0
 	google.golang.org/grpc v1.51.0
 	google.golang.org/protobuf v1.28.1
 	gopkg.in/yaml.v3 v3.0.1
 )

-require golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
-
 require (
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
 	git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
@@ -94,7 +94,6 @@ require (
 	go.uber.org/multierr v1.9.0 // indirect
 	golang.org/x/crypto v0.4.0 // indirect
 	golang.org/x/net v0.4.0 // indirect
-	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.3.0 // indirect
 	golang.org/x/text v0.5.0 // indirect
 	golang.org/x/time v0.1.0 // indirect
@@ -143,9 +143,8 @@ func (s *Shard) Init() error {
 		eventNewEpoch: {
 			cancelFunc: func() {},
 			handlers: []eventHandler{
-				s.collectExpiredObjects,
+				s.collectExpiredLocksAndObjects,
 				s.collectExpiredTombstones,
-				s.collectExpiredLocks,
 			},
 		},
 	},
@@ -12,6 +12,12 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+)
+
+const (
+	minExpiredWorkers   = 2
+	minExpiredBatchSize = 1
 )

 // TombstoneSource is an interface that checks
@@ -81,6 +87,9 @@ type gcCfg struct {
 	log *logger.Logger

 	workerPoolInit func(int) util.WorkerPool
+
+	expiredCollectorWorkersCount int
+	expiredCollectorBatchSize    int
 }

 func defaultGCCfg() gcCfg {
@@ -234,17 +243,74 @@ func (s *Shard) removeGarbage() {
 	}
 }

-func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
-	expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
-		return typ != object.TypeTombstone && typ != object.TypeLock
-	})
-	if err != nil || len(expired) == 0 {
-		if err != nil {
-			s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error()))
+func (s *Shard) collectExpiredLocksAndObjects(ctx context.Context, e Event) {
+	s.collectExpiredLocks(ctx, e)
+	s.collectExpiredObjects(ctx, e)
+}
+
+func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
+	workersCount = minExpiredWorkers
+	batchSize = minExpiredBatchSize
+
+	if s.gc.gcCfg.expiredCollectorBatchSize > batchSize {
+		batchSize = s.gc.gcCfg.expiredCollectorBatchSize
+	}
+
+	if s.gc.gcCfg.expiredCollectorWorkersCount > workersCount {
+		workersCount = s.gc.gcCfg.expiredCollectorWorkersCount
 	}
 	return
 }

+func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
+	workersCount, batchSize := s.getExpiredObjectsParameters()
+
+	errGroup, egCtx := errgroup.WithContext(ctx)
+	errGroup.SetLimit(workersCount)
+
+	errGroup.Go(func() error {
+		batch := make([]oid.Address, 0, batchSize)
+		err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+			if o.Type() != object.TypeTombstone && o.Type() != object.TypeLock {
+				batch = append(batch, o.Address())
+
+				if len(batch) == batchSize {
+					expired := batch
+					errGroup.Go(func() error {
+						s.handleExpiredObjects(egCtx, expired)
+						return egCtx.Err()
+					})
+					batch = make([]oid.Address, 0, batchSize)
+				}
+			}
+		})
+		if err != nil {
+			return err
+		}
+
+		if len(batch) > 0 {
+			expired := batch
+			errGroup.Go(func() error {
+				s.handleExpiredObjects(egCtx, expired)
+				return egCtx.Err()
+			})
+		}
+
+		return nil
+	})
+
+	if err := errGroup.Wait(); err != nil {
+		s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error()))
+	}
+}
+
+func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) {
+	select {
+	case <-ctx.Done():
+		return
+	default:
+	}
+
 	s.m.RLock()
 	defer s.m.RUnlock()
@@ -343,44 +409,69 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 }

 func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
-	expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
-		return typ == object.TypeLock
-	})
-	if err != nil || len(expired) == 0 {
-		if err != nil {
-			s.log.Warn("iterator over expired locks failed", zap.String("error", err.Error()))
-		}
-		return
-	}
-
-	s.expiredLocksCallback(ctx, expired)
-}
-
-func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
+	workersCount, batchSize := s.getExpiredObjectsParameters()
+
+	errGroup, egCtx := errgroup.WithContext(ctx)
+	errGroup.SetLimit(workersCount)
+
+	errGroup.Go(func() error {
+		batch := make([]oid.Address, 0, batchSize)
+
+		err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+			if o.Type() == object.TypeLock {
+				batch = append(batch, o.Address())
+
+				if len(batch) == batchSize {
+					expired := batch
+					errGroup.Go(func() error {
+						s.expiredLocksCallback(egCtx, expired)
+						return egCtx.Err()
+					})
+					batch = make([]oid.Address, 0, batchSize)
+				}
+			}
+		})
+		if err != nil {
+			return err
+		}
+
+		if len(batch) > 0 {
+			expired := batch
+			errGroup.Go(func() error {
+				s.expiredLocksCallback(egCtx, expired)
+				return egCtx.Err()
+			})
+		}
+
+		return nil
+	})
+
+	if err := errGroup.Wait(); err != nil {
+		s.log.Warn("iterator over expired locks failed", zap.String("error", err.Error()))
+	}
+}
+
+func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error {
 	s.m.RLock()
 	defer s.m.RUnlock()

 	if s.info.Mode.NoMetabase() {
-		return nil, ErrDegradedMode
+		return ErrDegradedMode
 	}

-	var expired []oid.Address
-
 	err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
 		select {
 		case <-ctx.Done():
 			return meta.ErrInterruptIterator
 		default:
-			if typeCond(expiredObject.Type()) {
-				expired = append(expired, expiredObject.Address())
-			}
+			onExpiredFound(expiredObject)
 			return nil
 		}
 	})
 	if err != nil {
-		return nil, err
+		return err
 	}
-	return expired, ctx.Err()
+	return ctx.Err()
 }

 // HandleExpiredTombstones marks tombstones themselves as garbage
@@ -305,6 +305,22 @@ func WithReportErrorFunc(f func(selfID string, message string, err error)) Option {
 	}
 }

+// WithExpiredCollectorBatchSize returns option to set batch size
+// of expired object collection operation.
+func WithExpiredCollectorBatchSize(size int) Option {
+	return func(c *cfg) {
+		c.gcCfg.expiredCollectorBatchSize = size
+	}
+}
+
+// WithExpiredCollectorWorkersCount returns option to set concurrent
+// workers count of expired object collection operation.
+func WithExpiredCollectorWorkersCount(count int) Option {
+	return func(c *cfg) {
+		c.gcCfg.expiredCollectorWorkersCount = count
+	}
+}
+
 func (s *Shard) fillInfo() {
 	s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
 	s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()