forked from TrueCloudLab/frostfs-node
Compare commits
10 commits
5afea62ec0
...
989336df37
Author | SHA1 | Date | |
---|---|---|---|
989336df37 | |||
bc8d79ddf9 | |||
29708b78d7 | |||
b9284604d9 | |||
65a4320c75 | |||
9a260c2e64 | |||
6f798b9c4b | |||
e515dd4582 | |||
8b6ec57c61 | |||
ed13387c0e |
37 changed files with 404 additions and 95 deletions
|
@ -1,4 +1,4 @@
|
|||
FROM golang:1.22 AS builder
|
||||
FROM golang:1.23 AS builder
|
||||
ARG BUILD=now
|
||||
ARG VERSION=dev
|
||||
ARG REPO=repository
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
FROM golang:1.22
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /tmp
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
FROM golang:1.22 AS builder
|
||||
FROM golang:1.23 AS builder
|
||||
ARG BUILD=now
|
||||
ARG VERSION=dev
|
||||
ARG REPO=repository
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
FROM golang:1.22 AS builder
|
||||
FROM golang:1.23 AS builder
|
||||
ARG BUILD=now
|
||||
ARG VERSION=dev
|
||||
ARG REPO=repository
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
FROM golang:1.22 AS builder
|
||||
FROM golang:1.23 AS builder
|
||||
ARG BUILD=now
|
||||
ARG VERSION=dev
|
||||
ARG REPO=repository
|
||||
|
|
|
@ -85,6 +85,57 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
|
|||
return list
|
||||
}
|
||||
|
||||
// SearchObjectsRes groups the resulting values of SearchObjects operation.
|
||||
type ContainerListStreamRes struct {
|
||||
ids []cid.ID
|
||||
}
|
||||
|
||||
// IDList returns identifiers of the matched objects.
|
||||
func (x ContainerListStreamRes) IDList() []cid.ID {
|
||||
return x.ids
|
||||
}
|
||||
|
||||
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
|
||||
cliPrm := &client.PrmContainerListStream{
|
||||
XHeaders: prm.XHeaders,
|
||||
Account: prm.Account,
|
||||
Session: prm.Session,
|
||||
}
|
||||
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("init container list: %w", err)
|
||||
}
|
||||
|
||||
buf := make([]cid.ID, 10)
|
||||
var list []cid.ID
|
||||
var n int
|
||||
var ok bool
|
||||
|
||||
for {
|
||||
n, ok = rdr.Read(buf)
|
||||
for i := range n {
|
||||
list = append(list, buf[i])
|
||||
}
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
_, err = rdr.Close()
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("read container list: %w", err)
|
||||
}
|
||||
|
||||
sort.Slice(list, func(i, j int) bool {
|
||||
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
||||
return strings.Compare(lhs, rhs) < 0
|
||||
})
|
||||
|
||||
res.ids = list
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// PutContainerPrm groups parameters of PutContainer operation.
|
||||
type PutContainerPrm struct {
|
||||
Client *client.Client
|
||||
|
|
|
@ -58,6 +58,7 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
|
|||
GRPCDialOptions: []grpc.DialOption{
|
||||
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
|
||||
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
},
|
||||
}
|
||||
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
|
||||
|
|
91
cmd/frostfs-cli/modules/container/list_stream.go
Normal file
91
cmd/frostfs-cli/modules/container/list_stream.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var listContainersStreamCmd = &cobra.Command{
|
||||
Use: "list-stream",
|
||||
Short: "List all created containers",
|
||||
Long: "List all created containers",
|
||||
Run: func(cmd *cobra.Command, _ []string) {
|
||||
var idUser user.ID
|
||||
|
||||
generateKey, _ := cmd.Flags().GetBool(commonflags.GenerateKey)
|
||||
if flagVarListContainerOwner == "" && generateKey {
|
||||
cmd.PrintErrln("WARN: using -g without --owner - output will be empty")
|
||||
}
|
||||
|
||||
key := key.GetOrGenerate(cmd)
|
||||
|
||||
if flagVarListContainerOwner == "" {
|
||||
user.IDFromKey(&idUser, key.PublicKey)
|
||||
} else {
|
||||
err := idUser.DecodeString(flagVarListContainerOwner)
|
||||
commonCmd.ExitOnErr(cmd, "invalid user ID: %w", err)
|
||||
}
|
||||
|
||||
cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC)
|
||||
|
||||
var prm internalclient.ListContainersPrm
|
||||
prm.SetClient(cli)
|
||||
prm.Account = idUser
|
||||
|
||||
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
prmGet := internalclient.GetContainerPrm{
|
||||
Client: cli,
|
||||
}
|
||||
|
||||
containerIDs := res.IDList()
|
||||
for _, cnrID := range containerIDs {
|
||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||
cmd.Println(cnrID.String())
|
||||
continue
|
||||
}
|
||||
|
||||
prmGet.ClientParams.ContainerID = &cnrID
|
||||
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
|
||||
if err != nil {
|
||||
cmd.Printf(" failed to read attributes: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
cnr := res.Container()
|
||||
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
|
||||
continue
|
||||
}
|
||||
cmd.Println(cnrID.String())
|
||||
|
||||
if flagVarListPrintAttr {
|
||||
cnr.IterateUserAttributes(func(key, val string) {
|
||||
cmd.Printf(" %s: %s\n", key, val)
|
||||
})
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
func initContainerListStreamContainersCmd() {
|
||||
commonflags.Init(listContainersStreamCmd)
|
||||
|
||||
flags := listContainersStreamCmd.Flags()
|
||||
|
||||
flags.StringVar(&flagVarListName, flagListName, "",
|
||||
"List containers by the attribute name",
|
||||
)
|
||||
flags.StringVar(&flagVarListContainerOwner, flagListContainerOwner, "",
|
||||
"Owner of containers (omit to use owner from private key)",
|
||||
)
|
||||
flags.BoolVar(&flagVarListPrintAttr, flagListPrintAttr, false,
|
||||
"Request and print attributes of each container",
|
||||
)
|
||||
flags.Lookup(commonflags.GenerateKey).Usage = generateKeyContainerUsage
|
||||
}
|
|
@ -21,6 +21,7 @@ var Cmd = &cobra.Command{
|
|||
func init() {
|
||||
containerChildCommand := []*cobra.Command{
|
||||
listContainersCmd,
|
||||
listContainersStreamCmd,
|
||||
createContainerCmd,
|
||||
deleteContainerCmd,
|
||||
listContainerObjectsCmd,
|
||||
|
@ -32,6 +33,7 @@ func init() {
|
|||
Cmd.AddCommand(containerChildCommand...)
|
||||
|
||||
initContainerListContainersCmd()
|
||||
initContainerListStreamContainersCmd()
|
||||
initContainerCreateCmd()
|
||||
initContainerDeleteCmd()
|
||||
initContainerListObjectsCmd()
|
||||
|
|
|
@ -30,8 +30,6 @@ func initAddCmd() {
|
|||
ff := addCmd.Flags()
|
||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||
ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func add(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -36,7 +36,6 @@ func initAddByPathCmd() {
|
|||
ff.String(pathFlagKey, "", "Path to a node")
|
||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
_ = cobra.MarkFlagRequired(ff, pathFlagKey)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package tree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||
|
@ -20,7 +21,13 @@ import (
|
|||
// after making Tree API public.
|
||||
func _client() (tree.TreeServiceClient, error) {
|
||||
var netAddr network.Address
|
||||
err := netAddr.FromString(viper.GetString(commonflags.RPC))
|
||||
|
||||
rpcEndpoint := viper.GetString(commonflags.RPC)
|
||||
if rpcEndpoint == "" {
|
||||
return nil, fmt.Errorf("%s is not defined", commonflags.RPC)
|
||||
}
|
||||
|
||||
err := netAddr.FromString(rpcEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -34,6 +41,7 @@ func _client() (tree.TreeServiceClient, error) {
|
|||
metrics.NewStreamClientInterceptor(),
|
||||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
|
||||
|
|
|
@ -36,8 +36,6 @@ func initGetByPathCmd() {
|
|||
ff.String(pathFlagKey, "", "Path to a node")
|
||||
|
||||
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func getByPath(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -30,8 +30,6 @@ func initGetOpLogCmd() {
|
|||
ff := getOpLogCmd.Flags()
|
||||
ff.Uint64(heightFlagKey, 0, "Height to start with")
|
||||
ff.Uint64(countFlagKey, 10, "Logged operations count")
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func getOpLog(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -20,8 +20,6 @@ var healthcheckCmd = &cobra.Command{
|
|||
|
||||
func initHealthcheckCmd() {
|
||||
commonflags.Init(healthcheckCmd)
|
||||
ff := healthcheckCmd.Flags()
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func healthcheck(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -26,8 +26,6 @@ func initListCmd() {
|
|||
ff := listCmd.Flags()
|
||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func list(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -33,8 +33,6 @@ func initMoveCmd() {
|
|||
|
||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func move(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -29,8 +29,6 @@ func initRemoveCmd() {
|
|||
ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
|
||||
|
||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func remove(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -34,8 +34,6 @@ func initGetSubtreeCmd() {
|
|||
|
||||
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
|
||||
|
||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||
}
|
||||
|
||||
func getSubTree(cmd *cobra.Command, _ []string) {
|
||||
|
|
|
@ -13,9 +13,7 @@ import (
|
|||
)
|
||||
|
||||
func initAccountingService(ctx context.Context, c *cfg) {
|
||||
if c.cfgMorph.client == nil {
|
||||
initMorphComponents(ctx, c)
|
||||
}
|
||||
c.initMorphComponents(ctx)
|
||||
|
||||
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
|
||||
fatalOnErr(err)
|
||||
|
|
|
@ -575,6 +575,9 @@ func (c *cfgGRPC) dropConnection(endpoint string) {
|
|||
}
|
||||
|
||||
type cfgMorph struct {
|
||||
initialized bool
|
||||
guard sync.Mutex
|
||||
|
||||
client *client.Client
|
||||
|
||||
notaryEnabled bool
|
||||
|
@ -1455,10 +1458,7 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
|||
|
||||
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
|
||||
return container.NewInfoProvider(func() (container.Source, error) {
|
||||
// threadsafe: called on init or on sighup when morph initialized
|
||||
if c.cfgMorph.client == nil {
|
||||
initMorphComponents(ctx, c)
|
||||
}
|
||||
c.initMorphComponents(ctx)
|
||||
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -28,7 +28,12 @@ const (
|
|||
notaryDepositRetriesAmount = 300
|
||||
)
|
||||
|
||||
func initMorphComponents(ctx context.Context, c *cfg) {
|
||||
func (c *cfg) initMorphComponents(ctx context.Context) {
|
||||
c.cfgMorph.guard.Lock()
|
||||
defer c.cfgMorph.guard.Unlock()
|
||||
if c.cfgMorph.initialized {
|
||||
return
|
||||
}
|
||||
initMorphClient(ctx, c)
|
||||
|
||||
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
||||
|
@ -70,6 +75,7 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
|||
|
||||
c.netMapSource = netmapSource
|
||||
c.cfgNetmap.wrapper = wrap
|
||||
c.cfgMorph.initialized = true
|
||||
}
|
||||
|
||||
func initMorphClient(ctx context.Context, c *cfg) {
|
||||
|
|
|
@ -143,9 +143,7 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
|||
parseAttributes(c)
|
||||
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
|
||||
|
||||
if c.cfgMorph.client == nil {
|
||||
initMorphComponents(ctx, c)
|
||||
}
|
||||
c.initMorphComponents(ctx)
|
||||
|
||||
initNetmapState(c)
|
||||
|
||||
|
|
1
pkg/network/cache/multi.go
vendored
1
pkg/network/cache/multi.go
vendored
|
@ -70,6 +70,7 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
|||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
}
|
||||
|
||||
prmDial := client.PrmDial{
|
||||
|
|
|
@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
|||
|
||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||
}
|
||||
|
||||
type containerStreamerV2 struct {
|
||||
containerGRPC.ContainerService_ListStreamServer
|
||||
}
|
||||
|
||||
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
||||
return s.ContainerService_ListStreamServer.Send(
|
||||
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
||||
)
|
||||
}
|
||||
|
||||
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
||||
// to gRPC stream.
|
||||
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
||||
listReq := new(container.ListStreamRequest)
|
||||
if err := listReq.FromGRPCMessage(req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.srv.ListStream(listReq, &containerStreamerV2{
|
||||
ContainerService_ListStreamServer: gStream,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
||||
defer span.End()
|
||||
|
||||
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqProps := map[string]string{
|
||||
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
||||
nativeschema.PropertyKeyActorRole: role,
|
||||
}
|
||||
|
||||
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
||||
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
||||
}
|
||||
}
|
||||
|
||||
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get owner namespace: %w", err)
|
||||
}
|
||||
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
request := aperequest.NewRequest(
|
||||
nativeschema.MethodListContainers,
|
||||
aperequest.NewResource(
|
||||
resourceName(namespace, ""),
|
||||
make(map[string]string),
|
||||
),
|
||||
reqProps,
|
||||
)
|
||||
|
||||
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get group ids: %w", err)
|
||||
}
|
||||
|
||||
// Policy contract keeps group related chains as namespace-group pair.
|
||||
for i := range groups {
|
||||
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
||||
}
|
||||
|
||||
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
||||
rt.User = &policyengine.Target{
|
||||
Type: policyengine.User,
|
||||
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
||||
}
|
||||
rt.Groups = make([]policyengine.Target, len(groups))
|
||||
for i := range groups {
|
||||
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
||||
}
|
||||
|
||||
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if found && s == apechain.Allow {
|
||||
return ac.next.ListStream(req, stream)
|
||||
}
|
||||
|
||||
return apeErr(nativeschema.MethodListContainers, s)
|
||||
}
|
||||
|
||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||
defer span.End()
|
||||
|
|
|
@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
|||
return &container.ListResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
||||
s.calls["ListStream"]++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||
s.calls["Put"]++
|
||||
return &container.PutResponse{}, nil
|
||||
|
|
|
@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
|||
return res, err
|
||||
}
|
||||
|
||||
// ListStream implements Server.
|
||||
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
err := a.next.ListStream(req, stream)
|
||||
if !a.enabled.Load() {
|
||||
return err
|
||||
}
|
||||
audit.LogRequest(a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// Put implements Server.
|
||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||
res, err := a.next.Put(ctx, req)
|
||||
|
|
|
@ -14,6 +14,7 @@ type ServiceExecutor interface {
|
|||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||
ListStream(*container.ListStreamRequest, ListStream) error
|
||||
}
|
||||
|
||||
type executorSvc struct {
|
||||
|
@ -93,3 +94,18 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
s.respSvc.SetMeta(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
err := s.exec.ListStream(req, stream)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not execute ListStream request: %w", err)
|
||||
}
|
||||
|
||||
r := new(container.ListStreamResponse)
|
||||
respBody := new(container.ListStreamResponseBody)
|
||||
r.SetBody(respBody)
|
||||
|
||||
s.respSvc.SetMeta(r)
|
||||
|
||||
return stream.Send(r)
|
||||
}
|
||||
|
|
|
@ -201,3 +201,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
|||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *morphExecutor) ListStream(req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||
body := req.GetBody()
|
||||
idV2 := body.GetOwnerID()
|
||||
if idV2 == nil {
|
||||
return errMissingUserID
|
||||
}
|
||||
|
||||
var id user.ID
|
||||
|
||||
err := id.ReadFromV2(*idV2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid user ID: %w", err)
|
||||
}
|
||||
|
||||
cnrs, err := s.rdr.ContainersOf(&id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cidList := make([]refs.ContainerID, len(cnrs))
|
||||
for i := range cnrs {
|
||||
cnrs[i].WriteToV2(&cidList[i])
|
||||
}
|
||||
|
||||
resBody := new(container.ListStreamResponseBody)
|
||||
resBody.SetContainerIDs(cidList)
|
||||
r := new(container.ListStreamResponse)
|
||||
r.SetBody(resBody)
|
||||
|
||||
return stream.Send(r)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
)
|
||||
|
||||
// Server is an interface of the FrostFS API Container service server.
|
||||
|
@ -12,4 +13,11 @@ type Server interface {
|
|||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||
ListStream(*container.ListStreamRequest, ListStream) error
|
||||
}
|
||||
|
||||
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
||||
type ListStream interface {
|
||||
util.ServerStream
|
||||
Send(*container.ListStreamResponse) error
|
||||
}
|
||||
|
|
|
@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
|||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(container.ListStreamResponse)
|
||||
_ = s.sigSvc.SignResponse(resp, err)
|
||||
return stream.Send(resp)
|
||||
}
|
||||
|
||||
ss := &listStreamSigner{
|
||||
ListStream: stream,
|
||||
sigSvc: s.sigSvc,
|
||||
}
|
||||
err := s.svc.ListStream(req, ss)
|
||||
if err != nil || !ss.nonEmptyResp {
|
||||
return ss.send(new(container.ListStreamResponse), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type listStreamSigner struct {
|
||||
ListStream
|
||||
sigSvc *util.SignService
|
||||
|
||||
nonEmptyResp bool // set on first Send call
|
||||
}
|
||||
|
||||
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
||||
s.nonEmptyResp = true
|
||||
return s.send(resp, nil)
|
||||
}
|
||||
|
||||
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
||||
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.ListStream.Send(resp)
|
||||
}
|
||||
|
|
|
@ -103,6 +103,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
|||
tracing.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
}
|
||||
|
||||
if !netAddr.IsTLSEnabled() {
|
||||
|
|
|
@ -12,10 +12,24 @@ import (
|
|||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
||||
|
||||
func relayUnary[Req any, Resp any](ctx context.Context, s *Service, ns []netmapSDK.NodeInfo, req *Req, callback func(TreeServiceClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error)) (*Resp, error) {
|
||||
var resp *Resp
|
||||
var outErr error
|
||||
err := s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = callback(c, ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
}
|
||||
|
||||
// forEachNode executes callback for each node in the container until true is returned.
|
||||
// Returns errNoSuitableNode if there was no successful attempt to dial any node.
|
||||
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {
|
||||
|
|
|
@ -122,16 +122,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
|||
return nil, err
|
||||
}
|
||||
if pos < 0 {
|
||||
var resp *AddResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Add(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Add)
|
||||
}
|
||||
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
||||
|
@ -174,16 +165,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
|||
return nil, err
|
||||
}
|
||||
if pos < 0 {
|
||||
var resp *AddByPathResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.AddByPath(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).AddByPath)
|
||||
}
|
||||
|
||||
meta := protoToMeta(b.GetMeta())
|
||||
|
@ -238,16 +220,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
|||
return nil, err
|
||||
}
|
||||
if pos < 0 {
|
||||
var resp *RemoveResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Remove(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Remove)
|
||||
}
|
||||
|
||||
if b.GetNodeId() == pilorama.RootID {
|
||||
|
@ -291,16 +264,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
|||
return nil, err
|
||||
}
|
||||
if pos < 0 {
|
||||
var resp *MoveResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.Move(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Move)
|
||||
}
|
||||
|
||||
if b.GetNodeId() == pilorama.RootID {
|
||||
|
@ -343,16 +307,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
|||
return nil, err
|
||||
}
|
||||
if pos < 0 {
|
||||
var resp *GetNodeByPathResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.GetNodeByPath(ctx, req)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).GetNodeByPath)
|
||||
}
|
||||
|
||||
attr := b.GetPathAttribute()
|
||||
|
@ -763,16 +718,7 @@ func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeList
|
|||
return nil, err
|
||||
}
|
||||
if pos < 0 {
|
||||
var resp *TreeListResponse
|
||||
var outErr error
|
||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||
resp, outErr = c.TreeList(ctx, req)
|
||||
return outErr == nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, outErr
|
||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).TreeList)
|
||||
}
|
||||
|
||||
ids, err := s.forest.TreeList(ctx, cid)
|
||||
|
|
|
@ -342,7 +342,9 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
|||
metrics.NewStreamClientInterceptor(),
|
||||
tracing_grpc.NewStreamClientInterceptor(),
|
||||
),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||
)
|
||||
}
|
||||
|
||||
// ErrAlreadySyncing is returned when a service synchronization has already
|
||||
|
|
|
@ -59,6 +59,8 @@ func FlagAndStatus(status string) error {
|
|||
return fmt.Errorf("clock_gettime: %w", err)
|
||||
}
|
||||
status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10)
|
||||
status += "\nSTATUS=RELOADING"
|
||||
return Send(status)
|
||||
}
|
||||
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
|
||||
return Send(status)
|
||||
|
|
Loading…
Reference in a new issue