From 5059dcc19dfd34d6b1f6e53a7d7efafb2c3c4b0e Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov <d.stepanov@yadro.com>
Date: Fri, 17 Mar 2023 11:06:15 +0300
Subject: [PATCH] [#145] shard-gc: Delete expired objects after locks

GC now deletes expired locks and objects sequentially: expired objects
are handled only after expired locks. Within each pass, expired locks
and objects are collected and deleted concurrently in batches. Added
config parameters that control the number of concurrent workers and
the batch size.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
---
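
Note for reviewers (this region between the "---" separator and the diff is
ignored by git am): a rough usage sketch of the two new shard options next to
the existing GC options. The values mirror shard 0 of config/example/node.yaml;
gcOptions is an illustrative helper, not code from this patch.

package example

import (
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
)

// gcOptions builds only the GC-related shard options; a real node assembles
// the full option list in (*cfg).shardOpts() in cmd/frostfs-node/config.go.
func gcOptions() []shard.Option {
	return []shard.Option{
		shard.WithRemoverBatchSize(150),
		shard.WithGCRemoverSleepInterval(2 * time.Minute),
		// New in this patch:
		shard.WithExpiredCollectorBatchSize(1500),
		shard.WithExpiredCollectorWorkersCount(15),
	}
}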
 cmd/frostfs-node/config.go                    |  10 +-
 cmd/frostfs-node/config/engine/config_test.go |   5 +
 .../config/engine/shard/gc/config.go          |  40 +++++
 config/example/node.env                       |   4 +
 config/example/node.json                      |   4 +-
 config/example/node.yaml                      |   2 +
 go.mod                                        |   5 +-
 pkg/local_object_storage/shard/control.go     |   3 +-
 pkg/local_object_storage/shard/gc.go          | 139 +++++++++++++++---
 pkg/local_object_storage/shard/shard.go       |  16 ++
 10 files changed, 196 insertions(+), 32 deletions(-)
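
Also for reviewers, a minimal self-contained sketch of the errgroup-based
batching pattern that collectExpiredObjects and collectExpiredLocks now share:
one goroutine iterates and cuts the stream into batches while the same group
runs the batch handlers concurrently, bounded by SetLimit. The integer items
and handleBatch below are stand-ins for the metabase iterator and the shard
callbacks; only the errgroup wiring reflects the patch.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func handleBatch(_ context.Context, batch []int) {
	fmt.Println("handling batch:", batch)
}

func main() {
	const (
		workers   = 3 // producer goroutine plus up to two concurrent handlers
		batchSize = 4
	)

	items := make([]int, 10) // stand-in for expired addresses from the metabase
	for i := range items {
		items[i] = i
	}

	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(workers)

	eg.Go(func() error {
		batch := make([]int, 0, batchSize)
		flush := func() {
			expired := batch // snapshot before the slice is reused
			eg.Go(func() error {
				handleBatch(ctx, expired)
				return ctx.Err()
			})
			batch = make([]int, 0, batchSize)
		}
		for _, it := range items { // stand-in for metaBase.IterateExpired
			batch = append(batch, it)
			if len(batch) == batchSize {
				flush()
			}
		}
		if len(batch) > 0 { // final, possibly partial batch
			flush()
		}
		return nil
	})

	if err := eg.Wait(); err != nil {
		fmt.Println("iteration over expired items failed:", err)
	}
}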

diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index 25222ed44..09f2c2087 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -117,8 +117,10 @@ type shardCfg struct {
 	subStorages []subStorageCfg
 
 	gcCfg struct {
-		removerBatchSize     int
-		removerSleepInterval time.Duration
+		removerBatchSize             int
+		removerSleepInterval         time.Duration
+		expiredCollectorBatchSize    int
+		expiredCollectorWorkersCount int
 	}
 
 	writecacheCfg struct {
@@ -287,6 +289,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 
 		sh.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
 		sh.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
+		sh.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
+		sh.gcCfg.expiredCollectorWorkersCount = gcCfg.ExpiredCollectorWorkersCount()
 
 		a.EngineCfg.shards = append(a.EngineCfg.shards, sh)
 
@@ -753,6 +757,8 @@ func (c *cfg) shardOpts() []shardOptsWithID {
 			shard.WithWriteCacheOptions(writeCacheOpts...),
 			shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
 			shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
+			shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
+			shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount),
 			shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
 				pool, err := ants.NewPool(sz)
 				fatalOnErr(err)
diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go
index 72e57ee5a..83f5a65ed 100644
--- a/cmd/frostfs-node/config/engine/config_test.go
+++ b/cmd/frostfs-node/config/engine/config_test.go
@@ -10,6 +10,7 @@ import (
 	shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
 	blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
 	fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
+	gcconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/gc"
 	piloramaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/pilorama"
 	configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@@ -103,6 +104,8 @@ func TestEngineSection(t *testing.T) {
 
 				require.EqualValues(t, 150, gc.RemoverBatchSize())
 				require.Equal(t, 2*time.Minute, gc.RemoverSleepInterval())
+				require.Equal(t, 1500, gc.ExpiredCollectorBatchSize())
+				require.Equal(t, 15, gc.ExpiredCollectorWorkersCount())
 
 				require.Equal(t, false, sc.RefillMetabase())
 				require.Equal(t, mode.ReadOnly, sc.Mode())
@@ -149,6 +152,8 @@ func TestEngineSection(t *testing.T) {
 
 				require.EqualValues(t, 200, gc.RemoverBatchSize())
 				require.Equal(t, 5*time.Minute, gc.RemoverSleepInterval())
+				require.Equal(t, gcconfig.ExpiredCollectorBatchSizeDefault, gc.ExpiredCollectorBatchSize())
+				require.Equal(t, gcconfig.ExpiredCollectorWorkersCountDefault, gc.ExpiredCollectorWorkersCount())
 
 				require.Equal(t, true, sc.RefillMetabase())
 				require.Equal(t, mode.ReadWrite, sc.Mode())
diff --git a/cmd/frostfs-node/config/engine/shard/gc/config.go b/cmd/frostfs-node/config/engine/shard/gc/config.go
index 1aa01d906..0500697c8 100644
--- a/cmd/frostfs-node/config/engine/shard/gc/config.go
+++ b/cmd/frostfs-node/config/engine/shard/gc/config.go
@@ -16,6 +16,12 @@ const (
 
 	// RemoverSleepIntervalDefault is a default sleep interval of Shard GC's remover.
 	RemoverSleepIntervalDefault = time.Minute
+
+	// ExpiredCollectorWorkersCountDefault is a default number of workers of Shard GC's expired object collector.
+	ExpiredCollectorWorkersCountDefault = 5
+
+	// ExpiredCollectorBatchSizeDefault is a default batch size of Shard GC's expired object collector.
+	ExpiredCollectorBatchSizeDefault = 500
 )
 
 // From wraps config section into Config.
@@ -56,3 +62,37 @@ func (x *Config) RemoverSleepInterval() time.Duration {
 
 	return RemoverSleepIntervalDefault
 }
+
+// ExpiredCollectorWorkersCount returns the value of "expired_collector_workers_count"
+// config parameter.
+//
+// Returns ExpiredCollectorWorkersCountDefault if the value is not a positive number.
+func (x *Config) ExpiredCollectorWorkersCount() int {
+	s := config.IntSafe(
+		(*config.Config)(x),
+		"expired_collector_workers_count",
+	)
+
+	if s > 0 {
+		return int(s)
+	}
+
+	return ExpiredCollectorWorkersCountDefault
+}
+
+// ExpiredCollectorBatchSize returns the value of "expired_collector_batch_size"
+// config parameter.
+//
+// Returns ExpiredCollectorBatchSizeDefault if the value is not a positive number.
+func (x *Config) ExpiredCollectorBatchSize() int {
+	s := config.IntSafe(
+		(*config.Config)(x),
+		"expired_collector_batch_size",
+	)
+
+	if s > 0 {
+		return int(s)
+	}
+
+	return ExpiredCollectorBatchSizeDefault
+}
diff --git a/config/example/node.env b/config/example/node.env
index 247178d60..8034fbb23 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -135,6 +135,10 @@ FROSTFS_STORAGE_SHARD_0_PILORAMA_MAX_BATCH_SIZE=200
 FROSTFS_STORAGE_SHARD_0_GC_REMOVER_BATCH_SIZE=150
 #### Sleep interval between data remover tacts
 FROSTFS_STORAGE_SHARD_0_GC_REMOVER_SLEEP_INTERVAL=2m
+#### Limit of objects to be marked expired by the garbage collector
+FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_BATCH_SIZE=1500
+#### Limit of concurrent workers collecting expired objects by the garbage collector
+FROSTFS_STORAGE_SHARD_0_GC_EXPIRED_COLLECTOR_WORKERS_COUNT=15
 
 ## 1 shard
 ### Flag to refill Metabase from BlobStor
diff --git a/config/example/node.json b/config/example/node.json
index 9fbd0f9bd..e7bb375a5 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -187,7 +187,9 @@
         },
         "gc": {
           "remover_batch_size": 150,
-          "remover_sleep_interval": "2m"
+          "remover_sleep_interval": "2m",
+          "expired_collector_batch_size": 1500,
+          "expired_collector_workers_count": 15
         }
       },
       "1": {
diff --git a/config/example/node.yaml b/config/example/node.yaml
index bf1d65451..6a5ea5f03 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -192,6 +192,8 @@ storage:
       gc:
         remover_batch_size: 150  # number of objects to be removed by the garbage collector
         remover_sleep_interval: 2m  # frequency of the garbage collector invocation
+        expired_collector_batch_size: 1500 # number of objects to be marked expired by the garbage collector
+        expired_collector_workers_count: 15 # number of concurrent workers collecting expired objects by the garbage collector
 
     1:
       writecache:
diff --git a/go.mod b/go.mod
index 61cd57647..bb5a53b22 100644
--- a/go.mod
+++ b/go.mod
@@ -33,14 +33,14 @@ require (
 	go.etcd.io/bbolt v1.3.6
 	go.uber.org/atomic v1.10.0
 	go.uber.org/zap v1.24.0
+	golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
+	golang.org/x/sync v0.1.0
 	golang.org/x/term v0.3.0
 	google.golang.org/grpc v1.51.0
 	google.golang.org/protobuf v1.28.1
 	gopkg.in/yaml.v3 v3.0.1
 )
 
-require golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
-
 require (
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
 	git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
@@ -94,7 +94,6 @@ require (
 	go.uber.org/multierr v1.9.0 // indirect
 	golang.org/x/crypto v0.4.0 // indirect
 	golang.org/x/net v0.4.0 // indirect
-	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.3.0 // indirect
 	golang.org/x/text v0.5.0 // indirect
 	golang.org/x/time v0.1.0 // indirect
diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go
index 05f58f830..acf038d71 100644
--- a/pkg/local_object_storage/shard/control.go
+++ b/pkg/local_object_storage/shard/control.go
@@ -143,9 +143,8 @@ func (s *Shard) Init() error {
 			eventNewEpoch: {
 				cancelFunc: func() {},
 				handlers: []eventHandler{
-					s.collectExpiredObjects,
+					s.collectExpiredLocksAndObjects,
 					s.collectExpiredTombstones,
-					s.collectExpiredLocks,
 				},
 			},
 		},
diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go
index a8910561e..b5874711a 100644
--- a/pkg/local_object_storage/shard/gc.go
+++ b/pkg/local_object_storage/shard/gc.go
@@ -12,6 +12,12 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+)
+
+const (
+	minExpiredWorkers   = 2
+	minExpiredBatchSize = 1
 )
 
 // TombstoneSource is an interface that checks
@@ -81,6 +87,9 @@ type gcCfg struct {
 	log *logger.Logger
 
 	workerPoolInit func(int) util.WorkerPool
+
+	expiredCollectorWorkersCount int
+	expiredCollectorBatchSize    int
 }
 
 func defaultGCCfg() gcCfg {
@@ -234,15 +243,72 @@ func (s *Shard) removeGarbage() {
 	}
 }
 
+func (s *Shard) collectExpiredLocksAndObjects(ctx context.Context, e Event) {
+	s.collectExpiredLocks(ctx, e)
+	s.collectExpiredObjects(ctx, e)
+}
+
+func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
+	workersCount = minExpiredWorkers
+	batchSize = minExpiredBatchSize
+
+	if s.gc.gcCfg.expiredCollectorBatchSize > batchSize {
+		batchSize = s.gc.gcCfg.expiredCollectorBatchSize
+	}
+
+	if s.gc.gcCfg.expiredCollectorWorkersCount > workersCount {
+		workersCount = s.gc.gcCfg.expiredCollectorWorkersCount
+	}
+	return
+}
+
 func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
-	expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
-		return typ != object.TypeTombstone && typ != object.TypeLock
-	})
-	if err != nil || len(expired) == 0 {
+	workersCount, batchSize := s.getExpiredObjectsParameters()
+
+	errGroup, egCtx := errgroup.WithContext(ctx)
+	errGroup.SetLimit(workersCount)
+
+	errGroup.Go(func() error {
+		batch := make([]oid.Address, 0, batchSize)
+		err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+			if o.Type() != object.TypeTombstone && o.Type() != object.TypeLock {
+				batch = append(batch, o.Address())
+
+				if len(batch) == batchSize {
+					expired := batch
+					errGroup.Go(func() error {
+						s.handleExpiredObjects(egCtx, expired)
+						return egCtx.Err()
+					})
+					batch = make([]oid.Address, 0, batchSize)
+				}
+			}
+		})
 		if err != nil {
-			s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error()))
+			return err
 		}
+
+		if len(batch) > 0 {
+			expired := batch
+			errGroup.Go(func() error {
+				s.handleExpiredObjects(egCtx, expired)
+				return egCtx.Err()
+			})
+		}
+
+		return nil
+	})
+
+	if err := errGroup.Wait(); err != nil {
+		s.log.Warn("iterator over expired objects failed", zap.String("error", err.Error()))
+	}
+}
+
+func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) {
+	select {
+	case <-ctx.Done():
 		return
+	default:
 	}
 
 	s.m.RLock()
@@ -343,44 +409,69 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
 }
 
 func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
-	expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
-		return typ == object.TypeLock
-	})
-	if err != nil || len(expired) == 0 {
-		if err != nil {
-			s.log.Warn("iterator over expired locks failed", zap.String("error", err.Error()))
-		}
-		return
-	}
+	workersCount, batchSize := s.getExpiredObjectsParameters()
 
-	s.expiredLocksCallback(ctx, expired)
+	errGroup, egCtx := errgroup.WithContext(ctx)
+	errGroup.SetLimit(workersCount)
+
+	errGroup.Go(func() error {
+		batch := make([]oid.Address, 0, batchSize)
+
+		err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
+			if o.Type() == object.TypeLock {
+				batch = append(batch, o.Address())
+
+				if len(batch) == batchSize {
+					expired := batch
+					errGroup.Go(func() error {
+						s.expiredLocksCallback(egCtx, expired)
+						return egCtx.Err()
+					})
+					batch = make([]oid.Address, 0, batchSize)
+				}
+			}
+		})
+		if err != nil {
+			return err
+		}
+
+		if len(batch) > 0 {
+			expired := batch
+			errGroup.Go(func() error {
+				s.expiredLocksCallback(egCtx, expired)
+				return egCtx.Err()
+			})
+		}
+
+		return nil
+	})
+
+	if err := errGroup.Wait(); err != nil {
+		s.log.Warn("iterator over expired locks failed", zap.String("error", err.Error()))
+	}
 }
 
-func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
+func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error {
 	s.m.RLock()
 	defer s.m.RUnlock()
 
 	if s.info.Mode.NoMetabase() {
-		return nil, ErrDegradedMode
+		return ErrDegradedMode
 	}
 
-	var expired []oid.Address
-
 	err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
 		select {
 		case <-ctx.Done():
 			return meta.ErrInterruptIterator
 		default:
-			if typeCond(expiredObject.Type()) {
-				expired = append(expired, expiredObject.Address())
-			}
+			onExpiredFound(expiredObject)
 			return nil
 		}
 	})
 	if err != nil {
-		return nil, err
+		return err
 	}
-	return expired, ctx.Err()
+	return ctx.Err()
 }
 
 // HandleExpiredTombstones marks tombstones themselves as garbage
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index a0fd077c7..dd74dad35 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -305,6 +305,22 @@ func WithReportErrorFunc(f func(selfID string, message string, err error)) Optio
 	}
 }
 
+// WithExpiredCollectorBatchSize returns an option to set the batch size
+// of the expired object collection operation.
+func WithExpiredCollectorBatchSize(size int) Option {
+	return func(c *cfg) {
+		c.gcCfg.expiredCollectorBatchSize = size
+	}
+}
+
+// WithExpiredCollectorWorkersCount returns an option to set the number of
+// concurrent workers for the expired object collection operation.
+func WithExpiredCollectorWorkersCount(count int) Option {
+	return func(c *cfg) {
+		c.gcCfg.expiredCollectorWorkersCount = count
+	}
+}
+
 func (s *Shard) fillInfo() {
 	s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
 	s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()