[#1401] services/tree: Retransmit queries to container nodes

Also fix a bug with replicator using the multiaddress instead of
<host>:<port> format expected by gRPC library.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-05-24 14:23:27 +03:00
parent fa57a8be44
commit 86c6c24b86
2 changed files with 170 additions and 5 deletions

View file

@ -0,0 +1,69 @@
package tree
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-node/pkg/network"
cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.uber.org/zap"
"google.golang.org/grpc"
)
var errNoSuitableNode = errors.New("no node was found to execute the request")
// forEachNode executes callback for each node in the container.
// If the node belongs to a container, nil error is returned.
// Otherwise, f is executed for each node, stopping if true is returned.
func (s *Service) forEachNode(ctx context.Context, cid cidSDK.ID, f func(c TreeServiceClient) bool) error {
cntNodes, err := s.getContainerNodes(cid)
if err != nil {
return fmt.Errorf("can't get container nodes for %s: %w", cid, err)
}
rawPub := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for _, n := range cntNodes {
if bytes.Equal(n.PublicKey(), rawPub) {
return nil
}
}
var called bool
for _, n := range cntNodes {
var stop bool
n.IterateNetworkEndpoints(func(endpoint string) bool {
c, err := dialTreeService(ctx, endpoint)
if err != nil {
return false
}
s.log.Debug("redirecting tree service query", zap.String("endpoint", endpoint))
called = true
stop = f(c)
return true
})
if stop {
return nil
}
}
if !called {
return errNoSuitableNode
}
return nil
}
func dialTreeService(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
var netAddr network.Address
if err := netAddr.FromString(netmapAddr); err != nil {
return nil, err
}
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure())
if err != nil {
return nil, err
}
return NewTreeServiceClient(cc), nil
}

View file

@ -53,7 +53,7 @@ func (s *Service) Shutdown() {
close(s.closeCh)
}
func (s *Service) Add(_ context.Context, req *AddRequest) (*AddResponse, error) {
func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
@ -66,6 +66,18 @@ func (s *Service) Add(_ context.Context, req *AddRequest) (*AddResponse, error)
return nil, err
}
var resp *AddResponse
var outErr error
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
resp, outErr = c.Add(ctx, req)
return true
})
if err != nil {
return nil, err
} else if resp != nil || outErr != nil {
return resp, outErr
}
log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{
Parent: b.GetParentId(),
Child: pilorama.RootID,
@ -83,7 +95,7 @@ func (s *Service) Add(_ context.Context, req *AddRequest) (*AddResponse, error)
}, nil
}
func (s *Service) AddByPath(_ context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
@ -96,6 +108,18 @@ func (s *Service) AddByPath(_ context.Context, req *AddByPathRequest) (*AddByPat
return nil, err
}
var resp *AddByPathResponse
var outErr error
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
resp, outErr = c.AddByPath(ctx, req)
return true
})
if err != nil {
return nil, err
} else if resp != nil || outErr != nil {
return resp, outErr
}
meta := protoToMeta(b.GetMeta())
attr := b.GetPathAttribute()
@ -125,7 +149,7 @@ func (s *Service) AddByPath(_ context.Context, req *AddByPathRequest) (*AddByPat
}, nil
}
func (s *Service) Remove(_ context.Context, req *RemoveRequest) (*RemoveResponse, error) {
func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
@ -138,6 +162,18 @@ func (s *Service) Remove(_ context.Context, req *RemoveRequest) (*RemoveResponse
return nil, err
}
var resp *RemoveResponse
var outErr error
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
resp, outErr = c.Remove(ctx, req)
return true
})
if err != nil {
return nil, err
} else if resp != nil || outErr != nil {
return resp, outErr
}
if b.GetNodeId() == pilorama.RootID {
return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId())
}
@ -156,7 +192,7 @@ func (s *Service) Remove(_ context.Context, req *RemoveRequest) (*RemoveResponse
// Move applies client operation to the specified tree and pushes in queue
// for replication on other nodes.
func (s *Service) Move(_ context.Context, req *MoveRequest) (*MoveResponse, error) {
func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
@ -169,6 +205,18 @@ func (s *Service) Move(_ context.Context, req *MoveRequest) (*MoveResponse, erro
return nil, err
}
var resp *MoveResponse
var outErr error
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
resp, outErr = c.Move(ctx, req)
return true
})
if err != nil {
return nil, err
} else if resp != nil || outErr != nil {
return resp, outErr
}
if b.GetNodeId() == pilorama.RootID {
return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId())
}
@ -186,7 +234,7 @@ func (s *Service) Move(_ context.Context, req *MoveRequest) (*MoveResponse, erro
return new(MoveResponse), nil
}
func (s *Service) GetNodeByPath(_ context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) {
b := req.GetBody()
var cid cidSDK.ID
@ -199,6 +247,18 @@ func (s *Service) GetNodeByPath(_ context.Context, req *GetNodeByPathRequest) (*
return nil, err
}
var resp *GetNodeByPathResponse
var outErr error
err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool {
resp, outErr = c.GetNodeByPath(ctx, req)
return true
})
if err != nil {
return nil, err
} else if resp != nil || outErr != nil {
return resp, outErr
}
attr := b.GetPathAttribute()
if len(attr) == 0 {
attr = pilorama.AttributeFilename
@ -265,6 +325,24 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS
return err
}
var cli TreeService_GetSubTreeClient
var outErr error
err = s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool {
cli, outErr = c.GetSubTree(srv.Context(), req)
return true
})
if err != nil {
return err
} else if outErr != nil {
return outErr
} else if cli != nil {
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
if err := srv.Send(resp); err != nil {
return err
}
}
}
queue := []nodeDepthPair{{[]uint64{b.GetRootId()}, 0}}
for len(queue) != 0 {
@ -350,6 +428,24 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer)
return err
}
var cli TreeService_GetOpLogClient
var outErr error
err := s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool {
cli, outErr = c.GetOpLog(srv.Context(), req)
return true
})
if err != nil {
return err
} else if outErr != nil {
return outErr
} else if cli != nil {
for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() {
if err := srv.Send(resp); err != nil {
return err
}
}
}
h := b.GetHeight()
for {
lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)