[#1449] tree: Add ApplyBatch method
Concurrent Apply can lead to child node applies before parent, so undo/redo operations will perform. This leads to performance degradation in case of tree with many sublevels. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
16830033f8
commit
9bd05e94c8
15 changed files with 180 additions and 21 deletions
|
@ -10,6 +10,8 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
subsection = "tree"
|
subsection = "tree"
|
||||||
|
|
||||||
|
SyncBatchSizeDefault = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
// TreeConfig is a wrapper over "tree" config section
|
// TreeConfig is a wrapper over "tree" config section
|
||||||
|
@ -74,6 +76,17 @@ func (c TreeConfig) SyncInterval() time.Duration {
|
||||||
return config.DurationSafe(c.cfg, "sync_interval")
|
return config.DurationSafe(c.cfg, "sync_interval")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SyncBatchSize returns the value of "sync_batch_size"
|
||||||
|
// config parameter from the "tree" section.
|
||||||
|
//
|
||||||
|
// Returns `SyncBatchSizeDefault` if config value is not specified.
|
||||||
|
func (c TreeConfig) SyncBatchSize() int {
|
||||||
|
if v := config.IntSafe(c.cfg, "sync_batch_size"); v > 0 {
|
||||||
|
return int(v)
|
||||||
|
}
|
||||||
|
return SyncBatchSizeDefault
|
||||||
|
}
|
||||||
|
|
||||||
// AuthorizedKeys parses and returns an array of "authorized_keys" config
|
// AuthorizedKeys parses and returns an array of "authorized_keys" config
|
||||||
// parameter from "tree" section.
|
// parameter from "tree" section.
|
||||||
//
|
//
|
||||||
|
|
|
@ -44,6 +44,7 @@ func TestTreeSection(t *testing.T) {
|
||||||
require.Equal(t, 32, treeSec.ReplicationWorkerCount())
|
require.Equal(t, 32, treeSec.ReplicationWorkerCount())
|
||||||
require.Equal(t, 5*time.Second, treeSec.ReplicationTimeout())
|
require.Equal(t, 5*time.Second, treeSec.ReplicationTimeout())
|
||||||
require.Equal(t, time.Hour, treeSec.SyncInterval())
|
require.Equal(t, time.Hour, treeSec.SyncInterval())
|
||||||
|
require.Equal(t, 2000, treeSec.SyncBatchSize())
|
||||||
require.Equal(t, expectedKeys, treeSec.AuthorizedKeys())
|
require.Equal(t, expectedKeys, treeSec.AuthorizedKeys())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,7 @@ func initTreeService(c *cfg) {
|
||||||
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
|
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
|
||||||
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
|
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
|
||||||
tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
|
tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
|
||||||
|
tree.WithSyncBatchSize(treeConfig.SyncBatchSize()),
|
||||||
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
|
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
|
||||||
tree.WithMetrics(c.metricsCollector.TreeService()),
|
tree.WithMetrics(c.metricsCollector.TreeService()),
|
||||||
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
||||||
|
|
|
@ -31,6 +31,7 @@ FROSTFS_TREE_REPLICATION_CHANNEL_CAPACITY=32
|
||||||
FROSTFS_TREE_REPLICATION_WORKER_COUNT=32
|
FROSTFS_TREE_REPLICATION_WORKER_COUNT=32
|
||||||
FROSTFS_TREE_REPLICATION_TIMEOUT=5s
|
FROSTFS_TREE_REPLICATION_TIMEOUT=5s
|
||||||
FROSTFS_TREE_SYNC_INTERVAL=1h
|
FROSTFS_TREE_SYNC_INTERVAL=1h
|
||||||
|
FROSTFS_TREE_SYNC_BATCH_SIZE=2000
|
||||||
FROSTFS_TREE_AUTHORIZED_KEYS="0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
FROSTFS_TREE_AUTHORIZED_KEYS="0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
||||||
|
|
||||||
# gRPC section
|
# gRPC section
|
||||||
|
|
|
@ -69,6 +69,7 @@
|
||||||
"replication_worker_count": 32,
|
"replication_worker_count": 32,
|
||||||
"replication_timeout": "5s",
|
"replication_timeout": "5s",
|
||||||
"sync_interval": "1h",
|
"sync_interval": "1h",
|
||||||
|
"sync_batch_size": 2000,
|
||||||
"authorized_keys": [
|
"authorized_keys": [
|
||||||
"0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0",
|
"0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0",
|
||||||
"02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
"02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
||||||
|
|
|
@ -59,6 +59,7 @@ tree:
|
||||||
replication_channel_capacity: 32
|
replication_channel_capacity: 32
|
||||||
replication_timeout: 5s
|
replication_timeout: 5s
|
||||||
sync_interval: 1h
|
sync_interval: 1h
|
||||||
|
sync_batch_size: 2000
|
||||||
authorized_keys: # list of hex-encoded public keys that have rights to use the Tree Service with frostfs-cli
|
authorized_keys: # list of hex-encoded public keys that have rights to use the Tree Service with frostfs-cli
|
||||||
- 0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0
|
- 0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0
|
||||||
- 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56
|
- 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56
|
||||||
|
|
|
@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TreeApplyBatch implements the pilorama.Forest interface.
|
||||||
|
func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("container_id", cnr.EncodeToString()),
|
||||||
|
attribute.String("tree_id", treeID),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
index, lst, err := e.getTreeShard(ctx, cnr, treeID)
|
||||||
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m)
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||||
|
e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err,
|
||||||
|
zap.Stringer("cid", cnr),
|
||||||
|
zap.String("tree", treeID),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// TreeGetByPath implements the pilorama.Forest interface.
|
// TreeGetByPath implements the pilorama.Forest interface.
|
||||||
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
|
||||||
|
|
|
@ -558,6 +558,80 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("container_id", cnr.EncodeToString()),
|
||||||
|
attribute.String("tree_id", treeID),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
m, err := t.filterSeen(cnr, treeID, m)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(m) == 0 {
|
||||||
|
success = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan error)
|
||||||
|
b := &batch{
|
||||||
|
forest: t,
|
||||||
|
cid: cnr,
|
||||||
|
treeID: treeID,
|
||||||
|
results: []chan<- error{ch},
|
||||||
|
operations: m,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
b.run()
|
||||||
|
}()
|
||||||
|
err = <-ch
|
||||||
|
success = err == nil
|
||||||
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) {
|
||||||
|
t.modeMtx.RLock()
|
||||||
|
defer t.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if t.mode.NoMetabase() {
|
||||||
|
return nil, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
ops := make([]*Move, 0, len(m))
|
||||||
|
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
treeRoot := tx.Bucket(bucketName(cnr, treeID))
|
||||||
|
if treeRoot == nil {
|
||||||
|
ops = m
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b := treeRoot.Bucket(logBucket)
|
||||||
|
for _, op := range m {
|
||||||
|
var logKey [8]byte
|
||||||
|
binary.BigEndian.PutUint64(logKey[:], op.Time)
|
||||||
|
seen := b.Get(logKey[:]) != nil
|
||||||
|
if !seen {
|
||||||
|
ops = append(ops, op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
return ops, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
|
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
|
||||||
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
|
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -111,6 +111,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o
|
||||||
return s.Apply(op)
|
return s.Apply(op)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error {
|
||||||
|
for _, op := range ops {
|
||||||
|
if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *memoryForest) Init() error {
|
func (f *memoryForest) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,8 @@ type Forest interface {
|
||||||
// TreeApply applies replicated operation from another node.
|
// TreeApply applies replicated operation from another node.
|
||||||
// If background is true, TreeApply will first check whether an operation exists.
|
// If background is true, TreeApply will first check whether an operation exists.
|
||||||
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
||||||
|
// TreeApplyBatch applies replicated operations from another node.
|
||||||
|
TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error
|
||||||
// TreeGetByPath returns all nodes corresponding to the path.
|
// TreeGetByPath returns all nodes corresponding to the path.
|
||||||
// The path is constructed by descending from the root using the values of the
|
// The path is constructed by descending from the root using the values of the
|
||||||
// AttributeFilename in meta.
|
// AttributeFilename in meta.
|
||||||
|
|
|
@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
|
||||||
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
|
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TreeApplyBatch implements the pilorama.Forest interface.
|
||||||
|
func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.String("container_id", cnr.EncodeToString()),
|
||||||
|
attribute.String("tree_id", treeID),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if s.pilorama == nil {
|
||||||
|
return ErrPiloramaDisabled
|
||||||
|
}
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
|
||||||
|
}
|
||||||
|
|
||||||
// TreeGetByPath implements the pilorama.Forest interface.
|
// TreeGetByPath implements the pilorama.Forest interface.
|
||||||
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
|
||||||
|
|
|
@ -41,6 +41,7 @@ type cfg struct {
|
||||||
replicatorTimeout time.Duration
|
replicatorTimeout time.Duration
|
||||||
containerCacheSize int
|
containerCacheSize int
|
||||||
authorizedKeys [][]byte
|
authorizedKeys [][]byte
|
||||||
|
syncBatchSize int
|
||||||
|
|
||||||
localOverrideStorage policyengine.LocalOverrideStorage
|
localOverrideStorage policyengine.LocalOverrideStorage
|
||||||
morphChainStorage policyengine.MorphRuleChainStorageReader
|
morphChainStorage policyengine.MorphRuleChainStorageReader
|
||||||
|
@ -113,6 +114,12 @@ func WithReplicationWorkerCount(n int) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithSyncBatchSize(n int) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.syncBatchSize = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithContainerCacheSize(n int) Option {
|
func WithContainerCacheSize(n int) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
|
|
|
@ -40,6 +40,7 @@ const (
|
||||||
defaultReplicatorCapacity = 64
|
defaultReplicatorCapacity = 64
|
||||||
defaultReplicatorWorkerCount = 64
|
defaultReplicatorWorkerCount = 64
|
||||||
defaultReplicatorSendTimeout = time.Second * 5
|
defaultReplicatorSendTimeout = time.Second * 5
|
||||||
|
defaultSyncBatchSize = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Service) localReplicationWorker(ctx context.Context) {
|
func (s *Service) localReplicationWorker(ctx context.Context) {
|
||||||
|
|
|
@ -55,6 +55,7 @@ func New(opts ...Option) *Service {
|
||||||
s.replicatorChannelCapacity = defaultReplicatorCapacity
|
s.replicatorChannelCapacity = defaultReplicatorCapacity
|
||||||
s.replicatorWorkerCount = defaultReplicatorWorkerCount
|
s.replicatorWorkerCount = defaultReplicatorWorkerCount
|
||||||
s.replicatorTimeout = defaultReplicatorSendTimeout
|
s.replicatorTimeout = defaultReplicatorSendTimeout
|
||||||
|
s.syncBatchSize = defaultSyncBatchSize
|
||||||
s.metrics = defaultMetricsRegister{}
|
s.metrics = defaultMetricsRegister{}
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
|
|
|
@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
|
||||||
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
operationStream <-chan *pilorama.Move,
|
operationStream <-chan *pilorama.Move,
|
||||||
) uint64 {
|
) uint64 {
|
||||||
errGroup, _ := errgroup.WithContext(ctx)
|
|
||||||
const workersCount = 1024
|
|
||||||
errGroup.SetLimit(workersCount)
|
|
||||||
|
|
||||||
// We run TreeApply concurrently for the operation batch. Let's consider two operations
|
|
||||||
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
|
|
||||||
// on m1. That means the service must start sync from m1.Time in the next iteration and
|
|
||||||
// this height is stored in unappliedOperationHeight.
|
|
||||||
var unappliedOperationHeight uint64 = math.MaxUint64
|
|
||||||
var heightMtx sync.Mutex
|
|
||||||
|
|
||||||
var prev *pilorama.Move
|
var prev *pilorama.Move
|
||||||
|
var batch []*pilorama.Move
|
||||||
for m := range operationStream {
|
for m := range operationStream {
|
||||||
// skip already applied op
|
// skip already applied op
|
||||||
if prev != nil && prev.Time == m.Time {
|
if prev != nil && prev.Time == m.Time {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
prev = m
|
prev = m
|
||||||
|
batch = append(batch, m)
|
||||||
|
|
||||||
errGroup.Go(func() error {
|
if len(batch) == s.syncBatchSize {
|
||||||
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
|
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||||
heightMtx.Lock()
|
return batch[0].Time
|
||||||
unappliedOperationHeight = min(unappliedOperationHeight, m.Time)
|
|
||||||
heightMtx.Unlock()
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
return nil
|
batch = batch[:0]
|
||||||
})
|
|
||||||
}
|
}
|
||||||
_ = errGroup.Wait()
|
}
|
||||||
return unappliedOperationHeight
|
if len(batch) > 0 {
|
||||||
|
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||||
|
return batch[0].Time
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return math.MaxUint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
|
|
Loading…
Reference in a new issue