Compare commits

..

No commits in common. "989336df371ba6df2d2daae4f16bc27d83d20c6a" and "5afea62ec0c29b8b3422412e4cd8a5452785e6fd" have entirely different histories.

37 changed files with 95 additions and 404 deletions

View file

@ -1,4 +1,4 @@
FROM golang:1.23 AS builder FROM golang:1.22 AS builder
ARG BUILD=now ARG BUILD=now
ARG VERSION=dev ARG VERSION=dev
ARG REPO=repository ARG REPO=repository

View file

@ -1,4 +1,4 @@
FROM golang:1.23 FROM golang:1.22
WORKDIR /tmp WORKDIR /tmp

View file

@ -1,4 +1,4 @@
FROM golang:1.23 AS builder FROM golang:1.22 AS builder
ARG BUILD=now ARG BUILD=now
ARG VERSION=dev ARG VERSION=dev
ARG REPO=repository ARG REPO=repository

View file

@ -1,4 +1,4 @@
FROM golang:1.23 AS builder FROM golang:1.22 AS builder
ARG BUILD=now ARG BUILD=now
ARG VERSION=dev ARG VERSION=dev
ARG REPO=repository ARG REPO=repository

View file

@ -1,4 +1,4 @@
FROM golang:1.23 AS builder FROM golang:1.22 AS builder
ARG BUILD=now ARG BUILD=now
ARG VERSION=dev ARG VERSION=dev
ARG REPO=repository ARG REPO=repository

View file

@ -85,57 +85,6 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
return list return list
} }
// SearchObjectsRes groups the resulting values of SearchObjects operation.
type ContainerListStreamRes struct {
ids []cid.ID
}
// IDList returns identifiers of the matched objects.
func (x ContainerListStreamRes) IDList() []cid.ID {
return x.ids
}
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
cliPrm := &client.PrmContainerListStream{
XHeaders: prm.XHeaders,
Account: prm.Account,
Session: prm.Session,
}
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
if err != nil {
return res, fmt.Errorf("init container list: %w", err)
}
buf := make([]cid.ID, 10)
var list []cid.ID
var n int
var ok bool
for {
n, ok = rdr.Read(buf)
for i := range n {
list = append(list, buf[i])
}
if !ok {
break
}
}
_, err = rdr.Close()
if err != nil {
return res, fmt.Errorf("read container list: %w", err)
}
sort.Slice(list, func(i, j int) bool {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
return strings.Compare(lhs, rhs) < 0
})
res.ids = list
return
}
// PutContainerPrm groups parameters of PutContainer operation. // PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct { type PutContainerPrm struct {
Client *client.Client Client *client.Client

View file

@ -58,7 +58,6 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
GRPCDialOptions: []grpc.DialOption{ GRPCDialOptions: []grpc.DialOption{
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()), grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()), grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
}, },
} }
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 { if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {

View file

@ -1,91 +0,0 @@
package container
import (
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra"
)
var listContainersStreamCmd = &cobra.Command{
Use: "list-stream",
Short: "List all created containers",
Long: "List all created containers",
Run: func(cmd *cobra.Command, _ []string) {
var idUser user.ID
generateKey, _ := cmd.Flags().GetBool(commonflags.GenerateKey)
if flagVarListContainerOwner == "" && generateKey {
cmd.PrintErrln("WARN: using -g without --owner - output will be empty")
}
key := key.GetOrGenerate(cmd)
if flagVarListContainerOwner == "" {
user.IDFromKey(&idUser, key.PublicKey)
} else {
err := idUser.DecodeString(flagVarListContainerOwner)
commonCmd.ExitOnErr(cmd, "invalid user ID: %w", err)
}
cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC)
var prm internalclient.ListContainersPrm
prm.SetClient(cli)
prm.Account = idUser
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
prmGet := internalclient.GetContainerPrm{
Client: cli,
}
containerIDs := res.IDList()
for _, cnrID := range containerIDs {
if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(cnrID.String())
continue
}
prmGet.ClientParams.ContainerID = &cnrID
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
if err != nil {
cmd.Printf(" failed to read attributes: %v\n", err)
continue
}
cnr := res.Container()
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
continue
}
cmd.Println(cnrID.String())
if flagVarListPrintAttr {
cnr.IterateUserAttributes(func(key, val string) {
cmd.Printf(" %s: %s\n", key, val)
})
}
}
},
}
func initContainerListStreamContainersCmd() {
commonflags.Init(listContainersStreamCmd)
flags := listContainersStreamCmd.Flags()
flags.StringVar(&flagVarListName, flagListName, "",
"List containers by the attribute name",
)
flags.StringVar(&flagVarListContainerOwner, flagListContainerOwner, "",
"Owner of containers (omit to use owner from private key)",
)
flags.BoolVar(&flagVarListPrintAttr, flagListPrintAttr, false,
"Request and print attributes of each container",
)
flags.Lookup(commonflags.GenerateKey).Usage = generateKeyContainerUsage
}

View file

@ -21,7 +21,6 @@ var Cmd = &cobra.Command{
func init() { func init() {
containerChildCommand := []*cobra.Command{ containerChildCommand := []*cobra.Command{
listContainersCmd, listContainersCmd,
listContainersStreamCmd,
createContainerCmd, createContainerCmd,
deleteContainerCmd, deleteContainerCmd,
listContainerObjectsCmd, listContainerObjectsCmd,
@ -33,7 +32,6 @@ func init() {
Cmd.AddCommand(containerChildCommand...) Cmd.AddCommand(containerChildCommand...)
initContainerListContainersCmd() initContainerListContainersCmd()
initContainerListStreamContainersCmd()
initContainerCreateCmd() initContainerCreateCmd()
initContainerDeleteCmd() initContainerDeleteCmd()
initContainerListObjectsCmd() initContainerListObjectsCmd()

View file

@ -30,6 +30,8 @@ func initAddCmd() {
ff := addCmd.Flags() ff := addCmd.Flags()
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2") ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
ff.Uint64(parentIDFlagKey, 0, "Parent node ID") ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func add(cmd *cobra.Command, _ []string) { func add(cmd *cobra.Command, _ []string) {

View file

@ -36,6 +36,7 @@ func initAddByPathCmd() {
ff.String(pathFlagKey, "", "Path to a node") ff.String(pathFlagKey, "", "Path to a node")
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2") ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
_ = cobra.MarkFlagRequired(ff, pathFlagKey) _ = cobra.MarkFlagRequired(ff, pathFlagKey)
} }

View file

@ -2,7 +2,6 @@ package tree
import ( import (
"context" "context"
"fmt"
"strings" "strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
@ -21,13 +20,7 @@ import (
// after making Tree API public. // after making Tree API public.
func _client() (tree.TreeServiceClient, error) { func _client() (tree.TreeServiceClient, error) {
var netAddr network.Address var netAddr network.Address
err := netAddr.FromString(viper.GetString(commonflags.RPC))
rpcEndpoint := viper.GetString(commonflags.RPC)
if rpcEndpoint == "" {
return nil, fmt.Errorf("%s is not defined", commonflags.RPC)
}
err := netAddr.FromString(rpcEndpoint)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -41,7 +34,6 @@ func _client() (tree.TreeServiceClient, error) {
metrics.NewStreamClientInterceptor(), metrics.NewStreamClientInterceptor(),
tracing.NewStreamClientInterceptor(), tracing.NewStreamClientInterceptor(),
), ),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
} }
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") { if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {

View file

@ -36,6 +36,8 @@ func initGetByPathCmd() {
ff.String(pathFlagKey, "", "Path to a node") ff.String(pathFlagKey, "", "Path to a node")
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node") ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func getByPath(cmd *cobra.Command, _ []string) { func getByPath(cmd *cobra.Command, _ []string) {

View file

@ -30,6 +30,8 @@ func initGetOpLogCmd() {
ff := getOpLogCmd.Flags() ff := getOpLogCmd.Flags()
ff.Uint64(heightFlagKey, 0, "Height to start with") ff.Uint64(heightFlagKey, 0, "Height to start with")
ff.Uint64(countFlagKey, 10, "Logged operations count") ff.Uint64(countFlagKey, 10, "Logged operations count")
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func getOpLog(cmd *cobra.Command, _ []string) { func getOpLog(cmd *cobra.Command, _ []string) {

View file

@ -20,6 +20,8 @@ var healthcheckCmd = &cobra.Command{
func initHealthcheckCmd() { func initHealthcheckCmd() {
commonflags.Init(healthcheckCmd) commonflags.Init(healthcheckCmd)
ff := healthcheckCmd.Flags()
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func healthcheck(cmd *cobra.Command, _ []string) { func healthcheck(cmd *cobra.Command, _ []string) {

View file

@ -26,6 +26,8 @@ func initListCmd() {
ff := listCmd.Flags() ff := listCmd.Flags()
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage) ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag) _ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func list(cmd *cobra.Command, _ []string) { func list(cmd *cobra.Command, _ []string) {

View file

@ -33,6 +33,8 @@ func initMoveCmd() {
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey) _ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey) _ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func move(cmd *cobra.Command, _ []string) { func move(cmd *cobra.Command, _ []string) {

View file

@ -29,6 +29,8 @@ func initRemoveCmd() {
ff.Uint64(nodeIDFlagKey, 0, "Node ID.") ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey) _ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func remove(cmd *cobra.Command, _ []string) { func remove(cmd *cobra.Command, _ []string) {

View file

@ -34,6 +34,8 @@ func initGetSubtreeCmd() {
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag) _ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey) _ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
} }
func getSubTree(cmd *cobra.Command, _ []string) { func getSubTree(cmd *cobra.Command, _ []string) {

View file

@ -13,7 +13,9 @@ import (
) )
func initAccountingService(ctx context.Context, c *cfg) { func initAccountingService(ctx context.Context, c *cfg) {
c.initMorphComponents(ctx) if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
}
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0) balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
fatalOnErr(err) fatalOnErr(err)

View file

@ -575,9 +575,6 @@ func (c *cfgGRPC) dropConnection(endpoint string) {
} }
type cfgMorph struct { type cfgMorph struct {
initialized bool
guard sync.Mutex
client *client.Client client *client.Client
notaryEnabled bool notaryEnabled bool
@ -1458,7 +1455,10 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider { func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
return container.NewInfoProvider(func() (container.Source, error) { return container.NewInfoProvider(func() (container.Source, error) {
c.initMorphComponents(ctx) // threadsafe: called on init or on sighup when morph initialized
if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
}
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary()) cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -28,12 +28,7 @@ const (
notaryDepositRetriesAmount = 300 notaryDepositRetriesAmount = 300
) )
func (c *cfg) initMorphComponents(ctx context.Context) { func initMorphComponents(ctx context.Context, c *cfg) {
c.cfgMorph.guard.Lock()
defer c.cfgMorph.guard.Unlock()
if c.cfgMorph.initialized {
return
}
initMorphClient(ctx, c) initMorphClient(ctx, c)
lookupScriptHashesInNNS(c) // smart contract auto negotiation lookupScriptHashesInNNS(c) // smart contract auto negotiation
@ -75,7 +70,6 @@ func (c *cfg) initMorphComponents(ctx context.Context) {
c.netMapSource = netmapSource c.netMapSource = netmapSource
c.cfgNetmap.wrapper = wrap c.cfgNetmap.wrapper = wrap
c.cfgMorph.initialized = true
} }
func initMorphClient(ctx context.Context, c *cfg) { func initMorphClient(ctx context.Context, c *cfg) {

View file

@ -143,7 +143,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
parseAttributes(c) parseAttributes(c)
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline) c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
c.initMorphComponents(ctx) if c.cfgMorph.client == nil {
initMorphComponents(ctx, c)
}
initNetmapState(c) initNetmapState(c)

View file

@ -70,7 +70,6 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
tracing.NewStreamClientInterceptor(), tracing.NewStreamClientInterceptor(),
), ),
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()), grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
} }
prmDial := client.PrmDial{ prmDial := client.PrmDial{

View file

@ -80,26 +80,3 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
} }
type containerStreamerV2 struct {
containerGRPC.ContainerService_ListStreamServer
}
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
return s.ContainerService_ListStreamServer.Send(
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
)
}
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
listReq := new(container.ListStreamRequest)
if err := listReq.FromGRPCMessage(req); err != nil {
return err
}
return s.srv.ListStream(listReq, &containerStreamerV2{
ContainerService_ListStreamServer: gStream,
})
}

View file

@ -175,79 +175,6 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
return nil, apeErr(nativeschema.MethodListContainers, s) return nil, apeErr(nativeschema.MethodListContainers, s)
} }
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
defer span.End()
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
if err != nil {
return err
}
reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
nativeschema.PropertyKeyActorRole: role,
}
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
if err != nil {
return err
}
if p, ok := peer.FromContext(ctx); ok {
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
}
}
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
if err != nil {
return fmt.Errorf("could not get owner namespace: %w", err)
}
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
return err
}
request := aperequest.NewRequest(
nativeschema.MethodListContainers,
aperequest.NewResource(
resourceName(namespace, ""),
make(map[string]string),
),
reqProps,
)
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
if err != nil {
return fmt.Errorf("failed to get group ids: %w", err)
}
// Policy contract keeps group related chains as namespace-group pair.
for i := range groups {
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
}
rt := policyengine.NewRequestTargetWithNamespace(namespace)
rt.User = &policyengine.Target{
Type: policyengine.User,
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
}
rt.Groups = make([]policyengine.Target, len(groups))
for i := range groups {
rt.Groups[i] = policyengine.GroupTarget(groups[i])
}
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
if err != nil {
return err
}
if found && s == apechain.Allow {
return ac.next.ListStream(req, stream)
}
return apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put") ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
defer span.End() defer span.End()

View file

@ -1079,11 +1079,6 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
return &container.ListResponse{}, nil return &container.ListResponse{}, nil
} }
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
s.calls["ListStream"]++
return nil
}
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) { func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
s.calls["Put"]++ s.calls["Put"]++
return &container.PutResponse{}, nil return &container.PutResponse{}, nil

View file

@ -63,17 +63,6 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
return res, err return res, err
} }
// ListStream implements Server.
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := a.next.ListStream(req, stream)
if !a.enabled.Load() {
return err
}
audit.LogRequest(a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
return err
}
// Put implements Server. // Put implements Server.
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
res, err := a.next.Put(ctx, req) res, err := a.next.Put(ctx, req)

View file

@ -14,7 +14,6 @@ type ServiceExecutor interface {
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error) Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error) Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error) List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
ListStream(*container.ListStreamRequest, ListStream) error
} }
type executorSvc struct { type executorSvc struct {
@ -94,18 +93,3 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
s.respSvc.SetMeta(resp) s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := s.exec.ListStream(req, stream)
if err != nil {
return fmt.Errorf("could not execute ListStream request: %w", err)
}
r := new(container.ListStreamResponse)
respBody := new(container.ListStreamResponseBody)
r.SetBody(respBody)
s.respSvc.SetMeta(r)
return stream.Send(r)
}

View file

@ -201,35 +201,3 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil return res, nil
} }
func (s *morphExecutor) ListStream(req *container.ListStreamRequest, stream containerSvc.ListStream) error {
body := req.GetBody()
idV2 := body.GetOwnerID()
if idV2 == nil {
return errMissingUserID
}
var id user.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return err
}
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
return stream.Send(r)
}

View file

@ -4,7 +4,6 @@ import (
"context" "context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
) )
// Server is an interface of the FrostFS API Container service server. // Server is an interface of the FrostFS API Container service server.
@ -13,11 +12,4 @@ type Server interface {
Get(context.Context, *container.GetRequest) (*container.GetResponse, error) Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
List(context.Context, *container.ListRequest) (*container.ListResponse, error) List(context.Context, *container.ListRequest) (*container.ListResponse, error)
ListStream(*container.ListStreamRequest, ListStream) error
}
// ListStream is an interface of FrostFS API v2 compatible search streamer.
type ListStream interface {
util.ServerStream
Send(*container.ListStreamResponse) error
} }

View file

@ -56,40 +56,3 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req)) resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err) return resp, s.sigSvc.SignResponse(resp, err)
} }
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.ListStreamResponse)
_ = s.sigSvc.SignResponse(resp, err)
return stream.Send(resp)
}
ss := &listStreamSigner{
ListStream: stream,
sigSvc: s.sigSvc,
}
err := s.svc.ListStream(req, ss)
if err != nil || !ss.nonEmptyResp {
return ss.send(new(container.ListStreamResponse), err)
}
return nil
}
type listStreamSigner struct {
ListStream
sigSvc *util.SignService
nonEmptyResp bool // set on first Send call
}
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
s.nonEmptyResp = true
return s.send(resp, nil)
}
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
if err := s.sigSvc.SignResponse(resp, err); err != nil {
return err
}
return s.ListStream.Send(resp)
}

View file

@ -103,7 +103,6 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
tracing.NewStreamClientInterceptor(), tracing.NewStreamClientInterceptor(),
), ),
grpc.WithContextDialer(c.ds.GrpcContextDialer()), grpc.WithContextDialer(c.ds.GrpcContextDialer()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
} }
if !netAddr.IsTLSEnabled() { if !netAddr.IsTLSEnabled() {

View file

@ -12,24 +12,10 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc"
) )
var errNoSuitableNode = errors.New("no node was found to execute the request") var errNoSuitableNode = errors.New("no node was found to execute the request")
func relayUnary[Req any, Resp any](ctx context.Context, s *Service, ns []netmapSDK.NodeInfo, req *Req, callback func(TreeServiceClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error)) (*Resp, error) {
var resp *Resp
var outErr error
err := s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = callback(c, ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
}
// forEachNode executes callback for each node in the container until true is returned. // forEachNode executes callback for each node in the container until true is returned.
// Returns errNoSuitableNode if there was no successful attempt to dial any node. // Returns errNoSuitableNode if there was no successful attempt to dial any node.
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error { func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {

View file

@ -122,7 +122,16 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Add) var resp *AddResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Add(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)} d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
@ -165,7 +174,16 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
return relayUnary(ctx, s, ns, req, (TreeServiceClient).AddByPath) var resp *AddByPathResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.AddByPath(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
meta := protoToMeta(b.GetMeta()) meta := protoToMeta(b.GetMeta())
@ -220,7 +238,16 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Remove) var resp *RemoveResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Remove(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
if b.GetNodeId() == pilorama.RootID { if b.GetNodeId() == pilorama.RootID {
@ -264,7 +291,16 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Move) var resp *MoveResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.Move(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
if b.GetNodeId() == pilorama.RootID { if b.GetNodeId() == pilorama.RootID {
@ -307,7 +343,16 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
return relayUnary(ctx, s, ns, req, (TreeServiceClient).GetNodeByPath) var resp *GetNodeByPathResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.GetNodeByPath(ctx, req)
return true
})
if err != nil {
return nil, err
}
return resp, outErr
} }
attr := b.GetPathAttribute() attr := b.GetPathAttribute()
@ -718,7 +763,16 @@ func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeList
return nil, err return nil, err
} }
if pos < 0 { if pos < 0 {
return relayUnary(ctx, s, ns, req, (TreeServiceClient).TreeList) var resp *TreeListResponse
var outErr error
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
resp, outErr = c.TreeList(ctx, req)
return outErr == nil
})
if err != nil {
return nil, err
}
return resp, outErr
} }
ids, err := s.forest.TreeList(ctx, cid) ids, err := s.forest.TreeList(ctx, cid)

View file

@ -342,9 +342,7 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
metrics.NewStreamClientInterceptor(), metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(), tracing_grpc.NewStreamClientInterceptor(),
), ),
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()))
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
} }
// ErrAlreadySyncing is returned when a service synchronization has already // ErrAlreadySyncing is returned when a service synchronization has already

View file

@ -59,8 +59,6 @@ func FlagAndStatus(status string) error {
return fmt.Errorf("clock_gettime: %w", err) return fmt.Errorf("clock_gettime: %w", err)
} }
status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10) status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10)
status += "\nSTATUS=RELOADING"
return Send(status)
} }
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1") status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
return Send(status) return Send(status)