diff --git a/pkg/services/tree/redirect.go b/pkg/services/tree/redirect.go new file mode 100644 index 000000000..34fefd908 --- /dev/null +++ b/pkg/services/tree/redirect.go @@ -0,0 +1,69 @@ +package tree + +import ( + "bytes" + "context" + "errors" + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neofs-node/pkg/network" + cidSDK "github.com/nspcc-dev/neofs-sdk-go/container/id" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +var errNoSuitableNode = errors.New("no node was found to execute the request") + +// forEachNode executes callback for each node in the container. +// If the node belongs to a container, nil error is returned. +// Otherwise, f is executed for each node, stopping if true is returned. +func (s *Service) forEachNode(ctx context.Context, cid cidSDK.ID, f func(c TreeServiceClient) bool) error { + cntNodes, err := s.getContainerNodes(cid) + if err != nil { + return fmt.Errorf("can't get container nodes for %s: %w", cid, err) + } + + rawPub := (*keys.PublicKey)(&s.key.PublicKey).Bytes() + for _, n := range cntNodes { + if bytes.Equal(n.PublicKey(), rawPub) { + return nil + } + } + + var called bool + for _, n := range cntNodes { + var stop bool + n.IterateNetworkEndpoints(func(endpoint string) bool { + c, err := dialTreeService(ctx, endpoint) + if err != nil { + return false + } + + s.log.Debug("redirecting tree service query", zap.String("endpoint", endpoint)) + called = true + stop = f(c) + return true + }) + if stop { + return nil + } + } + if !called { + return errNoSuitableNode + } + return nil +} + +func dialTreeService(ctx context.Context, netmapAddr string) (TreeServiceClient, error) { + var netAddr network.Address + if err := netAddr.FromString(netmapAddr); err != nil { + return nil, err + } + + cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure()) + if err != nil { + return nil, err + } + return NewTreeServiceClient(cc), nil +} diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 693d36fb3..835b863b8 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -53,7 +53,7 @@ func (s *Service) Shutdown() { close(s.closeCh) } -func (s *Service) Add(_ context.Context, req *AddRequest) (*AddResponse, error) { +func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { b := req.GetBody() var cid cidSDK.ID @@ -66,6 +66,18 @@ func (s *Service) Add(_ context.Context, req *AddRequest) (*AddResponse, error) return nil, err } + var resp *AddResponse + var outErr error + err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { + resp, outErr = c.Add(ctx, req) + return true + }) + if err != nil { + return nil, err + } else if resp != nil || outErr != nil { + return resp, outErr + } + log, err := s.forest.TreeMove(cid, b.GetTreeId(), &pilorama.Move{ Parent: b.GetParentId(), Child: pilorama.RootID, @@ -83,7 +95,7 @@ func (s *Service) Add(_ context.Context, req *AddRequest) (*AddResponse, error) }, nil } -func (s *Service) AddByPath(_ context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { +func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { b := req.GetBody() var cid cidSDK.ID @@ -96,6 +108,18 @@ func (s *Service) AddByPath(_ context.Context, req *AddByPathRequest) (*AddByPat return nil, err } + var resp *AddByPathResponse + var outErr error + err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { + resp, outErr = c.AddByPath(ctx, req) + return true + }) + if err != nil { + return nil, err + } else if resp != nil || outErr != nil { + return resp, outErr + } + meta := protoToMeta(b.GetMeta()) attr := b.GetPathAttribute() @@ -125,7 +149,7 @@ func (s *Service) AddByPath(_ context.Context, req *AddByPathRequest) (*AddByPat }, nil } -func (s *Service) Remove(_ context.Context, req *RemoveRequest) (*RemoveResponse, error) { +func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { b := req.GetBody() var cid cidSDK.ID @@ -138,6 +162,18 @@ func (s *Service) Remove(_ context.Context, req *RemoveRequest) (*RemoveResponse return nil, err } + var resp *RemoveResponse + var outErr error + err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { + resp, outErr = c.Remove(ctx, req) + return true + }) + if err != nil { + return nil, err + } else if resp != nil || outErr != nil { + return resp, outErr + } + if b.GetNodeId() == pilorama.RootID { return nil, fmt.Errorf("node with ID %d is root and can't be removed", b.GetNodeId()) } @@ -156,7 +192,7 @@ func (s *Service) Remove(_ context.Context, req *RemoveRequest) (*RemoveResponse // Move applies client operation to the specified tree and pushes in queue // for replication on other nodes. -func (s *Service) Move(_ context.Context, req *MoveRequest) (*MoveResponse, error) { +func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { b := req.GetBody() var cid cidSDK.ID @@ -169,6 +205,18 @@ func (s *Service) Move(_ context.Context, req *MoveRequest) (*MoveResponse, erro return nil, err } + var resp *MoveResponse + var outErr error + err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { + resp, outErr = c.Move(ctx, req) + return true + }) + if err != nil { + return nil, err + } else if resp != nil || outErr != nil { + return resp, outErr + } + if b.GetNodeId() == pilorama.RootID { return nil, fmt.Errorf("node with ID %d is root and can't be moved", b.GetNodeId()) } @@ -186,7 +234,7 @@ func (s *Service) Move(_ context.Context, req *MoveRequest) (*MoveResponse, erro return new(MoveResponse), nil } -func (s *Service) GetNodeByPath(_ context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { +func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { b := req.GetBody() var cid cidSDK.ID @@ -199,6 +247,18 @@ func (s *Service) GetNodeByPath(_ context.Context, req *GetNodeByPathRequest) (* return nil, err } + var resp *GetNodeByPathResponse + var outErr error + err = s.forEachNode(ctx, cid, func(c TreeServiceClient) bool { + resp, outErr = c.GetNodeByPath(ctx, req) + return true + }) + if err != nil { + return nil, err + } else if resp != nil || outErr != nil { + return resp, outErr + } + attr := b.GetPathAttribute() if len(attr) == 0 { attr = pilorama.AttributeFilename @@ -265,6 +325,24 @@ func (s *Service) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeS return err } + var cli TreeService_GetSubTreeClient + var outErr error + err = s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool { + cli, outErr = c.GetSubTree(srv.Context(), req) + return true + }) + if err != nil { + return err + } else if outErr != nil { + return outErr + } else if cli != nil { + for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() { + if err := srv.Send(resp); err != nil { + return err + } + } + } + queue := []nodeDepthPair{{[]uint64{b.GetRootId()}, 0}} for len(queue) != 0 { @@ -350,6 +428,24 @@ func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) return err } + var cli TreeService_GetOpLogClient + var outErr error + err := s.forEachNode(srv.Context(), cid, func(c TreeServiceClient) bool { + cli, outErr = c.GetOpLog(srv.Context(), req) + return true + }) + if err != nil { + return err + } else if outErr != nil { + return outErr + } else if cli != nil { + for resp, err := cli.Recv(); err == nil; resp, err = cli.Recv() { + if err := srv.Send(resp); err != nil { + return err + } + } + } + h := b.GetHeight() for { lm, err := s.forest.TreeGetOpLog(cid, b.GetTreeId(), h)