[#2210] pilorama: Reduce the number of keys per node

Under high load we are limited by the _number_ of keys we need to update
in a single transaction. In this commit we try storing all of a node's
state (parent, timestamp and meta) under a single key.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2023-01-17 16:16:50 +03:00 committed by fyrchik
parent 64a5294b27
commit 165a600624
4 changed files with 126 additions and 122 deletions

View file

@ -156,8 +156,7 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove
return nil, ErrReadOnlyMode return nil, ErrReadOnlyMode
} }
var lm LogMove lm := *m
lm.Move = *m
return &lm, t.db.Batch(func(tx *bbolt.Tx) error { return &lm, t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
if err != nil { if err != nil {
@ -227,7 +226,7 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
ts := t.getLatestTimestamp(bLog, d.Position, d.Size) ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
lm = make([]LogMove, len(path)-i+1) lm = make([]LogMove, len(path)-i+1)
for j := i; j < len(path); j++ { for j := i; j < len(path); j++ {
lm[j-i].Move = Move{ lm[j-i] = Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: ts, Time: ts,
@ -245,7 +244,7 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
node = lm[j-i].Child node = lm[j-i].Child
} }
lm[len(lm)-1].Move = Move{ lm[len(lm)-1] = Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: ts, Time: ts,
@ -274,17 +273,14 @@ func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint6
// findSpareID returns random unused ID. // findSpareID returns random unused ID.
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
id := uint64(rand.Int63()) id := uint64(rand.Int63())
key := make([]byte, 9)
var key [9]byte
key[0] = 't'
binary.LittleEndian.PutUint64(key[1:], id)
for { for {
if bTree.Get(key[:]) == nil { _, _, _, ok := t.getState(bTree, stateKey(key, id))
if !ok {
return id return id
} }
id = uint64(rand.Int63()) id = uint64(rand.Int63())
binary.LittleEndian.PutUint64(key[1:], id)
} }
} }
@ -420,8 +416,12 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
// 1. Undo up until the desired timestamp is here. // 1. Undo up until the desired timestamp is here.
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) { for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
b.Reset(value) b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err tmp.Child = r.ReadU64LE()
tmp.Parent = r.ReadU64LE()
tmp.Time = r.ReadVarUint()
if r.Err != nil {
return r.Err
} }
if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil { if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
return err return err
@ -433,7 +433,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
// Loop invariant: key represents the next stored timestamp after ms[i].Time. // Loop invariant: key represents the next stored timestamp after ms[i].Time.
// 2. Insert the operation. // 2. Insert the operation.
lm.Move = *ms[i] *lm = *ms[i]
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil { if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
return err return err
} }
@ -445,11 +445,10 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
// 3. Re-apply all other operations. // 3. Re-apply all other operations.
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) { for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
b.Reset(value) if err := t.logFromBytes(&tmp, value); err != nil {
if err := t.logFromBytes(&tmp, r); err != nil {
return err return err
} }
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil { if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil {
return err return err
} }
key, value = c.Next() key, value = c.Next()
@ -460,39 +459,42 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
} }
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error { func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {
currParent := b.Get(parentKey(key, op.Child))
op.HasOld = currParent != nil
if currParent != nil { // node is already in tree
op.Old.Parent = binary.LittleEndian.Uint64(currParent)
if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil {
return err
}
} else {
op.HasOld = false
op.Old = nodeInfo{}
}
binary.BigEndian.PutUint64(key, op.Time) binary.BigEndian.PutUint64(key, op.Time)
if err := lb.Put(key[:8], t.logToBytes(op)); err != nil { rawLog := t.logToBytes(op)
if err := lb.Put(key[:8], rawLog); err != nil {
return err return err
} }
if op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) { return t.redo(b, key, op, rawLog[16:])
return nil }
func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *LogMove, rawMeta []byte) error {
var err error
parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child))
if inTree {
err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta)
} else {
ts = op.Time
err = b.Delete(oldKey(key, op.Time))
} }
if currParent == nil { if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil { return err
return err }
}
} else { if inTree {
parent := binary.LittleEndian.Uint64(currParent)
if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil { if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil {
return err return err
} }
for i := range op.Old.Meta.Items {
if isAttributeInternal(op.Old.Meta.Items[i].Key) { var meta Meta
key = internalKey(key, op.Old.Meta.Items[i].Key, string(op.Old.Meta.Items[i].Value), parent, op.Child) if err := meta.FromBytes(currMeta); err != nil {
return err
}
for i := range meta.Items {
if isAttributeInternal(meta.Items[i].Key) {
key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)
err := b.Delete(key) err := b.Delete(key)
if err != nil { if err != nil {
return err return err
@ -500,17 +502,16 @@ func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMo
} }
} }
} }
return t.addNode(b, key, op.Child, op.Parent, op.Meta) return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta)
} }
// removeNode removes node keys from the tree except the children key or its parent. // removeNode removes node keys from the tree except the children key or its parent.
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error { func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error {
if err := b.Delete(parentKey(key, node)); err != nil { k := stateKey(key, node)
return err _, _, rawMeta, _ := t.getState(b, k)
}
var meta Meta var meta Meta
var k = metaKey(key, node) if err := meta.FromBytes(rawMeta); err == nil {
if err := meta.FromBytes(b.Get(k)); err == nil {
for i := range meta.Items { for i := range meta.Items {
if isAttributeInternal(meta.Items[i].Key) { if isAttributeInternal(meta.Items[i].Key) {
err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node)) err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node))
@ -520,23 +521,16 @@ func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node)
} }
} }
} }
if err := b.Delete(metaKey(key, node)); err != nil { return b.Delete(k)
return err
}
return b.Delete(timestampKey(key, node))
} }
// addNode adds node keys to the tree except the timestamp key. // addNode adds node keys to the tree except the timestamp key.
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error { func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error {
err := b.Put(parentKey(key, child), toUint64(parent)) if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil {
if err != nil {
return err return err
} }
err = b.Put(childrenKey(key, child, parent), []byte{1})
if err != nil { err := b.Put(childrenKey(key, child, parent), []byte{1})
return err
}
err = b.Put(metaKey(key, child), meta.Bytes())
if err != nil { if err != nil {
return err return err
} }
@ -564,22 +558,28 @@ func (t *boltForest) undo(m *LogMove, b *bbolt.Bucket, key []byte) error {
return err return err
} }
if !m.HasOld { parent, ts, rawMeta, ok := t.getState(b, oldKey(key, m.Time))
if !ok {
return t.removeNode(b, key, m.Child, m.Parent) return t.removeNode(b, key, m.Child, m.Parent)
} }
return t.addNode(b, key, m.Child, m.Old.Parent, m.Old.Meta)
var meta Meta
if err := meta.FromBytes(rawMeta); err != nil {
return err
}
return t.addNode(b, key, m.Child, parent, ts, meta, rawMeta)
} }
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool { func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
key := make([]byte, 9) key := make([]byte, 9)
key[0] = 'p' key[0] = 's'
for node := child; node != parent; { for node := child; node != parent; {
binary.LittleEndian.PutUint64(key[1:], node) binary.LittleEndian.PutUint64(key[1:], node)
rawParent := b.Get(key) parent, _, _, ok := t.getState(b, key)
if len(rawParent) != 8 { if !ok {
return false return false
} }
node = binary.LittleEndian.Uint64(rawParent) node = parent
} }
return true return true
} }
@ -619,10 +619,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
return nil return nil
} }
var ( var maxTimestamp uint64
childID [9]byte
maxTimestamp uint64
)
c := b.Cursor() c := b.Cursor()
@ -632,7 +629,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
if latest { if latest {
ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child))) _, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
if ts >= maxTimestamp { if ts >= maxTimestamp {
nodes = append(nodes[:0], child) nodes = append(nodes[:0], child)
maxTimestamp = ts maxTimestamp = ts
@ -655,7 +652,7 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
return Meta{}, 0, ErrDegradedMode return Meta{}, 0, ErrDegradedMode
} }
key := parentKey(make([]byte, 9), nodeID) key := stateKey(make([]byte, 9), nodeID)
var m Meta var m Meta
var parentID uint64 var parentID uint64
@ -667,10 +664,11 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
} }
b := treeRoot.Bucket(dataBucket) b := treeRoot.Bucket(dataBucket)
if data := b.Get(key); len(data) == 8 { if data := b.Get(key); len(data) != 0 {
parentID = binary.LittleEndian.Uint64(data) parentID = binary.LittleEndian.Uint64(data)
} }
return m.FromBytes(b.Get(metaKey(key, nodeID))) _, _, meta, _ := t.getState(b, stateKey(key, nodeID))
return m.FromBytes(meta)
}) })
return m, parentID, err return m, parentID, err
@ -828,41 +826,31 @@ loop:
} }
func (t *boltForest) moveFromBytes(m *Move, data []byte) error { func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
r := io.NewBinReaderFromBuf(data) return t.logFromBytes(m, data)
m.Child = r.ReadU64LE()
m.Parent = r.ReadU64LE()
m.Meta.DecodeBinary(r)
return r.Err
} }
func (t *boltForest) logFromBytes(lm *LogMove, r *io.BinReader) error { func (t *boltForest) logFromBytes(lm *LogMove, data []byte) error {
lm.Child = r.ReadU64LE() lm.Child = binary.LittleEndian.Uint64(data)
lm.Parent = r.ReadU64LE() lm.Parent = binary.LittleEndian.Uint64(data[8:])
lm.Meta.DecodeBinary(r) return lm.Meta.FromBytes(data[16:])
lm.HasOld = r.ReadBool()
if lm.HasOld {
lm.Old.Parent = r.ReadU64LE()
lm.Old.Meta.DecodeBinary(r)
}
return r.Err
} }
func (t *boltForest) logToBytes(lm *LogMove) []byte { func (t *boltForest) logToBytes(lm *LogMove) []byte {
w := io.NewBufBinWriter() w := io.NewBufBinWriter()
size := 8 + 8 + lm.Meta.Size() + 1 size := 8 + 8 + lm.Meta.Size() + 1
if lm.HasOld { //if lm.HasOld {
size += 8 + lm.Old.Meta.Size() // size += 8 + lm.Old.Meta.Size()
} //}
w.Grow(size) w.Grow(size)
w.WriteU64LE(lm.Child) w.WriteU64LE(lm.Child)
w.WriteU64LE(lm.Parent) w.WriteU64LE(lm.Parent)
lm.Meta.EncodeBinary(w.BinWriter) lm.Meta.EncodeBinary(w.BinWriter)
w.WriteBool(lm.HasOld) //w.WriteBool(lm.HasOld)
if lm.HasOld { //if lm.HasOld {
w.WriteU64LE(lm.Old.Parent) // w.WriteU64LE(lm.Old.Parent)
lm.Old.Meta.EncodeBinary(w.BinWriter) // lm.Old.Meta.EncodeBinary(w.BinWriter)
} //}
return w.Bytes() return w.Bytes()
} }
@ -870,25 +858,37 @@ func bucketName(cid cidSDK.ID, treeID string) []byte {
return []byte(cid.String() + treeID) return []byte(cid.String() + treeID)
} }
// 't' + node (id) -> timestamp when the node first appeared. // 'o' + time -> old meta.
func timestampKey(key []byte, child Node) []byte { func oldKey(key []byte, ts Timestamp) []byte {
key[0] = 't' key[0] = 'o'
binary.LittleEndian.PutUint64(key[1:], ts)
return key[:9]
}
// 's' + child ID -> parent + timestamp of the first appearance + meta.
func stateKey(key []byte, child Node) []byte {
key[0] = 's'
binary.LittleEndian.PutUint64(key[1:], child) binary.LittleEndian.PutUint64(key[1:], child)
return key[:9] return key[:9]
} }
// 'p' + node (id) -> parent (id). func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error {
func parentKey(key []byte, child Node) []byte { data := make([]byte, len(meta)+8+8)
key[0] = 'p' binary.LittleEndian.PutUint64(data, parent)
binary.LittleEndian.PutUint64(key[1:], child) binary.LittleEndian.PutUint64(data[8:], timestamp)
return key[:9] copy(data[16:], meta)
return b.Put(key, data)
} }
// 'm' + node (id) -> serialized meta. func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) {
func metaKey(key []byte, child Node) []byte { data := b.Get(key)
key[0] = 'm' if data == nil {
binary.LittleEndian.PutUint64(key[1:], child) return 0, 0, nil, false
return key[:9] }
parent := binary.LittleEndian.Uint64(data)
timestamp := binary.LittleEndian.Uint64(data[8:])
return parent, timestamp, data[16:], true
} }
// 'c' + parent (id) + child (id) -> 0/1. // 'c' + parent (id) + child (id) -> 0/1.

View file

@ -44,7 +44,7 @@ func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogM
lm := s.do(op) lm := s.do(op)
s.operations = append(s.operations, lm) s.operations = append(s.operations, lm)
return &lm, nil return &lm.Move, nil
} }
// TreeAddByPath implements the Forest interface. // TreeAddByPath implements the Forest interface.
@ -66,20 +66,21 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
i, node := s.getPathPrefix(attr, path) i, node := s.getPathPrefix(attr, path)
lm := make([]LogMove, len(path)-i+1) lm := make([]LogMove, len(path)-i+1)
for j := i; j < len(path); j++ { for j := i; j < len(path); j++ {
lm[j-i] = s.do(&Move{ op := s.do(&Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: s.timestamp(d.Position, d.Size), Time: s.timestamp(d.Position, d.Size),
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}}, Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
Child: s.findSpareID(), Child: s.findSpareID(),
}) })
node = lm[j-i].Child lm[j-i] = op.Move
s.operations = append(s.operations, lm[j-i]) node = op.Child
s.operations = append(s.operations, op)
} }
mCopy := make([]KeyValue, len(m)) mCopy := make([]KeyValue, len(m))
copy(mCopy, m) copy(mCopy, m)
lm[len(lm)-1] = s.do(&Move{ op := s.do(&Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: s.timestamp(d.Position, d.Size), Time: s.timestamp(d.Position, d.Size),
@ -87,6 +88,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
}, },
Child: s.findSpareID(), Child: s.findSpareID(),
}) })
lm[len(lm)-1] = op.Move
return lm, nil return lm, nil
} }

