frostfs-node/pkg/services/tree/sync.go
Evgenii Stratonikov 5e2fcec60f [#396] treesvc: properly remember last height on shutdown
Previously `newHeight` was updated in parallel, so applying an operation
at height H did not imply a successful TreeApply() for H-1. And because
TreeUpdateLastSyncHeight() takes no context, an invalid starting height
could be written if the context was canceled.

In this commit we return the new height only if all operations were
successfully applied.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-29 10:25:26 +00:00


package tree

import (
	"context"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"math"
	"math/rand"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// ErrNotInContainer is returned when an operation could not be performed
// because the node is not included in the container.
var ErrNotInContainer = errors.New("node is not in container")

const defaultSyncWorkerCount = 20

// synchronizeAllTrees synchronizes all the trees of the container. It fetches
// tree IDs from the other container nodes. Returns ErrNotInContainer if the node
// is not included in the container.
func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
	nodes, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return fmt.Errorf("can't get container nodes: %w", err)
	}
	if pos < 0 {
		return ErrNotInContainer
	}

	nodes = randomizeNodeOrder(nodes, pos)
	if len(nodes) == 0 {
		return nil
	}

	rawCID := make([]byte, sha256.Size)
	cid.Encode(rawCID)

	req := &TreeListRequest{
		Body: &TreeListRequest_Body{
			ContainerId: rawCID,
		},
	}

	err = SignMessage(req, s.key)
	if err != nil {
		return fmt.Errorf("could not sign request: %w", err)
	}

	var resp *TreeListResponse
	var treesToSync []string
	var outErr error

	err = s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool {
		resp, outErr = c.TreeList(ctx, req)
		if outErr != nil {
			return false
		}

		treesToSync = resp.GetBody().GetIds()

		return true
	})
	if err != nil {
		outErr = err
	}

	if outErr != nil {
		return fmt.Errorf("could not fetch tree ID list: %w", outErr)
	}

	for _, tid := range treesToSync {
		h, err := s.forest.TreeLastSyncHeight(cid, tid)
		if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
			s.log.Warn("could not get last synchronized height for a tree",
				zap.Stringer("cid", cid),
				zap.String("tree", tid))
			continue
		}
		newHeight := s.synchronizeTree(ctx, cid, h, tid, nodes)
		if h < newHeight {
			if err := s.forest.TreeUpdateLastSyncHeight(cid, tid, newHeight); err != nil {
				s.log.Warn("could not update last synchronized height for a tree",
					zap.Stringer("cid", cid),
					zap.String("tree", tid))
			}
		}
	}

	return nil
}

// SynchronizeTree tries to synchronize the log starting from the last stored height.
func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string) error {
	nodes, pos, err := s.getContainerNodes(cid)
	if err != nil {
		return fmt.Errorf("can't get container nodes: %w", err)
	}
	if pos < 0 {
		return ErrNotInContainer
	}

	nodes = randomizeNodeOrder(nodes, pos)
	if len(nodes) == 0 {
		return nil
	}

	s.synchronizeTree(ctx, cid, 0, treeID, nodes)
	return nil
}
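
// synchronizeTree synchronizes a single tree of the container starting from the
// `from` height. It polls the provided nodes concurrently and returns the minimum
// height reached across them; on failure the original `from` value is returned, so
// the result is always safe to persist as the new last synchronized height.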
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
	treeID string, nodes []netmapSDK.NodeInfo) uint64 {
	s.log.Debug("synchronize tree",
		zap.Stringer("cid", cid),
		zap.String("tree", treeID),
		zap.Uint64("from", from))

	errGroup, egCtx := errgroup.WithContext(ctx)
	const workersCount = 4
	errGroup.SetLimit(workersCount)

	heights := make([]uint64, len(nodes))
	for i, n := range nodes {
		i := i
		n := n
		errGroup.Go(func() error {
			height := from
			n.IterateNetworkEndpoints(func(addr string) bool {
				var a network.Address
				if err := a.FromString(addr); err != nil {
					return false
				}

				cc, err := grpc.DialContext(egCtx, a.URIAddr(),
					grpc.WithChainUnaryInterceptor(
						tracing.NewGRPCUnaryClientInteceptor(),
					),
					grpc.WithChainStreamInterceptor(
						tracing.NewGRPCStreamClientInterceptor(),
					),
					grpc.WithTransportCredentials(insecure.NewCredentials()))
				if err != nil {
					// Failed to connect, try the next address.
					return false
				}
				defer cc.Close()

				treeClient := NewTreeServiceClient(cc)
				for {
					h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
					if height < h {
						height = h
					}
					if err != nil || h <= height {
						// Error with the response, try the next node.
						return true
					}
				}
			})

			if height <= from { // do not increase starting height on fail
				heights[i] = from
				return nil
			}

			heights[i] = height
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		s.log.Warn("failed to run tree synchronization over all nodes", zap.Error(err))
	}

	newHeight := uint64(math.MaxUint64)
	for _, height := range heights { // take minimum across all clients
		if height < newHeight {
			newHeight = height
		}
	}
	if newHeight == math.MaxUint64 {
		newHeight = from
	}

	return newHeight
}
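
// synchronizeSingle pulls the operation log of the given tree from a single node,
// starting at `height`, and applies the received operations locally. The returned
// height is advanced only after every operation of a batch has been applied
// successfully, so it can be used as the next starting height.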
func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
	rawCID := make([]byte, sha256.Size)
	cid.Encode(rawCID)

	errG, ctx := errgroup.WithContext(ctx)
	errG.SetLimit(1024)

	for {
		req := &GetOpLogRequest{
			Body: &GetOpLogRequest_Body{
				ContainerId: rawCID,
				TreeId:      treeID,
				Height:      height,
			},
		}
		if err := SignMessage(req, s.key); err != nil {
			_ = errG.Wait()
			return height, err
		}

		c, err := treeClient.GetOpLog(ctx, req)
		if err != nil {
			_ = errG.Wait()
			return height, fmt.Errorf("can't initialize client: %w", err)
		}

		lastApplied := height
		res, err := c.Recv()
		for ; err == nil; res, err = c.Recv() {
			lm := res.GetBody().GetOperation()
			m := &pilorama.Move{
				Parent: lm.ParentId,
				Child:  lm.ChildId,
			}
			if err := m.Meta.FromBytes(lm.Meta); err != nil {
				_ = errG.Wait()
				return height, err
			}
			if lastApplied < m.Meta.Time {
				lastApplied = m.Meta.Time
			}
			errG.Go(func() error {
				return s.forest.TreeApply(cid, treeID, m, true)
			})
		}

		// First check local errors: if everything is ok, we can update starting height,
		// because everything was applied.
		applyErr := errG.Wait()
		if applyErr != nil {
			return height, applyErr
		}

		height = lastApplied
		if err != nil && !errors.Is(err, io.EOF) {
			return height, err
		}
	}
}

// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")

// ErrShuttingDown is returned when the service is shutting down and cannot
// accept any calls.
var ErrShuttingDown = errors.New("service is shutting down")

// SynchronizeAll forces the tree service to synchronize all the trees according to
// netmap information. Must not be called before Service.Start.
// Returns ErrAlreadySyncing if synchronization has been started and blocked
// by another routine.
// Note: non-blocking operation.
func (s *Service) SynchronizeAll() error {
	select {
	case <-s.closeCh:
		return ErrShuttingDown
	default:
	}

	select {
	case s.syncChan <- struct{}{}:
		return nil
	default:
		return ErrAlreadySyncing
	}
}
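
// syncLoop waits for synchronization requests on syncChan and, for each request,
// synchronizes the trees of all containers this node belongs to and removes the
// trees of containers it no longer belongs to. It exits when the service is
// closed or the context is canceled.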
func (s *Service) syncLoop(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case <-ctx.Done():
			return
		case <-s.syncChan:
			s.log.Debug("syncing trees...")

			cnrs, err := s.cfg.cnrSource.List()
			if err != nil {
				s.log.Error("could not fetch containers", zap.Error(err))
				break
			}

			newMap, cnrsToSync := s.containersToSync(cnrs)

			s.syncContainers(ctx, cnrsToSync)

			s.removeContainers(ctx, newMap)

			s.log.Debug("trees have been synchronized")
		}
		s.initialSyncDone.Store(true)
	}
}
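
// syncContainers submits a tree synchronization task for each container to the
// sync worker pool and waits until all submitted tasks finish.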
func (s *Service) syncContainers(ctx context.Context, cnrs []cid.ID) {
	// sync new containers
	var wg sync.WaitGroup
	for _, cnr := range cnrs {
		wg.Add(1)
		cnr := cnr

		err := s.syncPool.Submit(func() {
			defer wg.Done()
			s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))

			err := s.synchronizeAllTrees(ctx, cnr)
			if err != nil {
				s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
				return
			}

			s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
		})
		if err != nil {
			wg.Done()
			s.log.Error("could not query trees for synchronization",
				zap.Stringer("cid", cnr),
				zap.Error(err))
			if errors.Is(err, ants.ErrPoolClosed) {
				return
			}
		}
	}
	wg.Wait()
}
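
// removeContainers drops the trees of previously tracked containers that are
// missing from newContainers and forgets them in the container map.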
func (s *Service) removeContainers(ctx context.Context, newContainers map[cid.ID]struct{}) {
	s.cnrMapMtx.Lock()
	defer s.cnrMapMtx.Unlock()

	var removed []cid.ID
	for cnr := range s.cnrMap {
		if _, ok := newContainers[cnr]; ok {
			continue
		}
		removed = append(removed, cnr)
	}
	for i := range removed {
		delete(s.cnrMap, removed[i])
	}

	for _, cnr := range removed {
		s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))

		err := s.DropTree(ctx, cnr, "")
		if err != nil {
			s.log.Error("could not remove redundant tree",
				zap.Stringer("cid", cnr),
				zap.Error(err))
		}
	}
}
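
// containersToSync filters the given container list down to the containers this
// node belongs to. It returns both the set of such containers (used later to
// detect removed ones) and the slice of container IDs to synchronize.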
func (s *Service) containersToSync(cnrs []cid.ID) (map[cid.ID]struct{}, []cid.ID) {
	newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
	cnrsToSync := make([]cid.ID, 0, len(cnrs))

	for _, cnr := range cnrs {
		_, pos, err := s.getContainerNodes(cnr)
		if err != nil {
			s.log.Error("could not calculate container nodes",
				zap.Stringer("cid", cnr),
				zap.Error(err))
			continue
		}

		if pos < 0 {
			// node is not included in the container.
			continue
		}

		newMap[cnr] = struct{}{}
		cnrsToSync = append(cnrsToSync, cnr)
	}

	return newMap, cnrsToSync
}

// randomizeNodeOrder shuffles the container nodes and removes the node at index
// `pos` (the local node) from the result.
// It is assumed that 0 <= pos < len(nodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
	if len(cnrNodes) == 1 {
		return nil
	}

	nodes := make([]netmap.NodeInfo, len(cnrNodes)-1)
	n := copy(nodes, cnrNodes[:pos])
	copy(nodes[n:], cnrNodes[pos+1:])

	rand.Shuffle(len(nodes), func(i, j int) {
		nodes[i], nodes[j] = nodes[j], nodes[i]
	})
	return nodes
}