From ec73bda3f8711c83a8e97bdb2c0b4a85b6205e9a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 13 Jun 2023 11:26:59 +0300 Subject: [PATCH 1/3] [#266] pilorama: Allow to get current tree height Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/tree.go | 16 +++++++++ pkg/local_object_storage/pilorama/boltdb.go | 34 +++++++++++++++++++ pkg/local_object_storage/pilorama/forest.go | 9 +++++ .../pilorama/forest_test.go | 13 +++++-- .../pilorama/interface.go | 2 ++ pkg/local_object_storage/shard/tree.go | 16 +++++++++ 6 files changed, 88 insertions(+), 2 deletions(-) diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 6b8f83f31..08c6d26b0 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -311,6 +311,22 @@ func (e *StorageEngine) TreeExists(ctx context.Context, cid cidSDK.ID, treeID st return err == nil, err } +func (e *StorageEngine) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeHeight", + trace.WithAttributes( + attribute.String("container_id", cid.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + index, lst, err := e.getTreeShard(ctx, cid, treeID) + if err != nil { + return 0, nil + } + return lst[index].TreeHeight(ctx, cid, treeID) +} + // TreeUpdateLastSyncHeight implements the pilorama.Forest interface. func (e *StorageEngine) TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeUpdateLastSyncHeight", diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 9b62f0649..5b2c97f10 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -188,6 +188,40 @@ func (t *boltForest) TreeMove(ctx context.Context, d CIDDescriptor, treeID strin }) } +func (t *boltForest) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { + _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeHeight", + trace.WithAttributes( + attribute.String("container_id", cid.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + t.modeMtx.RLock() + defer t.modeMtx.RUnlock() + + if t.mode.NoMetabase() { + return 0, ErrDegradedMode + } + + var height uint64 + var retErr error + err := t.db.View(func(tx *bbolt.Tx) error { + treeRoot := tx.Bucket(bucketName(cid, treeID)) + if treeRoot != nil { + k, _ := treeRoot.Bucket(logBucket).Cursor().Last() + height = binary.BigEndian.Uint64(k) + } else { + retErr = ErrTreeNotFound + } + return nil + }) + if err == nil { + err = retErr + } + return height, err +} + // TreeExists implements the Forest interface. func (t *boltForest) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) { _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeExists", diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 672a38edd..7c5897ed0 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -214,6 +214,15 @@ func (f *memoryForest) TreeList(_ context.Context, cid cid.ID) ([]string, error) return res, nil } +func (f *memoryForest) TreeHeight(_ context.Context, cid cid.ID, treeID string) (uint64, error) { + fullID := cid.EncodeToString() + "/" + treeID + tree, ok := f.treeMap[fullID] + if !ok { + return 0, ErrTreeNotFound + } + return tree.operations[len(tree.operations)-1].Time, nil +} + // TreeExists implements the pilorama.Forest interface. func (f *memoryForest) TreeExists(_ context.Context, cid cid.ID, treeID string) (bool, error) { fullID := cid.EncodeToString() + "/" + treeID diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index ebb4667f5..9e8e98863 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -604,10 +604,19 @@ func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...O checkExists(t, false, cid, treeID) }) - require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{Parent: 0, Child: 1}, false)) + require.NoError(t, s.TreeApply(context.Background(), cid, treeID, &Move{Meta: Meta{Time: 11}, Parent: 0, Child: 1}, false)) checkExists(t, true, cid, treeID) + + height, err := s.TreeHeight(context.Background(), cid, treeID) + require.NoError(t, err) + require.EqualValues(t, 11, height) + checkExists(t, false, cidtest.ID(), treeID) // different CID, same tree - checkExists(t, false, cid, "another tree") // same CID, different tree + + _, err = s.TreeHeight(context.Background(), cidtest.ID(), treeID) + require.ErrorIs(t, err, ErrTreeNotFound) + + checkExists(t, false, cid, "another tree") // same CID, different tree t.Run("can be removed", func(t *testing.T) { require.NoError(t, s.TreeDrop(context.Background(), cid, treeID)) diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index 9ca721be8..c8287c6d4 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -50,6 +50,8 @@ type Forest interface { TreeUpdateLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) error // TreeLastSyncHeight returns last log height synchronized with _all_ container nodes. TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) + // TreeHeight returns current tree height. + TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) } type ForestStorage interface { diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index ad89fa633..2331d37a8 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -255,6 +255,22 @@ func (s *Shard) TreeList(ctx context.Context, cid cidSDK.ID) ([]string, error) { return s.pilorama.TreeList(ctx, cid) } +func (s *Shard) TreeHeight(ctx context.Context, cid cidSDK.ID, treeID string) (uint64, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeHeight", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("container_id", cid.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + if s.pilorama == nil { + return 0, ErrPiloramaDisabled + } + return s.pilorama.TreeHeight(ctx, cid, treeID) +} + // TreeExists implements the pilorama.Forest interface. func (s *Shard) TreeExists(ctx context.Context, cid cidSDK.ID, treeID string) (bool, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeExists", -- 2.45.2 From 98d6580eb847cd2df135e3d32a6de37bacbaa57c Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 13 Jun 2023 11:32:01 +0300 Subject: [PATCH 2/3] [#266] services/tree: Return operation log up to some height Signed-off-by: Dmitrii Stepanov --- pkg/services/tree/service.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 96e547f36..4364095e2 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -532,9 +532,13 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) } h := b.GetHeight() + lastHeight, err := s.forest.TreeHeight(srv.Context(), cid, b.GetTreeId()) + if err != nil { + return err + } for { lm, err := s.forest.TreeGetOpLog(srv.Context(), cid, b.GetTreeId(), h) - if err != nil || lm.Time == 0 { + if err != nil || lm.Time == 0 || lastHeight < lm.Time { return err } -- 2.45.2 From 040ae160c1dbd24beb6a10718d3fb5a68de1f00a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 13 Jun 2023 11:43:25 +0300 Subject: [PATCH 3/3] [#266] services/tree: Add sync check Do not accept requests until initial sync is finished. `Apply` is deliberately left out -- we don't want to miss anything new. Signed-off-by: Dmitrii Stepanov --- pkg/services/tree/service.go | 39 ++++++++++++++++++++++++++++++++++++ pkg/services/tree/sync.go | 3 ++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 4364095e2..12d970c42 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -31,6 +32,8 @@ type Service struct { syncChan chan struct{} syncPool *ants.Pool + initialSyncDone atomic.Bool + // cnrMap contains existing (used) container IDs. cnrMap map[cidSDK.ID]struct{} // cnrMapMtx protects cnrMap @@ -90,6 +93,10 @@ func (s *Service) Shutdown() { } func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -138,6 +145,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error } func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -198,6 +209,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP } func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -247,6 +262,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon // Move applies client operation to the specified tree and pushes in queue // for replication on other nodes. func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -295,6 +314,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er } func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -371,6 +394,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) } func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { + if !s.initialSyncDone.Load() { + return ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -500,6 +527,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e } func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { + if !s.initialSyncDone.Load() { + return ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -560,6 +591,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) } func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + var cid cidSDK.ID err := cid.Decode(req.GetBody().GetContainerId()) @@ -643,5 +678,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI } func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + return new(HealthcheckResponse), nil } diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index d132faf6e..e44e8dbbf 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -388,7 +388,7 @@ func (s *Service) syncLoop(ctx context.Context) { s.log.Error(logs.TreeCouldNotFetchContainers, zap.Error(err)) s.metrics.AddSyncDuration(time.Since(start), false) span.End() - continue + break } newMap, cnrsToSync := s.containersToSync(cnrs) @@ -402,6 +402,7 @@ func (s *Service) syncLoop(ctx context.Context) { s.metrics.AddSyncDuration(time.Since(start), true) span.End() } + s.initialSyncDone.Store(true) } } -- 2.45.2