frostfs-node/pkg/services/tree/sync.go
Evgenii Stratonikov 6fcae9f75a [#1621] treesvc: Cancel background sync on failure
If applyOperationStream() exits prematurely, other goroutines will block
on send and the errgroup will never finish waiting. In this commit we also
check whether the context is cancelled.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-02-03 09:37:55 +00:00

package tree

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
metrics "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics/grpc"
tracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
tracing_grpc "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// ErrNotInContainer is returned when an operation cannot be performed
// because the node is not included in the container.
var ErrNotInContainer = errors.New("node is not in container")

const defaultSyncWorkerCount = 20

// synchronizeAllTrees synchronizes all the trees of the container. It fetches
// tree IDs from the other container nodes. Returns ErrNotInContainer if the node
// is not included in the container.
func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
nodes, pos, err := s.getContainerNodes(cid)
if err != nil {
return fmt.Errorf("can't get container nodes: %w", err)
}
if pos < 0 {
return ErrNotInContainer
}
nodes = randomizeNodeOrder(nodes, pos)
if len(nodes) == 0 {
return nil
}
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
req := &TreeListRequest{
Body: &TreeListRequest_Body{
ContainerId: rawCID,
},
}
err = SignMessage(req, s.key)
if err != nil {
return fmt.Errorf("could not sign request: %w", err)
}
var resp *TreeListResponse
var treesToSync []string
var outErr error
err = s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
resp, outErr = c.TreeList(ctx, req)
if outErr != nil {
return false
}
treesToSync = resp.GetBody().GetIds()
return true
})
if err != nil {
outErr = err
}
if outErr != nil {
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
}
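// For each tree, continue from the last synchronized height and persist the new
// height after a successful synchronization.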
for _, tid := range treesToSync {
h, err := s.forest.TreeLastSyncHeight(ctx, cid, tid)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
s.log.Warn(ctx, logs.TreeCouldNotGetLastSynchronizedHeightForATree,
zap.Stringer("cid", cid),
zap.String("tree", tid))
continue
}
newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
if h < newHeight {
if err := s.forest.TreeUpdateLastSyncHeight(ctx, cid, tid, newHeight); err != nil {
s.log.Warn(ctx, logs.TreeCouldNotUpdateLastSynchronizedHeightForATree,
zap.Stringer("cid", cid),
zap.String("tree", tid))
}
}
}
return nil
}

// SynchronizeTree tries to synchronize the log starting from the last stored height.
func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string) error {
nodes, pos, err := s.getContainerNodes(cid)
if err != nil {
return fmt.Errorf("can't get container nodes: %w", err)
}
if pos < 0 {
return ErrNotInContainer
}
nodes = randomizeNodeOrder(nodes, pos)
if len(nodes) == 0 {
return nil
}
s.synchronizeTree(ctx, cid, 0, treeID, nodes)
return nil
}

// mergeOperationStreams merges per-node operation streams into a single stream
// ordered by operation time (a k-way merge).
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
defer close(merged)
// Merging different node streams shuffles incoming operations like this:
//
// x - operation from stream A
// o - operation from stream B
//
// --o---o--x--x--x--o---x--x------> t
//                   ^
// If all ops have been applied successfully, the next sync must start from the
// height of the last operation of stream B (the stream that ended first).
// This height is stored in minStreamedLastHeight.
var minStreamedLastHeight uint64 = math.MaxUint64
ms := make([]*pilorama.Move, len(streams))
for i := range streams {
select {
case ms[i] = <-streams[i]:
case <-ctx.Done():
return minStreamedLastHeight
}
}
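// K-way merge: on each iteration pick the stream head with the smallest Time,
// forward it to merged and refill the slot from the same stream; a nil head
// means that stream has been drained.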
for {
var minTimeMoveTime uint64 = math.MaxUint64
minTimeMoveIndex := -1
for i, m := range ms {
if m != nil && minTimeMoveTime > m.Time {
minTimeMoveTime = m.Time
minTimeMoveIndex = i
}
}
if minTimeMoveIndex == -1 {
break
}
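// The send honors context cancellation so this goroutine does not block forever
// if the consumer of merged has exited prematurely.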
select {
case merged <- ms[minTimeMoveIndex]:
case <-ctx.Done():
return minStreamedLastHeight
}
height := ms[minTimeMoveIndex].Time
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
minStreamedLastHeight = min(minStreamedLastHeight, height)
}
}
return minStreamedLastHeight
}
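
// applyOperationStream reads operations from the merged stream, skips duplicates
// produced by the merge and applies the rest to the forest in batches of
// s.syncBatchSize. It returns the height of the first operation of a batch that
// failed to apply, or math.MaxUint64 if every operation was applied.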
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move,
) (uint64, error) {
var prev *pilorama.Move
var batch []*pilorama.Move
for m := range operationStream {
// skip already applied op
if prev != nil && prev.Time == m.Time {
continue
}
prev = m
batch = append(batch, m)
if len(batch) == s.syncBatchSize {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time, err
}
batch = batch[:0]
}
}
if len(batch) > 0 {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time, err
}
}
return math.MaxUint64, nil
}
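
// startStream requests the operation log of the given tree starting from height
// and forwards received operations to opsCh. It returns nil once the server
// closes the stream (io.EOF), and an error if the request fails or the context
// is cancelled.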
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move,
) error {
treeClient := NewTreeServiceClient(cc)
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: height,
},
}
if err := SignMessage(req, s.key); err != nil {
return err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.GetParentId(),
Child: lm.GetChildId(),
}
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
return err
}
select {
case opsCh <- m:
case <-ctx.Done():
return ctx.Err()
}
}
if !errors.Is(err, io.EOF) {
return err
}
return nil
}

// synchronizeTree synchronizes operations by fetching them from other container nodes.
// Each available node streams its operations into a separate channel; these streams
// are merged into a single stream ordered by operation time. This allows skipping
// already applied operations while keeping batches large.
// The method returns the height from which the service should start the next sync.
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
treeID string, nodes []netmapSDK.NodeInfo,
) uint64 {
s.log.Debug(ctx, logs.TreeSynchronizeTree, zap.Stringer("cid", cid), zap.String("tree", treeID), zap.Uint64("from", from))
errGroup, egCtx := errgroup.WithContext(ctx)
const workersCount = 1024
errGroup.SetLimit(workersCount)
nodeOperationStreams := make([]chan *pilorama.Move, len(nodes))
for i := range nodeOperationStreams {
nodeOperationStreams[i] = make(chan *pilorama.Move)
}
merged := make(chan *pilorama.Move)
var minStreamedLastHeight uint64
errGroup.Go(func() error {
minStreamedLastHeight = mergeOperationStreams(egCtx, nodeOperationStreams, merged)
return nil
})
var minUnappliedHeight uint64
errGroup.Go(func() error {
var err error
minUnappliedHeight, err = s.applyOperationStream(egCtx, cid, treeID, merged)
return err
})
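// allNodesSynced is cleared if any node could not be fully synchronized; in that
// case the starting height is returned unchanged so the next sync retries the range.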
var allNodesSynced atomic.Bool
allNodesSynced.Store(true)
for i, n := range nodes {
errGroup.Go(func() error {
var nodeSynced bool
n.IterateNetworkEndpoints(func(addr string) bool {
var a network.Address
if err := a.FromString(addr); err != nil {
s.log.Warn(ctx, logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
}
cc, err := s.createConnection(a)
if err != nil {
s.log.Warn(ctx, logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false
}
defer cc.Close()
err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
if err != nil {
s.log.Warn(ctx, logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
}
nodeSynced = err == nil
return true
})
close(nodeOperationStreams[i])
if !nodeSynced {
allNodesSynced.Store(false)
}
return nil
})
}
if err := errGroup.Wait(); err != nil {
allNodesSynced.Store(false)
s.log.Warn(ctx, logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
}
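// The next sync starts either right after the smallest of the last streamed
// heights (all received operations were applied) or at the first operation that
// failed to apply, whichever is lower.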
newHeight := minStreamedLastHeight
if newHeight > minUnappliedHeight {
newHeight = minUnappliedHeight
} else {
newHeight++
}
if allNodesSynced.Load() {
return newHeight
}
return from
}
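
// createConnection opens a gRPC client connection to the given address with
// insecure transport credentials and metrics/tracing interceptors attached.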
func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
return grpc.NewClient(a.URIAddr(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
}

// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")

// ErrShuttingDown is returned when the service is shutting down and could not
// accept any calls.
var ErrShuttingDown = errors.New("service is shutting down")

// SynchronizeAll forces the tree service to synchronize all the trees according to
// the netmap information. Must not be called before Service.Start.
// Returns ErrAlreadySyncing if a synchronization has already been started and is
// blocked by another routine.
// Note: non-blocking operation.
func (s *Service) SynchronizeAll() error {
select {
case <-s.closeCh:
return ErrShuttingDown
default:
}
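// Non-blocking send: if a synchronization request is already queued, report
// ErrAlreadySyncing instead of waiting.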
select {
case s.syncChan <- struct{}{}:
return nil
default:
return ErrAlreadySyncing
}
}
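
// syncLoop handles synchronization requests from syncChan until the service is
// stopped or the context is cancelled.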
func (s *Service) syncLoop(ctx context.Context) {
for {
select {
case <-s.closeCh:
return
case <-ctx.Done():
return
case <-s.syncChan:
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync")
s.log.Info(ctx, logs.TreeSyncingTrees)
start := time.Now()
cnrs, err := s.cfg.cnrSource.List()
if err != nil {
s.log.Error(ctx, logs.TreeCouldNotFetchContainers, zap.Error(err))
s.metrics.AddSyncDuration(time.Since(start), false)
span.End()
break
}
newMap, cnrsToSync := s.containersToSync(ctx, cnrs)
s.syncContainers(ctx, cnrsToSync)
s.removeContainers(ctx, newMap)
s.log.Info(ctx, logs.TreeTreesHaveBeenSynchronized)
s.metrics.AddSyncDuration(time.Since(start), true)
span.End()
}
s.initialSyncDone.Store(true)
}
}
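
// syncContainers schedules synchronization of each container's trees on the sync
// worker pool and waits for all submitted tasks to finish.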
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.syncContainers")
defer span.End()
// sync new containers
var wg sync.WaitGroup
for _, cnr := range cnrs {
wg.Add(1)
err := s.syncPool.Submit(func() {
defer wg.Done()
s.log.Debug(ctx, logs.TreeSyncingContainerTrees, zap.Stringer("cid", cnr))
err := s.synchronizeAllTrees(ctx, cnr)
if err != nil {
s.log.Error(ctx, logs.TreeCouldNotSyncTrees, zap.Stringer("cid", cnr), zap.Error(err))
return
}
s.log.Debug(ctx, logs.TreeContainerTreesHaveBeenSynced, zap.Stringer("cid", cnr))
})
if err != nil {
wg.Done()
s.log.Error(ctx, logs.TreeCouldNotQueryTreesForSynchronization,
zap.Stringer("cid", cnr),
zap.Error(err))
if errors.Is(err, ants.ErrPoolClosed) {
return
}
}
}
wg.Wait()
}
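
// removeContainers drops trees of containers that this node is no longer
// responsible for and that have been removed from the network.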
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.removeContainers")
defer span.End()
s.cnrMapMtx.Lock()
defer s.cnrMapMtx.Unlock()
var removed []cid.ID
for cnr := range s.cnrMap {
if _, ok := newContainers[cnr]; ok {
continue
}
existed, err := containerCore.WasRemoved(s.cnrSource, cnr)
if err != nil {
s.log.Error(ctx, logs.TreeCouldNotCheckIfContainerExisted,
zap.Stringer("cid", cnr),
zap.Error(err))
} else if existed {
removed = append(removed, cnr)
}
}
for i := range removed {
delete(s.cnrMap, removed[i])
}
for _, cnr := range removed {
s.log.Debug(ctx, logs.TreeRemovingRedundantTrees, zap.Stringer("cid", cnr))
err := s.DropTree(ctx, cnr, "")
if err != nil {
s.log.Error(ctx, logs.TreeCouldNotRemoveRedundantTree,
zap.Stringer("cid", cnr),
zap.Error(err))
}
}
}
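
// containersToSync filters out containers this node does not belong to. It returns
// the remaining containers both as a set (used later to prune removed containers)
// and as a list to synchronize.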
func (s *Service) containersToSync(ctx context.Context, cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
cnrsToSync := make([]cid.ID, 0, len(cnrs))
for _, cnr := range cnrs {
_, pos, err := s.getContainerNodes(cnr)
if err != nil {
s.log.Error(ctx, logs.TreeCouldNotCalculateContainerNodes,
zap.Stringer("cid", cnr),
zap.Error(err))
continue
}
if pos < 0 {
// node is not included in the container.
continue
}
newMap[cnr] = struct{}{}
cnrsToSync = append(cnrsToSync, cnr)
}
return newMap, cnrsToSync
}

// randomizeNodeOrder removes the node at index `pos` and shuffles the remaining nodes.
// It is assumed that 0 <= pos < len(nodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
if len(cnrNodes) == 1 {
return nil
}
nodes := make([]netmap.NodeInfo, len(cnrNodes)-1)
n := copy(nodes, cnrNodes[:pos])
copy(nodes[n:], cnrNodes[pos+1:])
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
return nodes
}