Evgenii Stratonikov 6fcae9f75a
Some checks failed
Vulncheck / Vulncheck (push) Successful in 1m3s
Pre-commit hooks / Pre-commit (push) Successful in 1m39s
Build / Build Components (push) Successful in 2m4s
Tests and linters / Tests (push) Successful in 1m55s
Tests and linters / Tests with -race (push) Successful in 3m35s
Tests and linters / Run gofumpt (push) Successful in 3m36s
Tests and linters / Staticcheck (push) Successful in 3m52s
Tests and linters / Lint (push) Successful in 4m0s
OCI image / Build container images (push) Successful in 4m53s
Tests and linters / gopls check (push) Failing after 13m1s
[#1621] treesvc: Cancel background sync on failure
If applyOperationStream() exits prematurely, other goroutines will block
on send and errgroup will never finish waiting. In this commit we also
check whether context is cancelled.

Signed-off-by: Evgenii Stratonikov <>
2025-02-03 09:37:55 +00:00

530 lines
14 KiB

package tree
import (
containerCore ""
metrics ""
tracing ""
tracing_grpc ""
cid ""
netmapSDK ""
// ErrNotInContainer is returned when operation could not be performed
// because the node is not included in the container.
var ErrNotInContainer = errors.New("node is not in container")
// defaultSyncWorkerCount is not referenced in the visible part of this file.
// NOTE(review): presumably the default number of concurrent sync workers —
// confirm at the use site.
const defaultSyncWorkerCount = 20
// synchronizeAllTrees synchronizes all the trees of the container. It fetches
// tree IDs from the other container nodes. Returns ErrNotInContainer if the node
// is not included in the container.
//
// The tree ID list is requested with a signed TreeListRequest through
// s.forEachNode. Every returned tree is passed to synchronizeTree together with
// the height reported by s.forest.TreeLastSyncHeight, and the stored height is
// updated when the returned one is greater. Failures to read or update a stored
// height are logged as warnings; failures to get the node list, sign the request
// or fetch the tree ID list are returned wrapped.
//
// NOTE(review): this listing appears to have lost lines (all closing braces and
// possibly whole statements); see the inline notes.
func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
// A negative pos means this node is not part of the container.
nodes, pos, err := s.getContainerNodes(cid)
if err != nil {
return fmt.Errorf("can't get container nodes: %w", err)
if pos < 0 {
return ErrNotInContainer
// Drop this node from the list and shuffle the remaining ones.
nodes = randomizeNodeOrder(nodes, pos)
if len(nodes) == 0 {
// No other node to synchronize with.
return nil
// NOTE(review): rawCID is allocated but nothing visible here writes the
// container ID into it — confirm an encode step was not lost.
rawCID := make([]byte, sha256.Size)
req := &TreeListRequest{
Body: &TreeListRequest_Body{
ContainerId: rawCID,
err = SignMessage(req, s.key)
if err != nil {
return fmt.Errorf("could not sign request: %w", err)
var resp *TreeListResponse
var treesToSync []string
var outErr error
// The callback records the last RPC error in outErr and the tree IDs of a
// successful response in treesToSync.
// NOTE(review): returning true presumably stops the iteration — confirm
// against forEachNode.
err = s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
resp, outErr = c.TreeList(ctx, req)
if outErr != nil {
return false
treesToSync = resp.GetBody().GetIds()
return true
// An error from forEachNode itself replaces the last RPC error.
if err != nil {
outErr = err
if outErr != nil {
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
for _, tid := range treesToSync {
h, err := s.forest.TreeLastSyncHeight(ctx, cid, tid)
// pilorama.ErrTreeNotFound is not treated as a failure here.
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
s.log.Warn(ctx, logs.TreeCouldNotGetLastSynchronizedHeightForATree,
zap.Stringer("cid", cid),
zap.String("tree", tid))
// NOTE(review): no continue is visible after the warning above — confirm
// whether a failed height lookup is meant to skip the tree.
newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
// Persist the height only when it moved forward.
if h < newHeight {
if err := s.forest.TreeUpdateLastSyncHeight(ctx, cid, tid, newHeight); err != nil {
s.log.Warn(ctx, logs.TreeCouldNotUpdateLastSynchronizedHeightForATree,
zap.Stringer("cid", cid),
zap.String("tree", tid))
return nil
// SynchronizeTree tries to synchronize log starting from the last stored height.
func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string) error {
nodes, pos, err := s.getContainerNodes(cid)
if err != nil {
return fmt.Errorf("can't get container nodes: %w", err)
if pos < 0 {
return ErrNotInContainer
nodes = randomizeNodeOrder(nodes, pos)
if len(nodes) == 0 {
return nil
s.synchronizeTree(ctx, cid, 0, treeID, nodes)
return nil
// mergeOperationStreams performs merge sort for node operation streams to one stream.
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
defer close(merged)
// Merging different node streams shuffles incoming operations like that:
// x - operation from the stream A
// o - operation from the stream B
// --o---o--x--x--x--o---x--x------> t
// ^
// If all ops have been successfully applied, we must start from the last
// operation height from the stream B. This height is stored in minStreamedLastHeight.
var minStreamedLastHeight uint64 = math.MaxUint64
ms := make([]*pilorama.Move, len(streams))
for i := range streams {
select {
case ms[i] = <-streams[i]:
case <-ctx.Done():
return minStreamedLastHeight
for {
var minTimeMoveTime uint64 = math.MaxUint64
minTimeMoveIndex := -1
for i, m := range ms {
if m != nil && minTimeMoveTime > m.Time {
minTimeMoveTime = m.Time
minTimeMoveIndex = i
if minTimeMoveIndex == -1 {
select {
case merged <- ms[minTimeMoveIndex]:
case <-ctx.Done():
return minStreamedLastHeight
height := ms[minTimeMoveIndex].Time
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
minStreamedLastHeight = min(minStreamedLastHeight, height)
return minStreamedLastHeight
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move,
) (uint64, error) {
var prev *pilorama.Move
var batch []*pilorama.Move
for m := range operationStream {
// skip already applied op
if prev != nil && prev.Time == m.Time {
prev = m
batch = append(batch, m)
if len(batch) == s.syncBatchSize {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time, err
batch = batch[:0]
if len(batch) > 0 {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time, err
return math.MaxUint64, nil
// startStream requests the operation log of the tree treeID starting at height
// over the connection cc and forwards every received operation to opsCh as a
// *pilorama.Move.
//
// It returns nil when the stream ends with io.EOF, ctx.Err() when ctx is done
// while waiting to send to opsCh, and the signing, RPC, receive or
// metadata-decoding error otherwise (the GetOpLog error is wrapped).
//
// NOTE(review): this listing appears to have lost lines (all closing braces and
// possibly whole statements); see the inline notes.
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move,
) error {
treeClient := NewTreeServiceClient(cc)
// NOTE(review): the cid parameter is not used in the visible lines and rawCID
// is never filled — confirm an encode step was not lost.
rawCID := make([]byte, sha256.Size)
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: height,
if err := SignMessage(req, s.key); err != nil {
return err
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
// Receive until Recv reports an error; io.EOF is handled after the loop.
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.GetParentId(),
Child: lm.GetChildId(),
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
return err
// Do not block on the consumer once ctx is done.
select {
case opsCh <- m:
case <-ctx.Done():
return ctx.Err()
// io.EOF is the normal end of the stream; anything else is a failure.
if !errors.Is(err, io.EOF) {
return err
return nil
// synchronizeTree synchronizes operations getting them from different nodes.
// Each available node does stream operations to a separate stream. These streams
// are merged into one big stream ordered by operation time. This way allows to skip
// already applied operation and keep good batching.
// The method returns a height that service should start sync from in the next time.
//
// The merger, the applier and one goroutine per node run in a single errgroup;
// its context egCtx is passed to mergeOperationStreams, applyOperationStream and
// startStream. When allNodesSynced is not set, from is returned unchanged.
//
// NOTE(review): this listing appears to have lost lines (all closing braces and
// possibly whole statements); see the inline notes before relying on it.
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
treeID string, nodes []netmapSDK.NodeInfo,
) uint64 {
s.log.Debug(ctx, logs.TreeSynchronizeTree, zap.Stringer("cid", cid), zap.String("tree", treeID), zap.Uint64("from", from))
errGroup, egCtx := errgroup.WithContext(ctx)
// NOTE(review): workersCount is declared but not used in the visible lines —
// presumably it was meant to limit the errgroup; confirm.
const workersCount = 1024
// One unbuffered channel per node; index i corresponds to nodes[i].
nodeOperationStreams := make([]chan *pilorama.Move, len(nodes))
for i := range nodeOperationStreams {
nodeOperationStreams[i] = make(chan *pilorama.Move)
// merged is written by the merger goroutine and read by the applier goroutine.
merged := make(chan *pilorama.Move)
var minStreamedLastHeight uint64
errGroup.Go(func() error {
minStreamedLastHeight = mergeOperationStreams(egCtx, nodeOperationStreams, merged)
return nil
var minUnappliedHeight uint64
errGroup.Go(func() error {
var err error
minUnappliedHeight, err = s.applyOperationStream(egCtx, cid, treeID, merged)
return err
// NOTE(review): no Store call on allNodesSynced is visible anywhere in this
// function, so as listed Load below always yields false — confirm what was lost.
var allNodesSynced atomic.Bool
for i, n := range nodes {
// One goroutine per node; it walks the node's network endpoints.
errGroup.Go(func() error {
var nodeSynced bool
// NOTE(review): the callback returns false after a parse/connect failure and
// true after a stream attempt; presumably true stops the iteration — confirm.
n.IterateNetworkEndpoints(func(addr string) bool {
var a network.Address
if err := a.FromString(addr); err != nil {
s.log.Warn(ctx, logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
cc, err := s.createConnection(a)
if err != nil {
s.log.Warn(ctx, logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
// The connection is closed when this endpoint callback returns.
defer cc.Close()
err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
if err != nil {
s.log.Warn(ctx, logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
nodeSynced = err == nil
return true
// NOTE(review): nothing visible closes nodeOperationStreams[i], while
// mergeOperationStreams treats a nil receive as end of stream — confirm each
// stream is closed when its node goroutine finishes.
if !nodeSynced {
return nil
if err := errGroup.Wait(); err != nil {
s.log.Warn(ctx, logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
// The candidate height is the smaller of the streamed and the unapplied one.
newHeight := minStreamedLastHeight
if newHeight > minUnappliedHeight {
newHeight = minUnappliedHeight
} else {
// NOTE(review): the else branch is empty in this listing — confirm what it
// originally did.
if allNodesSynced.Load() {
return newHeight
return from
// createConnection creates a gRPC client connection for the URI address of a.
// NOTE(review): the grpc.NewClient argument list is cut off in this listing,
// so the remaining arguments (if any) are not visible.
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
return grpc.NewClient(a.URIAddr(),
// Sentinel errors of the synchronization entry point.
var (
	// ErrAlreadySyncing is returned when a service synchronization has already
	// been started.
	ErrAlreadySyncing = errors.New("service is being synchronized")

	// ErrShuttingDown is returned when the service is shutting down and could
	// not accept any calls.
	ErrShuttingDown = errors.New("service is shutting down")
)
// SynchronizeAll forces tree service to synchronize all the trees according to
// netmap information. Must not be called before Service.Start.
// Returns ErrAlreadySyncing if synchronization has been started and blocked
// by another routine.
// Note: non-blocking operation.
func (s *Service) SynchronizeAll() error {
select {
case <-s.closeCh:
return ErrShuttingDown
select {
case s.syncChan <- struct{}{}:
return nil
return ErrAlreadySyncing
// syncLoop waits in a loop on s.closeCh, ctx.Done() and s.syncChan. For every
// value received from s.syncChan it lists the containers from s.cfg.cnrSource,
// passes them through containersToSync, syncContainers and removeContainers, and
// records the elapsed time with s.metrics.AddSyncDuration (false when listing
// the containers failed, true at the end of a pass).
//
// NOTE(review): this listing appears to have lost lines (all closing braces and
// possibly whole statements); see the inline notes.
func (s *Service) syncLoop(ctx context.Context) {
for {
select {
// NOTE(review): the next two cases have no visible body — presumably they
// leave the loop; confirm.
case <-s.closeCh:
case <-ctx.Done():
case <-s.syncChan:
// NOTE(review): span is never ended in the visible lines — confirm the
// span.End() calls were not lost.
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync")
s.log.Info(ctx, logs.TreeSyncingTrees)
start := time.Now()
cnrs, err := s.cfg.cnrSource.List()
if err != nil {
s.log.Error(ctx, logs.TreeCouldNotFetchContainers, zap.Error(err))
s.metrics.AddSyncDuration(time.Since(start), false)
// NOTE(review): nothing visible aborts the pass after the failure above —
// confirm whether the rest of this case is meant to be skipped.
newMap, cnrsToSync := s.containersToSync(ctx, cnrs)
s.syncContainers(ctx, cnrsToSync)
s.removeContainers(ctx, newMap)
s.log.Info(ctx, logs.TreeTreesHaveBeenSynchronized)
s.metrics.AddSyncDuration(time.Since(start), true)
// syncContainers submits one task per container in cnrs to s.syncPool. Each
// task calls synchronizeAllTrees for its container and logs the outcome. The
// whole call runs inside the "TreeService.syncContainers" tracing span.
//
// NOTE(review): this listing appears to have lost lines (all closing braces and
// possibly whole statements); see the inline notes.
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.syncContainers")
defer span.End()
// sync new containers
// NOTE(review): only wg.Done() is visible below; the matching wg.Add and
// wg.Wait calls are not — confirm they were not lost.
var wg sync.WaitGroup
for _, cnr := range cnrs {
err := s.syncPool.Submit(func() {
defer wg.Done()
s.log.Debug(ctx, logs.TreeSyncingContainerTrees, zap.Stringer("cid", cnr))
err := s.synchronizeAllTrees(ctx, cnr)
if err != nil {
s.log.Error(ctx, logs.TreeCouldNotSyncTrees, zap.Stringer("cid", cnr), zap.Error(err))
s.log.Debug(ctx, logs.TreeContainerTreesHaveBeenSynced, zap.Stringer("cid", cnr))
// This err is the result of Submit, not of synchronizeAllTrees.
if err != nil {
// NOTE(review): the argument list of the log call below is cut off.
s.log.Error(ctx, logs.TreeCouldNotQueryTreesForSynchronization,
zap.Stringer("cid", cnr),
// NOTE(review): the body of the pool-closed check is not visible — presumably
// it stops submitting further tasks; confirm.
if errors.Is(err, ants.ErrPoolClosed) {
// removeContainers walks s.cnrMap, collects containers for which
// containerCore.WasRemoved reports true, deletes them from s.cnrMap and then
// calls s.DropTree with an empty tree ID for each of them, logging failures.
// The whole call runs inside the "TreeService.removeContainers" tracing span.
//
// NOTE(review): this listing appears to have lost lines (all closing braces and
// possibly whole statements); see the inline notes.
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
defer span.End()
// NOTE(review): the matching s.cnrMapMtx.Lock() is not visible, and as listed
// the unlock is deferred to the end of the function — confirm the intended
// lock scope.
defer s.cnrMapMtx.Unlock()
var removed []cid.ID
for cnr := range s.cnrMap {
// NOTE(review): the body of this check is not visible — presumably containers
// still present in newContainers are skipped; confirm.
if _, ok := newContainers[cnr]; ok {
// existed holds the result of WasRemoved for this container.
existed, err := containerCore.WasRemoved(s.cnrSource, cnr)
if err != nil {
// NOTE(review): the argument list of the log call below is cut off.
s.log.Error(ctx, logs.TreeCouldNotCheckIfContainerExisted,
zap.Stringer("cid", cnr),
} else if existed {
removed = append(removed, cnr)
// Forget the removed containers before dropping their trees.
for i := range removed {
delete(s.cnrMap, removed[i])
for _, cnr := range removed {
s.log.Debug(ctx, logs.TreeRemovingRedundantTrees, zap.Stringer("cid", cnr))
// NOTE(review): the empty tree ID presumably means "all trees of the
// container" — confirm against DropTree.
err := s.DropTree(ctx, cnr, "")
if err != nil {
// NOTE(review): the argument list of the log call below is cut off.
s.log.Error(ctx, logs.TreeCouldNotRemoveRedundantTree,
zap.Stringer("cid", cnr),
// containersToSync looks up the position of this node in every container of
// cnrs via s.getContainerNodes and builds two results: a set of container IDs
// (newMap) and a slice of container IDs to synchronize (cnrsToSync).
//
// NOTE(review): this listing appears to have lost lines (all closing braces and
// possibly whole statements); the two checks below have no visible body, so it
// cannot be told from here which containers are excluded — presumably those
// with a lookup error or a negative position; confirm.
func (s *Service) containersToSync(ctx context.Context, cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
// NOTE(review): s.cnrMap is read here without any visible locking — confirm
// against the callers.
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
cnrsToSync := make([]cid.ID, 0, len(cnrs))
for _, cnr := range cnrs {
_, pos, err := s.getContainerNodes(cnr)
if err != nil {
// NOTE(review): the argument list of the log call below is cut off.
s.log.Error(ctx, logs.TreeCouldNotCalculateContainerNodes,
zap.Stringer("cid", cnr),
if pos < 0 {
// node is not included in the container.
newMap[cnr] = struct{}{}
cnrsToSync = append(cnrsToSync, cnr)
return newMap, cnrsToSync
// randomizeNodeOrder shuffles nodes and removes not a `pos` index.
// It is assumed that 0 <= pos < len(nodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
if len(cnrNodes) == 1 {
return nil
nodes := make([]netmap.NodeInfo, len(cnrNodes)-1)
n := copy(nodes, cnrNodes[:pos])
copy(nodes[n:], cnrNodes[pos+1:])
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
return nodes