[#2232] pilorama: Merge in-queue batches
To achieve high performance we must choose proper values for both batch size and delay. For user operations we want to set low delay. However it would prevent tree synchronization operations to form big enough batches. For these operations, batching gives the most benefit not only in terms of on-CPU execution cost, but also by speeding up transaction persist (`fsync`). In this commit we try merging batches that are already _triggered_, but not yet _started to execute_. This way we can still query batches for execution after the provided delay while also allowing multiple formed batches to execute faster. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
236c4af615
commit
58367e4df6
3 changed files with 18 additions and 6 deletions
|
@ -15,6 +15,7 @@ Changelog for FrostFS Node
|
|||
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)
|
||||
- Env prefix in configuration changed to `FROSTFS_*` (#43)
|
||||
- Link object is broadcast throughout the whole container now (#57)
|
||||
- Pilorama now can merge multiple batches into one (#2231)
|
||||
|
||||
### Fixed
|
||||
- Increase payload size metric on shards' `put` operation (#1794)
|
||||
|
|
|
@ -12,6 +12,9 @@ import (
|
|||
type batch struct {
|
||||
forest *boltForest
|
||||
timer *time.Timer
|
||||
// mtx protects timer and operations fields.
|
||||
// Because mtx can be taken inside a transaction,
|
||||
// transactions MUST NOT be executed with the mutex taken to avoid a deadlock.
|
||||
mtx sync.Mutex
|
||||
start sync.Once
|
||||
cid cidSDK.ID
|
||||
|
@ -24,16 +27,12 @@ func (b *batch) trigger() {
|
|||
b.mtx.Lock()
|
||||
if b.timer != nil {
|
||||
b.timer.Stop()
|
||||
b.timer = nil
|
||||
}
|
||||
b.mtx.Unlock()
|
||||
b.start.Do(b.run)
|
||||
}
|
||||
|
||||
func (b *batch) run() {
|
||||
sort.Slice(b.operations, func(i, j int) bool {
|
||||
return b.operations[i].Time < b.operations[j].Time
|
||||
})
|
||||
fullID := bucketName(b.cid, b.treeID)
|
||||
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
|
||||
|
@ -41,6 +40,16 @@ func (b *batch) run() {
|
|||
return err
|
||||
}
|
||||
|
||||
b.mtx.Lock()
|
||||
b.timer = nil
|
||||
b.mtx.Unlock()
|
||||
|
||||
// Sorting without a mutex is ok, because we append to this slice only if timer is non-nil.
|
||||
// See (*boltForest).addBatch for details.
|
||||
sort.Slice(b.operations, func(i, j int) bool {
|
||||
return b.operations[i].Time < b.operations[j].Time
|
||||
})
|
||||
|
||||
var lm Move
|
||||
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
||||
})
|
||||
|
|
|
@ -377,7 +377,9 @@ func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan e
|
|||
results: []chan<- error{ch},
|
||||
operations: []*Move{m},
|
||||
}
|
||||
b.mtx.Lock()
|
||||
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
|
||||
b.mtx.Unlock()
|
||||
t.batches = append(t.batches, b)
|
||||
t.mtx.Unlock()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue