Compare commits

..

10 commits

Author SHA1 Message Date
989336df37 [#XX] container: Add ListStream method
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-10-28 18:10:32 +03:00
bc8d79ddf9
[#1447] services/tree: Move relaying code to a separate function
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-24 10:01:03 +03:00
29708b78d7 [#1442] cli/tree: Enchance error message if rpc-endpoint isn't defined
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-23 13:05:17 +00:00
b9284604d9 [#1442] cli/tree: Allow to specify rpc-endpoint with config file
We have several ways to specify the `rpc-endpoint`: with a flag,
with a single config file or multiple files. Before, the `rpc-endpoint`
flag was marked as required. Because `cobra` checked the required flag
presence first, it prevented specifying `rpc-endpoint` with a config file.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-23 13:05:17 +00:00
65a4320c75 [#1441] services/tree: Use grpc.WaitForReady option when creating client
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-23 11:45:44 +00:00
9a260c2e64 [#1441] network/cache: Use grpc.WaitForReady option when creating client
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-23 11:45:44 +00:00
6f798b9c4b [#1441] cli: Use grpc.WaitForReady while initializing SDK client
Before, when the target RPC server was unavailable, requests made
by CLI didn't wait for a timeout specified by the `--timeout` option
if the timeout was more than 20 seconds. It's because of the gRPC
default backoff strategy. Adding this option fixes that behavior.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-23 11:45:44 +00:00
e515dd4582
[#1444] config: Fix data race on morph component init
It could be called for every shard on metabase resync concurrently and
it is possible to get state with initialized client but not initialized
contract hashes.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-23 10:41:36 +03:00
8b6ec57c61 [#1440] sdnotify: Fix status for RELOADING
Before:
```
RELOADING=1
MONOTONIC_USEC=17951246687
STATUS=RELOADING=1
MONOTONIC_USEC=17951246687
```
After:
```
RELOADING=1
MONOTONIC_USEC=17951246687
STATUS=RELOADING
```

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-10-21 14:25:08 +03:00
ed13387c0e
[#1438] .docker: Use go1.23 for builders
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-18 15:20:37 +03:00
37 changed files with 404 additions and 95 deletions

View file

@ -1,4 +1,4 @@
FROM golang:1.22 AS builder
FROM golang:1.23 AS builder
ARG BUILD=now
ARG VERSION=dev
ARG REPO=repository

View file

@ -1,4 +1,4 @@
FROM golang:1.22
FROM golang:1.23
WORKDIR /tmp

View file

@ -1,4 +1,4 @@
FROM golang:1.22 AS builder
FROM golang:1.23 AS builder
ARG BUILD=now
ARG VERSION=dev
ARG REPO=repository

View file

@ -1,4 +1,4 @@
FROM golang:1.22 AS builder
FROM golang:1.23 AS builder
ARG BUILD=now
ARG VERSION=dev
ARG REPO=repository

View file

@ -1,4 +1,4 @@
FROM golang:1.22 AS builder
FROM golang:1.23 AS builder
ARG BUILD=now
ARG VERSION=dev
ARG REPO=repository

View file

@ -85,6 +85,57 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
return list
}
// SearchObjectsRes groups the resulting values of SearchObjects operation.
type ContainerListStreamRes struct {
ids []cid.ID
}
// IDList returns identifiers of the matched objects.
func (x ContainerListStreamRes) IDList() []cid.ID {
return x.ids
}
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
cliPrm := &client.PrmContainerListStream{
XHeaders: prm.XHeaders,
Account: prm.Account,
Session: prm.Session,
}
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
if err != nil {
return res, fmt.Errorf("init container list: %w", err)
}
buf := make([]cid.ID, 10)
var list []cid.ID
var n int
var ok bool
for {
n, ok = rdr.Read(buf)
for i := range n {
list = append(list, buf[i])
}
if !ok {
break
}
}
_, err = rdr.Close()
if err != nil {
return res, fmt.Errorf("read container list: %w", err)
}
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
})
res.ids = list
return
}
// PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct {
Client *client.Client

View file

@ -58,6 +58,7 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
GRPCDialOptions: []grpc.DialOption{
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
},
}
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {

View file

@ -0,0 +1,91 @@
package container
import (
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra"
)
var listContainersStreamCmd = &cobra.Command{
Use: "list-stream",
Short: "List all created containers",
Long: "List all created containers",
Run: func(cmd *cobra.Command, _ []string) {
var idUser user.ID
generateKey, _ := cmd.Flags().GetBool(commonflags.GenerateKey)
if flagVarListContainerOwner == "" && generateKey {
cmd.PrintErrln("WARN: using -g without --owner - output will be empty")
}
key := key.GetOrGenerate(cmd)
if flagVarListContainerOwner == "" {
user.IDFromKey(&idUser, key.PublicKey)
} else {
err := idUser.DecodeString(flagVarListContainerOwner)
commonCmd.ExitOnErr(cmd, "invalid user ID: %w", err)
}
cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC)
var prm internalclient.ListContainersPrm
prm.SetClient(cli)
prm.Account = idUser
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
prmGet := internalclient.GetContainerPrm{
Client: cli,
}
containerIDs := res.IDList()
for _, cnrID := range containerIDs {
if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(cnrID.String())
continue
}
prmGet.ClientParams.ContainerID = &cnrID
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
if err != nil {
cmd.Printf(" failed to read attributes: %v\n", err)
continue
}
cnr := res.Container()
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
continue
}
cmd.Println(cnrID.String())
if flagVarListPrintAttr {
cnr.IterateUserAttributes(func(key, val string) {
cmd.Printf(" %s: %s\n", key, val)
})
}
}
},
}
func initContainerListStreamContainersCmd() {
commonflags.Init(listContainersStreamCmd)
flags := listContainersStreamCmd.Flags()
flags.StringVar(&flagVarListName, flagListName, "",
"List containers by the attribute name",
)
flags.StringVar(&flagVarListContainerOwner, flagListContainerOwner, "",
"Owner of containers (omit to use owner from private key)",
)
flags.BoolVar(&flagVarListPrintAttr, flagListPrintAttr, false,
"Request and print attributes of each container",
)
flags.Lookup(commonflags.GenerateKey).Usage = generateKeyContainerUsage
}

View file

@ -21,6 +21,7 @@ var Cmd = &cobra.Command{
func init() {
containerChildCommand := []*cobra.Command{
listContainersCmd,
listContainersStreamCmd,
createContainerCmd,
deleteContainerCmd,
listContainerObjectsCmd,
@ -32,6 +33,7 @@ func init() {
Cmd.AddCommand(containerChildCommand...)
initContainerListContainersCmd()
initContainerListStreamContainersCmd()
initContainerCreateCmd()
initContainerDeleteCmd()
initContainerListObjectsCmd()

View file

@ -30,8 +30,6 @@ func initAddCmd() {
ff := addCmd.Flags()
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func add(cmd *cobra.Command, _ []string) {

View file

@ -36,7 +36,6 @@ func initAddByPathCmd() {
ff.String(pathFlagKey, "", "Path to a node")
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
_ = cobra.MarkFlagRequired(ff, pathFlagKey)
}

View file

@ -2,6 +2,7 @@ package tree
import (
"context"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
@ -20,7 +21,13 @@ import (
// after making Tree API public.
func _client() (tree.TreeServiceClient, error) {
var netAddr network.Address
err := netAddr.FromString(viper.GetString(commonflags.RPC))
rpcEndpoint := viper.GetString(commonflags.RPC)
if rpcEndpoint == "" {
return nil, fmt.Errorf("%s is not defined", commonflags.RPC)
}
err := netAddr.FromString(rpcEndpoint)
if err != nil {
return nil, err
}
@ -34,6 +41,7 @@ func _client() (tree.TreeServiceClient, error) {
metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(),
),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
}
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {

View file

@ -36,8 +36,6 @@ func initGetByPathCmd() {
ff.String(pathFlagKey, "", "Path to a node")
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func getByPath(cmd *cobra.Command, _ []string) {

View file

@ -30,8 +30,6 @@ func initGetOpLogCmd() {
ff := getOpLogCmd.Flags()
ff.Uint64(heightFlagKey, 0, "Height to start with")
ff.Uint64(countFlagKey, 10, "Logged operations count")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func getOpLog(cmd *cobra.Command, _ []string) {

View file

@ -20,8 +20,6 @@ var healthcheckCmd = &cobra.Command{
func initHealthcheckCmd() {
commonflags.Init(healthcheckCmd)
ff := healthcheckCmd.Flags()
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func healthcheck(cmd *cobra.Command, _ []string) {

View file

@ -26,8 +26,6 @@ func initListCmd() {
ff := listCmd.Flags()
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func list(cmd *cobra.Command, _ []string) {

View file

@ -33,8 +33,6 @@ func initMoveCmd() {
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func move(cmd *cobra.Command, _ []string) {

View file

@ -29,8 +29,6 @@ func initRemoveCmd() {
ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func remove(cmd *cobra.Command, _ []string) {

View file

@ -34,8 +34,6 @@ func initGetSubtreeCmd() {
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
}
func getSubTree(cmd *cobra.Command, _ []string) {

View file

@ -13,9 +13,7 @@ import (
)
func initAccountingService(ctx context.Context, c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
}
c.initMorphComponents(ctx)
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
fatalOnErr(err)

View file

@ -575,6 +575,9 @@ func (c *cfgGRPC) dropConnection(endpoint string) {
}
type cfgMorph struct {
initialized bool
guard sync.Mutex
client *client.Client
notaryEnabled bool
@ -1455,10 +1458,7 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
return container.NewInfoProvider(func() (container.Source, error) {
// threadsafe: called on init or on sighup when morph initialized
if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
}
c.initMorphComponents(ctx)
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
if err != nil {
return nil, err

View file

@ -28,7 +28,12 @@ const (
notaryDepositRetriesAmount = 300
)
func initMorphComponents(ctx context.Context, c *cfg) {
func (c *cfg) initMorphComponents(ctx context.Context) {
c.cfgMorph.guard.Lock()
defer c.cfgMorph.guard.Unlock()
if c.cfgMorph.initialized {
return
}
initMorphClient(ctx, c)
lookupScriptHashesInNNS(c) // smart contract auto negotiation
@ -70,6 +75,7 @@ func initMorphComponents(ctx context.Context, c *cfg) {
c.netMapSource = netmapSource
c.cfgNetmap.wrapper = wrap
c.cfgMorph.initialized = true
}
func initMorphClient(ctx context.Context, c *cfg) {

View file

@ -143,9 +143,7 @@ func initNetmapService(ctx context.Context, c *cfg) {
parseAttributes(c)
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
}
c.initMorphComponents(ctx)
initNetmapState(c)

View file

@ -70,6 +70,7 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
tracing.NewStreamClientInterceptor(),
),
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
}
prmDial := client.PrmDial{

View file

@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
}
type containerStreamerV2 struct {
containerGRPC.ContainerService_ListStreamServer
}
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
return s.ContainerService_ListStreamServer.Send(
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
)
}
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
listReq := new(container.ListStreamRequest)
if err := listReq.FromGRPCMessage(req); err != nil {
return err
}
return s.srv.ListStream(listReq, &containerStreamerV2{
ContainerService_ListStreamServer: gStream,
})
}

View file

@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
return nil, apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
defer span.End()
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
if err != nil {
return err
}
reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
nativeschema.PropertyKeyActorRole: role,
}
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
if err != nil {
return err
}
if p, ok := peer.FromContext(ctx); ok {
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
}
}
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
if err != nil {
return fmt.Errorf("could not get owner namespace: %w", err)
}
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
return err
}
request := aperequest.NewRequest(
nativeschema.MethodListContainers,
aperequest.NewResource(
resourceName(namespace, ""),
make(map[string]string),
),
reqProps,
)
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
if err != nil {
return fmt.Errorf("failed to get group ids: %w", err)
}
// Policy contract keeps group related chains as namespace-group pair.
for i := range groups {
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
}
rt := policyengine.NewRequestTargetWithNamespace(namespace)
rt.User = &policyengine.Target{
Type: policyengine.User,
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
}
rt.Groups = make([]policyengine.Target, len(groups))
for i := range groups {
rt.Groups[i] = policyengine.GroupTarget(groups[i])
}
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
if err != nil {
return err
}
if found && s == apechain.Allow {
return ac.next.ListStream(req, stream)
}
return apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
defer span.End()

View file

@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
return &container.ListResponse{}, nil
}
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
s.calls["ListStream"]++
return nil
}
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
s.calls["Put"]++
return &container.PutResponse{}, nil

View file

@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
return res, err
}
// ListStream implements Server.
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := a.next.ListStream(req, stream)
if !a.enabled.Load() {
return err
}
audit.LogRequest(a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
return err
}
// Put implements Server.
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
res, err := a.next.Put(ctx, req)

View file

@ -14,6 +14,7 @@ type ServiceExecutor interface {
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
ListStream(*container.ListStreamRequest, ListStream) error
}
type executorSvc struct {
@ -93,3 +94,18 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
s.respSvc.SetMeta(resp)
return resp, nil
}
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := s.exec.ListStream(req, stream)
if err != nil {
return fmt.Errorf("could not execute ListStream request: %w", err)
}
r := new(container.ListStreamResponse)
respBody := new(container.ListStreamResponseBody)
r.SetBody(respBody)
s.respSvc.SetMeta(r)
return stream.Send(r)
}

View file

@ -201,3 +201,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil
}
func (s *morphExecutor) ListStream(req *container.ListStreamRequest, stream containerSvc.ListStream) error {
body := req.GetBody()
idV2 := body.GetOwnerID()
if idV2 == nil {
return errMissingUserID
}
var id user.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return err
}
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
return stream.Send(r)
}

View file

@ -4,6 +4,7 @@ import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
)
// Server is an interface of the FrostFS API Container service server.
@ -12,4 +13,11 @@ type Server interface {
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
ListStream(*container.ListStreamRequest, ListStream) error
}
// ListStream is an interface of FrostFS API v2 compatible search streamer.
type ListStream interface {
util.ServerStream
Send(*container.ListStreamResponse) error
}

View file

@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.ListStreamResponse)
_ = s.sigSvc.SignResponse(resp, err)
return stream.Send(resp)
}
ss := &listStreamSigner{
ListStream: stream,
sigSvc: s.sigSvc,
}
err := s.svc.ListStream(req, ss)
if err != nil || !ss.nonEmptyResp {
return ss.send(new(container.ListStreamResponse), err)
}
return nil
}
type listStreamSigner struct {
ListStream
sigSvc *util.SignService
nonEmptyResp bool // set on first Send call
}
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
s.nonEmptyResp = true
return s.send(resp, nil)
}
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
if err := s.sigSvc.SignResponse(resp, err); err != nil {
return err
}
return s.ListStream.Send(resp)
}

View file

@ -103,6 +103,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
tracing.NewStreamClientInterceptor(),
),
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
}
if !netAddr.IsTLSEnabled() {

View file

@ -12,10 +12,24 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
)
var errNoSuitableNode = errors.New("no node was found to execute the request")
func relayUnary[Req any, Resp any](ctx context.Context, s *Service, ns []netmapSDK.NodeInfo, req *Req, callback func(TreeServiceClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error)) (*Resp, error) {
var resp *Resp
var outErr error
err := s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = callback(c, ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
}
// forEachNode executes callback for each node in the container until true is returned.
// Returns errNoSuitableNode if there was no successful attempt to dial any node.
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {

View file

@ -122,16 +122,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
return nil, err
}
if pos < 0 {
var resp *AddResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Add(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Add)
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
@ -174,16 +165,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
return nil, err
}
if pos < 0 {
var resp *AddByPathResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.AddByPath(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
return relayUnary(ctx, s, ns, req, (TreeServiceClient).AddByPath)
}
meta := protoToMeta(b.GetMeta())
@ -238,16 +220,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, err
}
if pos < 0 {
var resp *RemoveResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Remove(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Remove)
}
if b.GetNodeId() == pilorama.RootID {
@ -291,16 +264,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
return nil, err
}
if pos < 0 {
var resp *MoveResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Move(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Move)
}
if b.GetNodeId() == pilorama.RootID {
@ -343,16 +307,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
return nil, err
}
if pos < 0 {
var resp *GetNodeByPathResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.GetNodeByPath(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
return relayUnary(ctx, s, ns, req, (TreeServiceClient).GetNodeByPath)
}
attr := b.GetPathAttribute()
@ -763,16 +718,7 @@ func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeList
return nil, err
}
if pos < 0 {
var resp *TreeListResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.TreeList(ctx, req)
return outErr == nil
})
if err != nil {
return nil, err
}
return resp, outErr
return relayUnary(ctx, s, ns, req, (TreeServiceClient).TreeList)
}
ids, err := s.forest.TreeList(ctx, cid)

View file

@ -342,7 +342,9 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
}
// ErrAlreadySyncing is returned when a service synchronization has already

View file

@ -59,6 +59,8 @@ func FlagAndStatus(status string) error {
return fmt.Errorf("clock_gettime: %w", err)
}
status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10)
status += "\nSTATUS=RELOADING"
return Send(status)
}
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
return Send(status)