Move tree sync changes from SUPPORT branch #440
8 changed files with 134 additions and 4 deletions
|
@ -311,6 +311,22 @@ func (e *StorageEngine) TreeExists(ctx context.Context, cid cidSDK.ID, treeID st
|
|||
return err == nil, err
|
||||
}
|
||||
|
||||
func (e *StorageEngine) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeHeight",
|
||||
trace.WithAttributes(
|
||||
attribute.String("container_id", cid.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
index, lst, err := e.getTreeShard(ctx, cid, treeID)
|
||||
if err != nil {
|
||||
return 0, nil
|
||||
}
|
||||
return lst[index].TreeHeight(ctx, cid, treeID)
|
||||
}
|
||||
|
||||
// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeUpdateLastSyncHeight",
|
||||
|
|
|
@ -188,6 +188,40 @@ func (t *boltForest) TreeMove(ctx context.Context, d CIDDescriptor, treeID strin
|
|||
})
|
||||
}
|
||||
|
||||
func (t *boltForest) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeHeight",
|
||||
trace.WithAttributes(
|
||||
attribute.String("container_id", cid.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
var height uint64
|
||||
var retErr error
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
treeRoot := tx.Bucket(bucketName(cid, treeID))
|
||||
if treeRoot != nil {
|
||||
k, _ := treeRoot.Bucket(logBucket).Cursor().Last()
|
||||
height = binary.BigEndian.Uint64(k)
|
||||
} else {
|
||||
retErr = ErrTreeNotFound
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == nil {
|
||||
err = retErr
|
||||
}
|
||||
return height, err
|
||||
}
|
||||
|
||||
// TreeExists implements the Forest interface.
|
||||
func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists",
|
||||
|
|
|
@ -214,6 +214,15 @@ func (f *memoryForest) TreeList(_ context.Context, cid cid.ID) ([]string, error)
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) TreeHeight(_ context.Context, cid cid.ID, treeID string) (uint64, error) {
|
||||
fullID := cid.EncodeToString() + "/" + treeID
|
||||
tree, ok := f.treeMap[fullID]
|
||||
if !ok {
|
||||
return 0, ErrTreeNotFound
|
||||
}
|
||||
return tree.operations[len(tree.operations)-1].Time, nil
|
||||
}
|
||||
|
||||
// TreeExists implements the pilorama.Forest interface.
|
||||
func (f *memoryForest) TreeExists(_ context.Context, cid cid.ID, treeID string) (bool, error) {
|
||||
fullID := cid.EncodeToString() + "/" + treeID
|
||||
|
|
|
@ -604,10 +604,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O
|
|||
checkExists(t, false, cid, treeID)
|
||||
})
|
||||
|
||||
require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{Parent: 0, Child: 1}, false))
|
||||
require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false))
|
||||
checkExists(t, true, cid, treeID)
|
||||
|
||||
height, err := s.TreeHeight(context.Background(), cid, treeID)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 11, height)
|
||||
|
||||
checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree
|
||||
checkExists(t, false, cid, "another tree") // same CID, different tree
|
||||
|
||||
_, err = s.TreeHeight(context.Background(), cidtest.ID(), treeID)
|
||||
require.ErrorIs(t, err, ErrTreeNotFound)
|
||||
|
||||
checkExists(t, false, cid, "another tree") // same CID, different tree
|
||||
|
||||
t.Run("can be removed", func(t *testing.T) {
|
||||
require.NoError(t, s.TreeDrop(context.Background(), cid, treeID))
|
||||
|
|
|
@ -50,6 +50,8 @@ type Forest interface {
|
|||
TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error
|
||||
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
|
||||
TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error)
|
||||
// TreeHeight returns current tree height.
|
||||
TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error)
|
||||
}
|
||||
|
||||
type ForestStorage interface {
|
||||
|
|
|
@ -255,6 +255,22 @@ func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) {
|
|||
return s.pilorama.TreeList(ctx, cid)
|
||||
}
|
||||
|
||||
func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeHeight",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.String("container_id", cid.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
if s.pilorama == nil {
|
||||
return 0, ErrPiloramaDisabled
|
||||
}
|
||||
return s.pilorama.TreeHeight(ctx, cid, treeID)
|
||||
}
|
||||
|
||||
// TreeExists implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeExists",
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
|
@ -31,6 +32,8 @@ type Service struct {
|
|||
syncChan chan struct{}
|
||||
syncPool *ants.Pool
|
||||
|
||||
initialSyncDone atomic.Bool
|
||||
|
||||
// cnrMap contains existing (used) container IDs.
|
||||
cnrMap map[cidSDK.ID]struct{}
|
||||
// cnrMapMtx protects cnrMap
|
||||
|
@ -90,6 +93,10 @@ func (s *Service) Shutdown() {
|
|||
}
|
||||
|
||||
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
||||
b := req.GetBody()
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
@ -138,6 +145,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
|||
}
|
||||
|
||||
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
||||
b := req.GetBody()
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
@ -198,6 +209,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
|||
}
|
||||
|
||||
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
||||
b := req.GetBody()
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
@ -247,6 +262,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
|||
// Move applies client operation to the specified tree and pushes in queue
|
||||
// for replication on other nodes.
|
||||
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
||||
b := req.GetBody()
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
@ -295,6 +314,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
|||
}
|
||||
|
||||
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
||||
b := req.GetBody()
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
@ -371,6 +394,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
|||
}
|
||||
|
||||
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return ErrAlreadySyncing
|
||||
}
|
||||
|
||||
b := req.GetBody()
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
@ -500,6 +527,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
|||
}
|
||||
|
||||
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return ErrAlreadySyncing
|
||||
}
|
||||
|
||||
b := req.GetBody()
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
@ -532,9 +563,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
|||
}
|
||||
|
||||
h := b.GetHeight()
|
||||
lastHeight, err := s.forest.TreeHeight(srv.Context(), cid, b.GetTreeId())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
lm, err := s.forest.TreeGetOpLog(srv.Context(), cid, b.GetTreeId(), h)
|
||||
if err != nil || lm.Time == 0 {
|
||||
if err != nil || lm.Time == 0 || lastHeight < lm.Time {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -556,6 +591,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
|||
}
|
||||
|
||||
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
||||
var cid cidSDK.ID
|
||||
|
||||
err := cid.Decode(req.GetBody().GetContainerId())
|
||||
|
@ -639,5 +678,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
|
|||
}
|
||||
|
||||
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
|
||||
if !s.initialSyncDone.Load() {
|
||||
return nil, ErrAlreadySyncing
|
||||
}
|
||||
|
||||
return new(HealthcheckResponse), nil
|
||||
}
|
||||
|
|
|
@ -388,7 +388,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
|||
s.log.Error(logs.TreeCouldNotFetchContainers, zap.Error(err))
|
||||
s.metrics.AddSyncDuration(time.Since(start), false)
|
||||
span.End()
|
||||
continue
|
||||
break
|
||||
}
|
||||
|
||||
newMap, cnrsToSync := s.containersToSync(cnrs)
|
||||
|
@ -402,6 +402,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
|||
s.metrics.AddSyncDuration(time.Since(start), true)
|
||||
span.End()
|
||||
}
|
||||
s.initialSyncDone.Store(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue