Store tree service sync height in the metabase #82
7 changed files with 171 additions and 2 deletions
|
@ -213,6 +213,44 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
|||
return err == nil, err
|
||||
}
|
||||
|
||||
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||
index, lst, err := e.getTreeShard(cid, treeID)
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = lst[index].TreeUpdateLastSyncHeight(cid, treeID, height)
|
||||
if err != nil && !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||
e.reportShardError(lst[index], "can't update tree synchronization height", err,
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", treeID))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||
var err error
|
||||
var height uint64
|
||||
for _, sh := range e.sortShardsByWeight(cid) {
|
||||
height, err = sh.TreeLastSyncHeight(cid, treeID)
|
||||
carpawell marked this conversation as resolved
|
||||
if err != nil {
|
||||
if err == shard.ErrPiloramaDisabled {
|
||||
break
|
||||
}
|
||||
if !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
e.reportShardError(sh, "can't read tree synchronization height", err,
|
||||
zap.Stringer("cid", cid),
|
||||
zap.String("tree", treeID))
|
||||
}
|
||||
continue
|
||||
}
|
||||
return height, err
|
||||
}
|
||||
return height, err
|
||||
}
|
||||
|
||||
func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
|
||||
lst := e.sortShardsByWeight(cid)
|
||||
for i, sh := range lst {
|
||||
|
|
|
@ -192,6 +192,46 @@ func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
|||
return exists, err
|
||||
}
|
||||
|
||||
var syncHeightKey = []byte{'h'}
|
||||
|
||||
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (t *boltForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||
rawHeight := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(rawHeight, height)
|
||||
|
||||
buck := bucketName(cid, treeID)
|
||||
return t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
treeRoot := tx.Bucket(buck)
|
||||
if treeRoot == nil {
|
||||
return ErrTreeNotFound
|
||||
}
|
||||
|
||||
b := treeRoot.Bucket(dataBucket)
|
||||
return b.Put(syncHeightKey, rawHeight)
|
||||
})
|
||||
}
|
||||
|
||||
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (t *boltForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||
var height uint64
|
||||
|
||||
buck := bucketName(cid, treeID)
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
treeRoot := tx.Bucket(buck)
|
||||
if treeRoot == nil {
|
||||
return ErrTreeNotFound
|
||||
}
|
||||
|
||||
b := treeRoot.Bucket(dataBucket)
|
||||
data := b.Get(syncHeightKey)
|
||||
if len(data) == 8 {
|
||||
height = binary.LittleEndian.Uint64(data)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return height, err
|
||||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
|
||||
if !d.checkValid() {
|
||||
|
|
|
@ -226,3 +226,24 @@ func (f *memoryForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
|||
_, ok := f.treeMap[fullID]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (f *memoryForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||
fullID := cid.EncodeToString() + "/" + treeID
|
||||
t, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
return ErrTreeNotFound
|
||||
}
|
||||
t.syncHeight = height
|
||||
return nil
|
||||
}
|
||||
|
||||
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (f *memoryForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||
fullID := cid.EncodeToString() + "/" + treeID
|
||||
t, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
return 0, ErrTreeNotFound
|
||||
}
|
||||
return t.syncHeight, nil
|
||||
}
|
||||
|
|
|
@ -1030,3 +1030,52 @@ func testTreeGetTrees(t *testing.T, s Forest) {
|
|||
require.ElementsMatch(t, treeIDs[cid], trees)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTreeLastSyncHeight(t *testing.T) {
|
||||
for i := range providers {
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testTreeLastSyncHeight(t, providers[i].construct(t))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testTreeLastSyncHeight(t *testing.T, f Forest) {
|
||||
cnr := cidtest.ID()
|
||||
treeID := "someTree"
|
||||
|
||||
t.Run("ErrNotFound if no log operations are stored for a tree", func(t *testing.T) {
|
||||
_, err := f.TreeLastSyncHeight(cnr, treeID)
|
||||
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||
|
||||
err = f.TreeUpdateLastSyncHeight(cnr, treeID, 1)
|
||||
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||
})
|
||||
|
||||
_, err := f.TreeMove(CIDDescriptor{CID: cnr, Size: 1}, treeID, &Move{
|
||||
Parent: RootID,
|
||||
Child: 1,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
h, err := f.TreeLastSyncHeight(cnr, treeID)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 0, h)
|
||||
|
||||
t.Run("separate storages for separate containers", func(t *testing.T) {
|
||||
_, err := f.TreeLastSyncHeight(cidtest.ID(), treeID)
|
||||
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||
})
|
||||
|
||||
require.NoError(t, f.TreeUpdateLastSyncHeight(cnr, treeID, 10))
|
||||
|
||||
h, err = f.TreeLastSyncHeight(cnr, treeID)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 10, h)
|
||||
|
||||
t.Run("removed correctly", func(t *testing.T) {
|
||||
require.NoError(t, f.TreeDrop(cnr, treeID))
|
||||
|
||||
_, err := f.TreeLastSyncHeight(cnr, treeID)
|
||||
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -143,8 +143,9 @@ func (s *state) findSpareID() Node {
|
|||
|
||||
// tree is a mapping from the child nodes to their parent and metadata.
|
||||
type tree struct {
|
||||
infoMap map[Node]nodeInfo
|
||||
childMap map[Node][]Node
|
||||
syncHeight uint64
|
||||
infoMap map[Node]nodeInfo
|
||||
childMap map[Node][]Node
|
||||
}
|
||||
|
||||
func newTree() *tree {
|
||||
|
|
|
@ -44,6 +44,10 @@ type Forest interface {
|
|||
// TreeExists checks if a tree exists locally.
|
||||
// If the tree is not found, false and a nil error should be returned.
|
||||
TreeExists(cid cidSDK.ID, treeID string) (bool, error)
|
||||
// TreeUpdateLastSyncHeight updates last log height synchronized with _all_ container nodes.
|
||||
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
|
||||
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
|
||||
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
|
||||
}
|
||||
|
||||
type ForestStorage interface {
|
||||
|
|
|
@ -111,3 +111,19 @@ func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
|||
}
|
||||
return s.pilorama.TreeExists(cid, treeID)
|
||||
}
|
||||
|
||||
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
|
||||
if s.pilorama == nil {
|
||||
return ErrPiloramaDisabled
|
||||
}
|
||||
return s.pilorama.TreeUpdateLastSyncHeight(cid, treeID, height)
|
||||
}
|
||||
|
||||
// TreeLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
|
||||
if s.pilorama == nil {
|
||||
return 0, ErrPiloramaDisabled
|
||||
}
|
||||
return s.pilorama.TreeLastSyncHeight(cid, treeID)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue
why not
getTreeShard
?Mostly because this is a general rule here.
The reason is that
getTreeShard
performsTreeExists
just to find the shard and is really needed only for modifying operations.TreeLastSyncHeight
is cheap, so we can safely use it here without callingTreeExists
for each shard ingetTreeShard
.