[#266] services/tree: Do not accept requests until initial sync is finished
`Apply` is deliberately left out -- we don't want to miss anything new. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
364b4ac572
commit
79d59e4ed2
2 changed files with 41 additions and 1 deletions
|
@ -13,6 +13,7 @@ import (
|
||||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -31,6 +32,8 @@ type Service struct {
|
||||||
syncChan chan struct{}
|
syncChan chan struct{}
|
||||||
syncPool *ants.Pool
|
syncPool *ants.Pool
|
||||||
|
|
||||||
|
initialSyncDone atomic.Bool
|
||||||
|
|
||||||
// cnrMap contains existing (used) container IDs.
|
// cnrMap contains existing (used) container IDs.
|
||||||
cnrMap map[cidSDK.ID]struct{}
|
cnrMap map[cidSDK.ID]struct{}
|
||||||
// cnrMapMtx protects cnrMap
|
// cnrMapMtx protects cnrMap
|
||||||
|
@ -89,6 +92,10 @@ func (s *Service) Shutdown() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -137,6 +144,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -197,6 +208,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -246,6 +261,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
// Move applies client operation to the specified tree and pushes in queue
|
// Move applies client operation to the specified tree and pushes in queue
|
||||||
// for replication on other nodes.
|
// for replication on other nodes.
|
||||||
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -294,6 +313,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -370,6 +393,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -499,6 +526,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
b := req.GetBody()
|
b := req.GetBody()
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
@ -559,6 +590,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
var cid cidSDK.ID
|
var cid cidSDK.ID
|
||||||
|
|
||||||
err := cid.Decode(req.GetBody().GetContainerId())
|
err := cid.Decode(req.GetBody().GetContainerId())
|
||||||
|
@ -642,5 +677,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
|
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
|
||||||
|
if !s.initialSyncDone.Load() {
|
||||||
|
return nil, ErrAlreadySyncing
|
||||||
|
}
|
||||||
|
|
||||||
return new(HealthcheckResponse), nil
|
return new(HealthcheckResponse), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -314,7 +314,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
cnrs, err := s.cfg.cnrSource.List()
|
cnrs, err := s.cfg.cnrSource.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Error("could not fetch containers", zap.Error(err))
|
s.log.Error("could not fetch containers", zap.Error(err))
|
||||||
continue
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
newMap, cnrsToSync := s.containersToSync(cnrs)
|
newMap, cnrsToSync := s.containersToSync(cnrs)
|
||||||
|
@ -325,6 +325,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
|
|
||||||
s.log.Debug("trees have been synchronized")
|
s.log.Debug("trees have been synchronized")
|
||||||
}
|
}
|
||||||
|
s.initialSyncDone.Store(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue