[#266] pilorama: Allow to get current tree height

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-06-13 11:26:59 +03:00 committed by Evgenii Stratonikov
parent 0c40d98f7a
commit 2541d319de
6 changed files with 88 additions and 2 deletions

View file

@ -311,6 +311,22 @@ func (e *StorageEngine) TreeExists(ctx context.Context, cid cidSDK.ID, treeID st
return err == nil, err
}
func (e *StorageEngine) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeHeight",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
index, lst, err := e.getTreeShard(ctx, cid, treeID)
if err != nil {
return 0, nil
}
return lst[index].TreeHeight(ctx, cid, treeID)
}
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeUpdateLastSyncHeight",

View file

@ -188,6 +188,40 @@ func (t *boltForest) TreeMove(ctx context.Context, d CIDDescriptor, treeID strin
})
}
func (t *boltForest) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeHeight",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return 0, ErrDegradedMode
}
var height uint64
var retErr error
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cid, treeID))
if treeRoot != nil {
k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
height = binary.BigEndian.Uint64(k)
} else {
retErr = ErrTreeNotFound
}
return nil
})
if err == nil {
err = retErr
}
return height, err
}
// TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists",

View file

@ -214,6 +214,15 @@ func (f *memoryForest) TreeList(_ context.Context, cid cid.ID) ([]string, error)
return res, nil
}
func (f *memoryForest) TreeHeight(_ context.Context, cid cid.ID, treeID string) (uint64, error) {
fullID := cid.EncodeToString() + "/" + treeID
tree, ok := f.treeMap[fullID]
if !ok {
return 0, ErrTreeNotFound
}
return tree.operations[len(tree.operations)-1].Time, nil
}
// TreeExists implements the pilorama.Forest interface.
func (f *memoryForest) TreeExists(_ context.Context, cid cid.ID, treeID string) (bool, error) {
fullID := cid.EncodeToString() + "/" + treeID

View file

@ -604,10 +604,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
checkExists(t, false, cid, treeID)
})
require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{Parent: 0, Child: 1}, false))
require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
checkExists(t, true, cid, treeID)
height, err := s.TreeHeight(context.Background(), cid, treeID)
require.NoError(t, err)
require.EqualValues(t, 11, height)
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
checkExists(t, false, cid, "another tree") // same CID, different tree
_, err = s.TreeHeight(context.Background(), cidtest.ID(), treeID)
require.ErrorIs(t, err, ErrTreeNotFound)
checkExists(t, false, cid, "another tree") // same CID, different tree
t.Run("can be removed", func(t *testing.T) {
require.NoError(t, s.TreeDrop(context.Background(), cid, treeID))

View file

@ -50,6 +50,8 @@ type Forest interface {
TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error)
// TreeHeight returns current tree height.
TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error)
}
type ForestStorage interface {

View file

@ -255,6 +255,22 @@ func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
return s.pilorama.TreeList(ctx, cid)
}
func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeHeight",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cid.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
return s.pilorama.TreeHeight(ctx, cid, treeID)
}
// TreeExists implements the pilorama.Forest interface.
func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeExists",