WIP: Morph: Add unit tests #2

Closed
dstepanov-yadro wants to merge 233 commits from TrueCloudLab/frostfs-node:master into object-3608-morph-unit-tests
2 changed files with 41 additions and 1 deletions
Showing only changes of commit 957a43a124 - Show all commits

View file

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -31,6 +32,8 @@ type Service struct {
syncChan chan struct{} syncChan chan struct{}
syncPool *ants.Pool syncPool *ants.Pool
initialSyncDone atomic.Bool
// cnrMap contains existing (used) container IDs. // cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{} cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap // cnrMapMtx protects cnrMap
@ -90,6 +93,10 @@ func (s *Service) Shutdown() {
} }
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -138,6 +145,10 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
} }
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -198,6 +209,10 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
} }
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -247,6 +262,10 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
// Move applies client operation to the specified tree and pushes in queue // Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes. // for replication on other nodes.
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -295,6 +314,10 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
} }
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -371,6 +394,10 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
} }
func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -500,6 +527,10 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
} }
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
if !s.initialSyncDone.Load() {
return ErrAlreadySyncing
}
b := req.GetBody() b := req.GetBody()
var cid cidSDK.ID var cid cidSDK.ID
@ -560,6 +591,10 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
} }
func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
var cid cidSDK.ID var cid cidSDK.ID
err := cid.Decode(req.GetBody().GetContainerId()) err := cid.Decode(req.GetBody().GetContainerId())
@ -643,5 +678,9 @@ func (s *Service) getContainerInfo(cid cidSDK.ID, pub []byte) ([]netmapSDK.NodeI
} }
func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) { func (s *Service) Healthcheck(context.Context, *HealthcheckRequest) (*HealthcheckResponse, error) {
if !s.initialSyncDone.Load() {
return nil, ErrAlreadySyncing
}
return new(HealthcheckResponse), nil return new(HealthcheckResponse), nil
} }

View file

@ -388,7 +388,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.log.Error(logs.TreeCouldNotFetchContainers, zap.Error(err)) s.log.Error(logs.TreeCouldNotFetchContainers, zap.Error(err))
s.metrics.AddSyncDuration(time.Since(start), false) s.metrics.AddSyncDuration(time.Since(start), false)
span.End() span.End()
continue break
} }
newMap, cnrsToSync := s.containersToSync(cnrs) newMap, cnrsToSync := s.containersToSync(cnrs)
@ -402,6 +402,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.metrics.AddSyncDuration(time.Since(start), true) s.metrics.AddSyncDuration(time.Since(start), true)
span.End() span.End()
} }
s.initialSyncDone.Store(true)
} }
} }