View file

@ -6,9 +6,15 @@ type nodeInfo struct {
Meta Meta Meta Meta
} }
type move struct {
Move
HasOld bool
Old nodeInfo
}
// state represents state being replicated. // state represents state being replicated.
type state struct { type state struct {
operations []LogMove operations []move
tree tree
} }
@ -20,7 +26,7 @@ func newState() *state {
} }
// undo un-does op and changes s in-place. // undo un-does op and changes s in-place.
func (s *state) undo(op *LogMove) { func (s *state) undo(op *move) {
children := s.tree.childMap[op.Parent] children := s.tree.childMap[op.Parent]
for i := range children { for i := range children {
if children[i] == op.Child { if children[i] == op.Child {
@ -76,8 +82,8 @@ func (s *state) Apply(op *Move) error {
} }
// do performs a single move operation on a tree. // do performs a single move operation on a tree.
func (s *state) do(op *Move) LogMove { func (s *state) do(op *Move) move {
lm := LogMove{ lm := move{
Move: Move{ Move: Move{
Parent: op.Parent, Parent: op.Parent,
Meta: op.Meta, Meta: op.Meta,

View file

@ -36,11 +36,7 @@ type Move struct {
} }
// LogMove represents log record for a single move operation. // LogMove represents log record for a single move operation.
type LogMove struct { type LogMove = Move
Move
HasOld bool
Old nodeInfo
}
const ( const (
// RootID represents the ID of a root node. // RootID represents the ID of a root node.