[#741] treesvc: Refactor tree sync

Fix linter issues.
Add error logging.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-11-28 17:04:02 +03:00
parent 1c735efedd
commit b20298007e
2 changed files with 22 additions and 15 deletions

View file

@ -65,6 +65,8 @@ const (
TreeSynchronizeTree = "synchronize tree"
TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node"
TreeFailedToParseAddressForTreeSynchronization = "failed to parse address for tree synchronization"
TreeFailedToConnectForTreeSynchronization = "failed to connect for tree synchronization"
TreeSyncingTrees = "syncing trees..."
TreeCouldNotFetchContainers = "could not fetch containers"
TreeTreesHaveBeenSynchronized = "trees have been synchronized"

View file

@ -217,8 +217,10 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
}
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move,
height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move,
) error {
treeClient := NewTreeServiceClient(cc)
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
@ -296,29 +298,19 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
n.IterateNetworkEndpoints(func(addr string) bool {
var a network.Address
if err := a.FromString(addr); err != nil {
s.log.Warn(logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
}
cc, err := grpc.DialContext(egCtx, a.URIAddr(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := s.dialCtx(egCtx, a)
if err != nil {
// Failed to connect, try the next address.
s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
}
defer cc.Close()
treeClient := NewTreeServiceClient(cc)
err = s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
if err != nil {
// Error with the response, try the next node.
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
}
nodeSynced = err == nil
@ -348,6 +340,19 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
return from
}
func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) {
return grpc.DialContext(egCtx, a.URIAddr(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
}
// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")