2022-07-22 08:53:05 +00:00
|
|
|
package pilorama
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2023-03-07 13:38:26 +00:00
|
|
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
2022-07-22 08:53:05 +00:00
|
|
|
"go.etcd.io/bbolt"
|
|
|
|
)
|
|
|
|
|
|
|
|
type batch struct {
|
2023-02-01 08:58:16 +00:00
|
|
|
forest *boltForest
|
|
|
|
timer *time.Timer
|
|
|
|
// mtx protects timer and operations fields.
|
|
|
|
// Because mtx can be taken inside a transaction,
|
|
|
|
// transactions MUST NOT be executed with the mutex taken to avoid a deadlock.
|
2022-07-22 08:53:05 +00:00
|
|
|
mtx sync.Mutex
|
|
|
|
start sync.Once
|
|
|
|
cid cidSDK.ID
|
|
|
|
treeID string
|
|
|
|
results []chan<- error
|
|
|
|
operations []*Move
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *batch) trigger() {
|
|
|
|
b.mtx.Lock()
|
|
|
|
if b.timer != nil {
|
|
|
|
b.timer.Stop()
|
|
|
|
}
|
|
|
|
b.mtx.Unlock()
|
|
|
|
b.start.Do(b.run)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *batch) run() {
|
2023-01-18 06:38:42 +00:00
|
|
|
fullID := bucketName(b.cid, b.treeID)
|
2022-07-22 08:53:05 +00:00
|
|
|
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
|
2023-01-18 06:38:42 +00:00
|
|
|
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
|
2022-07-22 08:53:05 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-02-01 08:58:16 +00:00
|
|
|
b.mtx.Lock()
|
|
|
|
b.timer = nil
|
|
|
|
b.mtx.Unlock()
|
|
|
|
|
|
|
|
// Sorting without a mutex is ok, because we append to this slice only if timer is non-nil.
|
|
|
|
// See (*boltForest).addBatch for details.
|
|
|
|
sort.Slice(b.operations, func(i, j int) bool {
|
|
|
|
return b.operations[i].Time < b.operations[j].Time
|
|
|
|
})
|
|
|
|
|
2023-05-25 15:40:27 +00:00
|
|
|
b.operations = removeDuplicatesInPlace(b.operations)
|
2023-01-25 11:12:02 +00:00
|
|
|
var lm Move
|
2022-07-22 08:53:05 +00:00
|
|
|
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
|
|
|
})
|
2023-05-25 15:40:27 +00:00
|
|
|
for i := range b.results {
|
2022-07-22 08:53:05 +00:00
|
|
|
b.results[i] <- err
|
|
|
|
}
|
|
|
|
}
|
2023-05-25 15:40:27 +00:00
|
|
|
|
|
|
|
func removeDuplicatesInPlace(a []*Move) []*Move {
|
|
|
|
equalCount := 0
|
|
|
|
for i := 1; i < len(a); i++ {
|
|
|
|
if a[i].Time == a[i-1].Time {
|
|
|
|
equalCount++
|
|
|
|
} else {
|
|
|
|
a[i-equalCount] = a[i]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return a[:len(a)-equalCount]
|
|
|
|
}
|