From 79d59e4ed2ba00e88df010b3b0b5e02cd346c459 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 5 Apr 2023 14:21:03 +0300 Subject: [PATCH] [#266] services/tree: Do not accept requests until initial sync is finished `Apply` is deliberately left out -- we don't want to miss anything new. Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/service.go | 39 ++++++++++++++++++++++++++++++++++++ pkg/services/tree/sync.go | 3 ++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 7ab8fd13c..acce3f1e7 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -13,6 +13,7 @@ import ( cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/panjf2000/ants/v2" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -31,6 +32,8 @@ type Service struct { syncChan chan struct{} syncPool *ants.Pool + initialSyncDone atomic.Bool + // cnrMap contains existing (used) container IDs. cnrMap map[cidSDK.ID]struct{} // cnrMapMtx protects cnrMap @@ -89,6 +92,10 @@ func (s *Service) Shutdown() { } func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error } func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP } func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon // Move applies client operation to the specified tree and pushes in queue // for replication on other nodes. func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er } func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) } func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { + if !s.initialSyncDone.Load() { + return ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e } func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { + if !s.initialSyncDone.Load() { + return ErrAlreadySyncing + } + b := req.GetBody() var cid cidSDK.ID @@ -559,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) } func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + var cid cidSDK.ID err := cid.Decode(req.GetBody().GetContainerId()) @@ -642,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI } func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) { + if !s.initialSyncDone.Load() { + return nil, ErrAlreadySyncing + } + return new(HealthcheckResponse), nil } diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 554d74091..ed87eac45 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -314,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) { cnrs, err := s.cfg.cnrSource.List() if err != nil { s.log.Error("could not fetch containers", zap.Error(err)) - continue + break } newMap, cnrsToSync := s.containersToSync(cnrs) @@ -325,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) { s.log.Debug("trees have been synchronized") } + s.initialSyncDone.Store(true) } }