[#2210] pilorama: Reduce the amount of keys per node
Under high load we are limited by the _amount_ of keys we need to update in a single transaction. In this commit we try storing all state with a single key. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
9ce883d904
commit
d43dd0b8ca
4 changed files with 126 additions and 122 deletions
|
@ -156,8 +156,7 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove
|
|||
return nil, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
var lm LogMove
|
||||
lm.Move = *m
|
||||
lm := *m
|
||||
return &lm, t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
if err != nil {
|
||||
|
@ -227,7 +226,7 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
|
|||
ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
|
||||
lm = make([]LogMove, len(path)-i+1)
|
||||
for j := i; j < len(path); j++ {
|
||||
lm[j-i].Move = Move{
|
||||
lm[j-i] = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: ts,
|
||||
|
@ -245,7 +244,7 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
|
|||
node = lm[j-i].Child
|
||||
}
|
||||
|
||||
lm[len(lm)-1].Move = Move{
|
||||
lm[len(lm)-1] = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: ts,
|
||||
|
@ -274,17 +273,14 @@ func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint6
|
|||
// findSpareID returns random unused ID.
|
||||
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
|
||||
id := uint64(rand.Int63())
|
||||
|
||||
var key [9]byte
|
||||
key[0] = 't'
|
||||
binary.LittleEndian.PutUint64(key[1:], id)
|
||||
key := make([]byte, 9)
|
||||
|
||||
for {
|
||||
if bTree.Get(key[:]) == nil {
|
||||
_, _, _, ok := t.getState(bTree, stateKey(key, id))
|
||||
if !ok {
|
||||
return id
|
||||
}
|
||||
id = uint64(rand.Int63())
|
||||
binary.LittleEndian.PutUint64(key[1:], id)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -420,8 +416,12 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
|
|||
// 1. Undo up until the desired timestamp is here.
|
||||
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
|
||||
b.Reset(value)
|
||||
if err := t.logFromBytes(&tmp, r); err != nil {
|
||||
return err
|
||||
|
||||
tmp.Child = r.ReadU64LE()
|
||||
tmp.Parent = r.ReadU64LE()
|
||||
tmp.Time = r.ReadVarUint()
|
||||
if r.Err != nil {
|
||||
return r.Err
|
||||
}
|
||||
if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
|
||||
return err
|
||||
|
@ -433,7 +433,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
|
|||
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
|
||||
|
||||
// 2. Insert the operation.
|
||||
lm.Move = *ms[i]
|
||||
*lm = *ms[i]
|
||||
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -445,11 +445,10 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
|
|||
|
||||
// 3. Re-apply all other operations.
|
||||
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
|
||||
b.Reset(value)
|
||||
if err := t.logFromBytes(&tmp, r); err != nil {
|
||||
if err := t.logFromBytes(&tmp, value); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
|
||||
if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil {
|
||||
return err
|
||||
}
|
||||
key, value = c.Next()
|
||||
|
@ -460,39 +459,42 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
|
|||
}
|
||||
|
||||
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {
|
||||
currParent := b.Get(parentKey(key, op.Child))
|
||||
op.HasOld = currParent != nil
|
||||
if currParent != nil { // node is already in tree
|
||||
op.Old.Parent = binary.LittleEndian.Uint64(currParent)
|
||||
if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
op.HasOld = false
|
||||
op.Old = nodeInfo{}
|
||||
}
|
||||
|
||||
binary.BigEndian.PutUint64(key, op.Time)
|
||||
if err := lb.Put(key[:8], t.logToBytes(op)); err != nil {
|
||||
rawLog := t.logToBytes(op)
|
||||
if err := lb.Put(key[:8], rawLog); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
|
||||
return nil
|
||||
return t.redo(b, key, op, rawLog[16:])
|
||||
}
|
||||
|
||||
func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *LogMove, rawMeta []byte) error {
|
||||
var err error
|
||||
|
||||
parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child))
|
||||
if inTree {
|
||||
err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta)
|
||||
} else {
|
||||
ts = op.Time
|
||||
err = b.Delete(oldKey(key, op.Time))
|
||||
}
|
||||
|
||||
if currParent == nil {
|
||||
if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
parent := binary.LittleEndian.Uint64(currParent)
|
||||
if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
|
||||
return err
|
||||
}
|
||||
|
||||
if inTree {
|
||||
if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range op.Old.Meta.Items {
|
||||
if isAttributeInternal(op.Old.Meta.Items[i].Key) {
|
||||
key = internalKey(key, op.Old.Meta.Items[i].Key, string(op.Old.Meta.Items[i].Value), parent, op.Child)
|
||||
|
||||
var meta Meta
|
||||
if err := meta.FromBytes(currMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range meta.Items {
|
||||
if isAttributeInternal(meta.Items[i].Key) {
|
||||
key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)
|
||||
err := b.Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -500,17 +502,16 @@ func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMo
|
|||
}
|
||||
}
|
||||
}
|
||||
return t.addNode(b, key, op.Child, op.Parent, op.Meta)
|
||||
return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta)
|
||||
}
|
||||
|
||||
// removeNode removes node keys from the tree except the children key or its parent.
|
||||
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error {
|
||||
if err := b.Delete(parentKey(key, node)); err != nil {
|
||||
return err
|
||||
}
|
||||
k := stateKey(key, node)
|
||||
_, _, rawMeta, _ := t.getState(b, k)
|
||||
|
||||
var meta Meta
|
||||
var k = metaKey(key, node)
|
||||
if err := meta.FromBytes(b.Get(k)); err == nil {
|
||||
if err := meta.FromBytes(rawMeta); err == nil {
|
||||
for i := range meta.Items {
|
||||
if isAttributeInternal(meta.Items[i].Key) {
|
||||
err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node))
|
||||
|
@ -520,23 +521,16 @@ func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node)
|
|||
}
|
||||
}
|
||||
}
|
||||
if err := b.Delete(metaKey(key, node)); err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Delete(timestampKey(key, node))
|
||||
return b.Delete(k)
|
||||
}
|
||||
|
||||
// addNode adds node keys to the tree except the timestamp key.
|
||||
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error {
|
||||
err := b.Put(parentKey(key, child), toUint64(parent))
|
||||
if err != nil {
|
||||
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error {
|
||||
if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.Put(childrenKey(key, child, parent), []byte{1})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.Put(metaKey(key, child), meta.Bytes())
|
||||
|
||||
err := b.Put(childrenKey(key, child, parent), []byte{1})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -564,22 +558,28 @@ func (t *boltForest) undo(m *LogMove, b *bbolt.Bucket, key []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if !m.HasOld {
|
||||
parent, ts, rawMeta, ok := t.getState(b, oldKey(key, m.Time))
|
||||
if !ok {
|
||||
return t.removeNode(b, key, m.Child, m.Parent)
|
||||
}
|
||||
return t.addNode(b, key, m.Child, m.Old.Parent, m.Old.Meta)
|
||||
|
||||
var meta Meta
|
||||
if err := meta.FromBytes(rawMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
return t.addNode(b, key, m.Child, parent, ts, meta, rawMeta)
|
||||
}
|
||||
|
||||
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
|
||||
key := make([]byte, 9)
|
||||
key[0] = 'p'
|
||||
key[0] = 's'
|
||||
for node := child; node != parent; {
|
||||
binary.LittleEndian.PutUint64(key[1:], node)
|
||||
rawParent := b.Get(key)
|
||||
if len(rawParent) != 8 {
|
||||
parent, _, _, ok := t.getState(b, key)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
node = binary.LittleEndian.Uint64(rawParent)
|
||||
node = parent
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
@ -619,10 +619,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
childID [9]byte
|
||||
maxTimestamp uint64
|
||||
)
|
||||
var maxTimestamp uint64
|
||||
|
||||
c := b.Cursor()
|
||||
|
||||
|
@ -632,7 +629,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
|
||||
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
|
||||
if latest {
|
||||
ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child)))
|
||||
_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
|
||||
if ts >= maxTimestamp {
|
||||
nodes = append(nodes[:0], child)
|
||||
maxTimestamp = ts
|
||||
|
@ -655,7 +652,7 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
|
|||
return Meta{}, 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
key := parentKey(make([]byte, 9), nodeID)
|
||||
key := stateKey(make([]byte, 9), nodeID)
|
||||
|
||||
var m Meta
|
||||
var parentID uint64
|
||||
|
@ -667,10 +664,11 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
|
|||
}
|
||||
|
||||
b := treeRoot.Bucket(dataBucket)
|
||||
if data := b.Get(key); len(data) == 8 {
|
||||
if data := b.Get(key); len(data) != 0 {
|
||||
parentID = binary.LittleEndian.Uint64(data)
|
||||
}
|
||||
return m.FromBytes(b.Get(metaKey(key, nodeID)))
|
||||
_, _, meta, _ := t.getState(b, stateKey(key, nodeID))
|
||||
return m.FromBytes(meta)
|
||||
})
|
||||
|
||||
return m, parentID, err
|
||||
|
@ -828,41 +826,31 @@ loop:
|
|||
}
|
||||
|
||||
func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
|
||||
r := io.NewBinReaderFromBuf(data)
|
||||
m.Child = r.ReadU64LE()
|
||||
m.Parent = r.ReadU64LE()
|
||||
m.Meta.DecodeBinary(r)
|
||||
return r.Err
|
||||
return t.logFromBytes(m, data)
|
||||
}
|
||||
|
||||
func (t *boltForest) logFromBytes(lm *LogMove, r *io.BinReader) error {
|
||||
lm.Child = r.ReadU64LE()
|
||||
lm.Parent = r.ReadU64LE()
|
||||
lm.Meta.DecodeBinary(r)
|
||||
lm.HasOld = r.ReadBool()
|
||||
if lm.HasOld {
|
||||
lm.Old.Parent = r.ReadU64LE()
|
||||
lm.Old.Meta.DecodeBinary(r)
|
||||
}
|
||||
return r.Err
|
||||
func (t *boltForest) logFromBytes(lm *LogMove, data []byte) error {
|
||||
lm.Child = binary.LittleEndian.Uint64(data)
|
||||
lm.Parent = binary.LittleEndian.Uint64(data[8:])
|
||||
return lm.Meta.FromBytes(data[16:])
|
||||
}
|
||||
|
||||
func (t *boltForest) logToBytes(lm *LogMove) []byte {
|
||||
w := io.NewBufBinWriter()
|
||||
size := 8 + 8 + lm.Meta.Size() + 1
|
||||
if lm.HasOld {
|
||||
size += 8 + lm.Old.Meta.Size()
|
||||
}
|
||||
//if lm.HasOld {
|
||||
// size += 8 + lm.Old.Meta.Size()
|
||||
//}
|
||||
|
||||
w.Grow(size)
|
||||
w.WriteU64LE(lm.Child)
|
||||
w.WriteU64LE(lm.Parent)
|
||||
lm.Meta.EncodeBinary(w.BinWriter)
|
||||
w.WriteBool(lm.HasOld)
|
||||
if lm.HasOld {
|
||||
w.WriteU64LE(lm.Old.Parent)
|
||||
lm.Old.Meta.EncodeBinary(w.BinWriter)
|
||||
}
|
||||
//w.WriteBool(lm.HasOld)
|
||||
//if lm.HasOld {
|
||||
// w.WriteU64LE(lm.Old.Parent)
|
||||
// lm.Old.Meta.EncodeBinary(w.BinWriter)
|
||||
//}
|
||||
return w.Bytes()
|
||||
}
|
||||
|
||||
|
@ -870,25 +858,37 @@ func bucketName(cid cidSDK.ID, treeID string) []byte {
|
|||
return []byte(cid.String() + treeID)
|
||||
}
|
||||
|
||||
// 't' + node (id) -> timestamp when the node first appeared.
|
||||
func timestampKey(key []byte, child Node) []byte {
|
||||
key[0] = 't'
|
||||
// 'o' + time -> old meta.
|
||||
func oldKey(key []byte, ts Timestamp) []byte {
|
||||
key[0] = 'o'
|
||||
binary.LittleEndian.PutUint64(key[1:], ts)
|
||||
return key[:9]
|
||||
}
|
||||
|
||||
// 's' + child ID -> parent + timestamp of the first appearance + meta.
|
||||
func stateKey(key []byte, child Node) []byte {
|
||||
key[0] = 's'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
}
|
||||
|
||||
// 'p' + node (id) -> parent (id).
|
||||
func parentKey(key []byte, child Node) []byte {
|
||||
key[0] = 'p'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error {
|
||||
data := make([]byte, len(meta)+8+8)
|
||||
binary.LittleEndian.PutUint64(data, parent)
|
||||
binary.LittleEndian.PutUint64(data[8:], timestamp)
|
||||
copy(data[16:], meta)
|
||||
return b.Put(key, data)
|
||||
}
|
||||
|
||||
// 'm' + node (id) -> serialized meta.
|
||||
func metaKey(key []byte, child Node) []byte {
|
||||
key[0] = 'm'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) {
|
||||
data := b.Get(key)
|
||||
if data == nil {
|
||||
return 0, 0, nil, false
|
||||
}
|
||||
|
||||
parent := binary.LittleEndian.Uint64(data)
|
||||
timestamp := binary.LittleEndian.Uint64(data[8:])
|
||||
return parent, timestamp, data[16:], true
|
||||
}
|
||||
|
||||
// 'c' + parent (id) + child (id) -> 0/1.
|
||||
|
|
|
@ -44,7 +44,7 @@ func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogM
|
|||
|
||||
lm := s.do(op)
|
||||
s.operations = append(s.operations, lm)
|
||||
return &lm, nil
|
||||
return &lm.Move, nil
|
||||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
|
@ -66,20 +66,21 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
|
|||
i, node := s.getPathPrefix(attr, path)
|
||||
lm := make([]LogMove, len(path)-i+1)
|
||||
for j := i; j < len(path); j++ {
|
||||
lm[j-i] = s.do(&Move{
|
||||
op := s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: s.timestamp(d.Position, d.Size),
|
||||
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
node = lm[j-i].Child
|
||||
s.operations = append(s.operations, lm[j-i])
|
||||
lm[j-i] = op.Move
|
||||
node = op.Child
|
||||
s.operations = append(s.operations, op)
|
||||
}
|
||||
|
||||
mCopy := make([]KeyValue, len(m))
|
||||
copy(mCopy, m)
|
||||
lm[len(lm)-1] = s.do(&Move{
|
||||
op := s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: s.timestamp(d.Position, d.Size),
|
||||
|
@ -87,6 +88,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
|
|||
},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
lm[len(lm)-1] = op.Move
|
||||
return lm, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,9 +6,15 @@ type nodeInfo struct {
|
|||
Meta Meta
|
||||
}
|
||||
|
||||
type move struct {
|
||||
Move
|
||||
HasOld bool
|
||||
Old nodeInfo
|
||||
}
|
||||
|
||||
// state represents state being replicated.
|
||||
type state struct {
|
||||
operations []LogMove
|
||||
operations []move
|
||||
tree
|
||||
}
|
||||
|
||||
|
@ -20,7 +26,7 @@ func newState() *state {
|
|||
}
|
||||
|
||||
// undo un-does op and changes s in-place.
|
||||
func (s *state) undo(op *LogMove) {
|
||||
func (s *state) undo(op *move) {
|
||||
children := s.tree.childMap[op.Parent]
|
||||
for i := range children {
|
||||
if children[i] == op.Child {
|
||||
|
@ -76,8 +82,8 @@ func (s *state) Apply(op *Move) error {
|
|||
}
|
||||
|
||||
// do performs a single move operation on a tree.
|
||||
func (s *state) do(op *Move) LogMove {
|
||||
lm := LogMove{
|
||||
func (s *state) do(op *Move) move {
|
||||
lm := move{
|
||||
Move: Move{
|
||||
Parent: op.Parent,
|
||||
Meta: op.Meta,
|
||||
|
|
|
@ -36,11 +36,7 @@ type Move struct {
|
|||
}
|
||||
|
||||
// LogMove represents log record for a single move operation.
|
||||
type LogMove struct {
|
||||
Move
|
||||
HasOld bool
|
||||
Old nodeInfo
|
||||
}
|
||||
type LogMove = Move
|
||||
|
||||
const (
|
||||
// RootID represents the ID of a root node.
|
||||
|
|
Loading…
Reference in a new issue