Evgenii Stratonikov
5e2fcec60f
Previously `newHeight` was updated in parallel, so applying an operation at height H did not imply a successful TreeApply() for H-1. And because we have no context in TreeUpdateLastSyncHeight(), an invalid starting height could be written if the context was canceled. In this commit we return the new height only if all operations were successfully applied.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
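
For illustration, a minimal sketch of the invariant described above, with hypothetical names (`op`, `applyBatch`) and assuming the `context` and `golang.org/x/sync/errgroup` imports; the real logic lives in synchronizeTree/synchronizeSingle below:

// op is a hypothetical stand-in for one tree operation at some log height.
type op struct{ height uint64 }

// applyBatch applies all operations concurrently and advances the reported
// height past `from` only when every apply succeeded, so a persisted height
// always implies that the operations below it were applied.
func applyBatch(ctx context.Context, from uint64, ops []op, apply func(op) error) (uint64, error) {
	eg, _ := errgroup.WithContext(ctx)
	last := from
	for _, o := range ops {
		o := o
		if last < o.height {
			last = o.height
		}
		eg.Go(func() error { return apply(o) })
	}
	if err := eg.Wait(); err != nil {
		// Something failed: keep the old height so the next sync retries from it.
		return from, err
	}
	return last, nil
}
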
package tree

import (
	"context"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"math"
	"math/rand"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// ErrNotInContainer is returned when an operation cannot be performed
// because the node is not included in the container.
var ErrNotInContainer = errors.New("node is not in container")

const defaultSyncWorkerCount = 20

// synchronizeAllTrees synchronizes all the trees of the container. It fetches
// tree IDs from the other container nodes. Returns ErrNotInContainer if the node
// is not included in the container.
func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
	nodes, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return fmt.Errorf("can't get container nodes: %w", err)
	}

	if pos < 0 {
		return ErrNotInContainer
	}

	nodes = randomizeNodeOrder(nodes, pos)
	if len(nodes) == 0 {
		return nil
	}

	rawCID := make([]byte, sha256.Size)
	cid.Encode(rawCID)

	req := &TreeListRequest{
		Body: &TreeListRequest_Body{
			ContainerId: rawCID,
		},
	}

	err = SignMessage(req, s.key)
	if err != nil {
		return fmt.Errorf("could not sign request: %w", err)
	}

	var resp *TreeListResponse
	var treesToSync []string
	var outErr error

	err = s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
		resp, outErr = c.TreeList(ctx, req)
		if outErr != nil {
			return false
		}

		treesToSync = resp.GetBody().GetIds()

		return true
	})
	if err != nil {
		outErr = err
	}

	if outErr != nil {
		return fmt.Errorf("could not fetch tree ID list: %w", outErr)
	}

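	// Advance the stored sync height for a tree only when synchronizeTree
	// reports progress; on failure it returns the old height, so the next
	// sync retries from the same point instead of skipping operations.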
	for _, tid := range treesToSync {
		h, err := s.forest.TreeLastSyncHeight(cid, tid)
		if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
			s.log.Warn("could not get last synchronized height for a tree",
				zap.Stringer("cid", cid),
				zap.String("tree", tid))
			continue
		}
		newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
		if h < newHeight {
			if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil {
				s.log.Warn("could not update last synchronized height for a tree",
					zap.Stringer("cid", cid),
					zap.String("tree", tid))
			}
		}
	}

	return nil
}

// SynchronizeTree tries to synchronize log starting from the last stored height.
func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string) error {
	nodes, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return fmt.Errorf("can't get container nodes: %w", err)
	}

	if pos < 0 {
		return ErrNotInContainer
	}

	nodes = randomizeNodeOrder(nodes, pos)
	if len(nodes) == 0 {
		return nil
	}

	s.synchronizeTree(ctx, cid, 0, treeID, nodes)
	return nil
}

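// synchronizeTree pulls the operation log of the given tree from the provided
// container nodes concurrently, starting at height `from`, and applies it to
// the local forest. It returns the minimum height successfully reached across
// the queried nodes (never less than `from`), which is intended to be safe to
// persist as the new last-synchronized height.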
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
	treeID string, nodes []netmapSDK.NodeInfo) uint64 {
	s.log.Debug("synchronize tree",
		zap.Stringer("cid", cid),
		zap.String("tree", treeID),
		zap.Uint64("from", from))

	errGroup, egCtx := errgroup.WithContext(ctx)
	const workersCount = 4
	errGroup.SetLimit(workersCount)

	heights := make([]uint64, len(nodes))
	for i, n := range nodes {
		i := i
		n := n
		errGroup.Go(func() error {
			height := from
			n.IterateNetworkEndpoints(func(addr string) bool {
				var a network.Address
				if err := a.FromString(addr); err != nil {
					return false
				}

				cc, err := grpc.DialContext(egCtx, a.URIAddr(),
					grpc.WithChainUnaryInterceptor(
						tracing.NewGRPCUnaryClientInteceptor(),
					),
					grpc.WithChainStreamInterceptor(
						tracing.NewGRPCStreamClientInterceptor(),
					),
					grpc.WithTransportCredentials(insecure.NewCredentials()))
				if err != nil {
					// Failed to connect, try the next address.
					return false
				}
				defer cc.Close()

				treeClient := NewTreeServiceClient(cc)
				for {
					h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
					if height < h {
						height = h
					}
					if err != nil || h <= height {
						// Error with the response, try the next node.
						return true
					}
				}
			})

			if height <= from { // do not increase starting height on fail
				heights[i] = from
				return nil
			}
			heights[i] = height
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		s.log.Warn("failed to run tree synchronization over all nodes", zap.Error(err))
	}

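	// Each worker records the last height it managed to apply from its node,
	// so the minimum across workers is the highest height known to be fully
	// applied locally; anything above it may still have gaps.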
	newHeight := uint64(math.MaxUint64)
	for _, height := range heights { // take minimum across all clients
		if height < newHeight {
			newHeight = height
		}
	}
	if newHeight == math.MaxUint64 {
		newHeight = from
	}
	return newHeight
}

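// synchronizeSingle streams the operation log of the given tree from a single
// node, starting at the provided height, and applies the received operations
// to the local forest. It returns the height up to which operations have been
// successfully applied; that value is safe to use as the next starting height
// even when an error is also returned.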
func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
	rawCID := make([]byte, sha256.Size)
	cid.Encode(rawCID)

	errG, ctx := errgroup.WithContext(ctx)
	errG.SetLimit(1024)

	for {
		req := &GetOpLogRequest{
			Body: &GetOpLogRequest_Body{
				ContainerId: rawCID,
				TreeId:      treeID,
				Height:      height,
			},
		}
		if err := SignMessage(req, s.key); err != nil {
			_ = errG.Wait()
			return height, err
		}

		c, err := treeClient.GetOpLog(ctx, req)
		if err != nil {
			_ = errG.Wait()
			return height, fmt.Errorf("can't initialize client: %w", err)
		}

		lastApplied := height
		res, err := c.Recv()
		for ; err == nil; res, err = c.Recv() {
			lm := res.GetBody().GetOperation()
			m := &pilorama.Move{
				Parent: lm.ParentId,
				Child:  lm.ChildId,
			}
			if err := m.Meta.FromBytes(lm.Meta); err != nil {
				_ = errG.Wait()
				return height, err
			}
			if lastApplied < m.Meta.Time {
				lastApplied = m.Meta.Time
			}
			errG.Go(func() error {
				return s.forest.TreeApply(cid, treeID, m, true)
			})
		}

		// First check local errors: if everything is ok, we can update starting height,
		// because everything was applied.
		applyErr := errG.Wait()
		if applyErr != nil {
			return height, applyErr
		}

		height = lastApplied
		if err != nil && !errors.Is(err, io.EOF) {
			return height, err
		}
	}
}

// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")

// ErrShuttingDown is returned when the service is shutting down and cannot
// accept any calls.
var ErrShuttingDown = errors.New("service is shutting down")

// SynchronizeAll forces tree service to synchronize all the trees according to
// netmap information. Must not be called before Service.Start.
// Returns ErrAlreadySyncing if synchronization has been started and blocked
// by another routine.
// Note: non-blocking operation.
func (s *Service) SynchronizeAll() error {
	select {
	case <-s.closeCh:
		return ErrShuttingDown
	default:
	}

	select {
	case s.syncChan <- struct{}{}:
		return nil
	default:
		return ErrAlreadySyncing
	}
}

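// syncLoop is the main synchronization loop. It runs until the service is
// closed or the context is canceled, and performs a full synchronization pass
// (sync trees of containers this node belongs to, drop trees of containers it
// has left) every time a signal arrives on syncChan. After each pass it marks
// the initial synchronization as done.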
func (s *Service) syncLoop(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case <-ctx.Done():
			return
		case <-s.syncChan:
			s.log.Debug("syncing trees...")

			cnrs, err := s.cfg.cnrSource.List()
			if err != nil {
				s.log.Error("could not fetch containers", zap.Error(err))
				break
			}

			newMap, cnrsToSync := s.containersToSync(cnrs)

			s.syncContainers(ctx, cnrsToSync)

			s.removeContainers(ctx, newMap)

			s.log.Debug("trees have been synchronized")
		}
		s.initialSyncDone.Store(true)
	}
}

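// syncContainers schedules tree synchronization for each of the given
// containers on the sync worker pool and waits for all submitted tasks to
// finish.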
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
	// sync new containers
	var wg sync.WaitGroup
	for _, cnr := range cnrs {
		wg.Add(1)
		cnr := cnr
		err := s.syncPool.Submit(func() {
			defer wg.Done()
			s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))

			err := s.synchronizeAllTrees(ctx, cnr)
			if err != nil {
				s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
				return
			}

			s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
		})
		if err != nil {
			wg.Done()
			s.log.Error("could not query trees for synchronization",
				zap.Stringer("cid", cnr),
				zap.Error(err))
			if errors.Is(err, ants.ErrPoolClosed) {
				return
			}
		}
	}
	wg.Wait()
}

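// removeContainers drops trees of containers that are no longer relevant for
// this node: every container tracked in cnrMap but missing from newContainers
// is removed from the map and its trees are dropped.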
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
	s.cnrMapMtx.Lock()
	defer s.cnrMapMtx.Unlock()

	var removed []cid.ID
	for cnr := range s.cnrMap {
		if _, ok := newContainers[cnr]; ok {
			continue
		}
		removed = append(removed, cnr)
	}
	for i := range removed {
		delete(s.cnrMap, removed[i])
	}

	for _, cnr := range removed {
		s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))

		err := s.DropTree(ctx, cnr, "")
		if err != nil {
			s.log.Error("could not remove redundant tree",
				zap.Stringer("cid", cnr),
				zap.Error(err))
		}
	}
}

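// containersToSync filters the given container list down to the containers
// this node belongs to. It returns the resulting set (used later to detect
// containers whose trees should be dropped) and the same containers as a
// slice to be synchronized.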
func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
	newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
	cnrsToSync := make([]cid.ID, 0, len(cnrs))

	for _, cnr := range cnrs {
		_, pos, err := s.getContainerNodes(cnr)
		if err != nil {
			s.log.Error("could not calculate container nodes",
				zap.Stringer("cid", cnr),
				zap.Error(err))
			continue
		}

		if pos < 0 {
			// node is not included in the container.
			continue
		}

		newMap[cnr] = struct{}{}
		cnrsToSync = append(cnrsToSync, cnr)
	}
	return newMap, cnrsToSync
}

// randomizeNodeOrder shuffles the nodes and removes the node at index `pos`.
// It is assumed that 0 <= pos < len(cnrNodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
	if len(cnrNodes) == 1 {
		return nil
	}

	nodes := make([]netmap.NodeInfo, len(cnrNodes)-1)
	n := copy(nodes, cnrNodes[:pos])
	copy(nodes[n:], cnrNodes[pos+1:])

	rand.Shuffle(len(nodes), func(i, j int) {
		nodes[i], nodes[j] = nodes[j], nodes[i]
	})
	return nodes
}