forked from TrueCloudLab/frostfs-node
Compare commits
10 commits
5afea62ec0
...
989336df37
Author | SHA1 | Date | |
---|---|---|---|
989336df37 | |||
bc8d79ddf9 | |||
29708b78d7 | |||
b9284604d9 | |||
65a4320c75 | |||
9a260c2e64 | |||
6f798b9c4b | |||
e515dd4582 | |||
8b6ec57c61 | |||
ed13387c0e |
37 changed files with 404 additions and 95 deletions
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.22 AS builder
|
FROM golang:1.23 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.22
|
FROM golang:1.23
|
||||||
|
|
||||||
WORKDIR /tmp
|
WORKDIR /tmp
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.22 AS builder
|
FROM golang:1.23 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.22 AS builder
|
FROM golang:1.23 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.22 AS builder
|
FROM golang:1.23 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -85,6 +85,57 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
|
||||||
return list
|
return list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SearchObjectsRes groups the resulting values of SearchObjects operation.
|
||||||
|
type ContainerListStreamRes struct {
|
||||||
|
ids []cid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// IDList returns identifiers of the matched objects.
|
||||||
|
func (x ContainerListStreamRes) IDList() []cid.ID {
|
||||||
|
return x.ids
|
||||||
|
}
|
||||||
|
|
||||||
|
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
|
||||||
|
cliPrm := &client.PrmContainerListStream{
|
||||||
|
XHeaders: prm.XHeaders,
|
||||||
|
Account: prm.Account,
|
||||||
|
Session: prm.Session,
|
||||||
|
}
|
||||||
|
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
||||||
|
if err != nil {
|
||||||
|
return res, fmt.Errorf("init container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]cid.ID, 10)
|
||||||
|
var list []cid.ID
|
||||||
|
var n int
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
for {
|
||||||
|
n, ok = rdr.Read(buf)
|
||||||
|
for i := range n {
|
||||||
|
list = append(list, buf[i])
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = rdr.Close()
|
||||||
|
if err != nil {
|
||||||
|
return res, fmt.Errorf("read container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(list, func(i, j int) bool {
|
||||||
|
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
||||||
|
return strings.Compare(lhs, rhs) < 0
|
||||||
|
})
|
||||||
|
|
||||||
|
res.ids = list
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// PutContainerPrm groups parameters of PutContainer operation.
|
// PutContainerPrm groups parameters of PutContainer operation.
|
||||||
type PutContainerPrm struct {
|
type PutContainerPrm struct {
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
|
|
|
@ -58,6 +58,7 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
|
||||||
GRPCDialOptions: []grpc.DialOption{
|
GRPCDialOptions: []grpc.DialOption{
|
||||||
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
|
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
|
||||||
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
||||||
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
|
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
|
||||||
|
|
91
cmd/frostfs-cli/modules/container/list_stream.go
Normal file
91
cmd/frostfs-cli/modules/container/list_stream.go
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
var listContainersStreamCmd = &cobra.Command{
|
||||||
|
Use: "list-stream",
|
||||||
|
Short: "List all created containers",
|
||||||
|
Long: "List all created containers",
|
||||||
|
Run: func(cmd *cobra.Command, _ []string) {
|
||||||
|
var idUser user.ID
|
||||||
|
|
||||||
|
generateKey, _ := cmd.Flags().GetBool(commonflags.GenerateKey)
|
||||||
|
if flagVarListContainerOwner == "" && generateKey {
|
||||||
|
cmd.PrintErrln("WARN: using -g without --owner - output will be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
key := key.GetOrGenerate(cmd)
|
||||||
|
|
||||||
|
if flagVarListContainerOwner == "" {
|
||||||
|
user.IDFromKey(&idUser, key.PublicKey)
|
||||||
|
} else {
|
||||||
|
err := idUser.DecodeString(flagVarListContainerOwner)
|
||||||
|
commonCmd.ExitOnErr(cmd, "invalid user ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC)
|
||||||
|
|
||||||
|
var prm internalclient.ListContainersPrm
|
||||||
|
prm.SetClient(cli)
|
||||||
|
prm.Account = idUser
|
||||||
|
|
||||||
|
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
|
||||||
|
prmGet := internalclient.GetContainerPrm{
|
||||||
|
Client: cli,
|
||||||
|
}
|
||||||
|
|
||||||
|
containerIDs := res.IDList()
|
||||||
|
for _, cnrID := range containerIDs {
|
||||||
|
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||||
|
cmd.Println(cnrID.String())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
prmGet.ClientParams.ContainerID = &cnrID
|
||||||
|
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
|
||||||
|
if err != nil {
|
||||||
|
cmd.Printf(" failed to read attributes: %v\n", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr := res.Container()
|
||||||
|
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cmd.Println(cnrID.String())
|
||||||
|
|
||||||
|
if flagVarListPrintAttr {
|
||||||
|
cnr.IterateUserAttributes(func(key, val string) {
|
||||||
|
cmd.Printf(" %s: %s\n", key, val)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func initContainerListStreamContainersCmd() {
|
||||||
|
commonflags.Init(listContainersStreamCmd)
|
||||||
|
|
||||||
|
flags := listContainersStreamCmd.Flags()
|
||||||
|
|
||||||
|
flags.StringVar(&flagVarListName, flagListName, "",
|
||||||
|
"List containers by the attribute name",
|
||||||
|
)
|
||||||
|
flags.StringVar(&flagVarListContainerOwner, flagListContainerOwner, "",
|
||||||
|
"Owner of containers (omit to use owner from private key)",
|
||||||
|
)
|
||||||
|
flags.BoolVar(&flagVarListPrintAttr, flagListPrintAttr, false,
|
||||||
|
"Request and print attributes of each container",
|
||||||
|
)
|
||||||
|
flags.Lookup(commonflags.GenerateKey).Usage = generateKeyContainerUsage
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ var Cmd = &cobra.Command{
|
||||||
func init() {
|
func init() {
|
||||||
containerChildCommand := []*cobra.Command{
|
containerChildCommand := []*cobra.Command{
|
||||||
listContainersCmd,
|
listContainersCmd,
|
||||||
|
listContainersStreamCmd,
|
||||||
createContainerCmd,
|
createContainerCmd,
|
||||||
deleteContainerCmd,
|
deleteContainerCmd,
|
||||||
listContainerObjectsCmd,
|
listContainerObjectsCmd,
|
||||||
|
@ -32,6 +33,7 @@ func init() {
|
||||||
Cmd.AddCommand(containerChildCommand...)
|
Cmd.AddCommand(containerChildCommand...)
|
||||||
|
|
||||||
initContainerListContainersCmd()
|
initContainerListContainersCmd()
|
||||||
|
initContainerListStreamContainersCmd()
|
||||||
initContainerCreateCmd()
|
initContainerCreateCmd()
|
||||||
initContainerDeleteCmd()
|
initContainerDeleteCmd()
|
||||||
initContainerListObjectsCmd()
|
initContainerListObjectsCmd()
|
||||||
|
|
|
@ -30,8 +30,6 @@ func initAddCmd() {
|
||||||
ff := addCmd.Flags()
|
ff := addCmd.Flags()
|
||||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||||
ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
|
ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func add(cmd *cobra.Command, _ []string) {
|
func add(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -36,7 +36,6 @@ func initAddByPathCmd() {
|
||||||
ff.String(pathFlagKey, "", "Path to a node")
|
ff.String(pathFlagKey, "", "Path to a node")
|
||||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
_ = cobra.MarkFlagRequired(ff, pathFlagKey)
|
_ = cobra.MarkFlagRequired(ff, pathFlagKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package tree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||||
|
@ -20,7 +21,13 @@ import (
|
||||||
// after making Tree API public.
|
// after making Tree API public.
|
||||||
func _client() (tree.TreeServiceClient, error) {
|
func _client() (tree.TreeServiceClient, error) {
|
||||||
var netAddr network.Address
|
var netAddr network.Address
|
||||||
err := netAddr.FromString(viper.GetString(commonflags.RPC))
|
|
||||||
|
rpcEndpoint := viper.GetString(commonflags.RPC)
|
||||||
|
if rpcEndpoint == "" {
|
||||||
|
return nil, fmt.Errorf("%s is not defined", commonflags.RPC)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := netAddr.FromString(rpcEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -34,6 +41,7 @@ func _client() (tree.TreeServiceClient, error) {
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
}
|
}
|
||||||
|
|
||||||
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
|
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
|
||||||
|
|
|
@ -36,8 +36,6 @@ func initGetByPathCmd() {
|
||||||
ff.String(pathFlagKey, "", "Path to a node")
|
ff.String(pathFlagKey, "", "Path to a node")
|
||||||
|
|
||||||
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
|
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getByPath(cmd *cobra.Command, _ []string) {
|
func getByPath(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -30,8 +30,6 @@ func initGetOpLogCmd() {
|
||||||
ff := getOpLogCmd.Flags()
|
ff := getOpLogCmd.Flags()
|
||||||
ff.Uint64(heightFlagKey, 0, "Height to start with")
|
ff.Uint64(heightFlagKey, 0, "Height to start with")
|
||||||
ff.Uint64(countFlagKey, 10, "Logged operations count")
|
ff.Uint64(countFlagKey, 10, "Logged operations count")
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOpLog(cmd *cobra.Command, _ []string) {
|
func getOpLog(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -20,8 +20,6 @@ var healthcheckCmd = &cobra.Command{
|
||||||
|
|
||||||
func initHealthcheckCmd() {
|
func initHealthcheckCmd() {
|
||||||
commonflags.Init(healthcheckCmd)
|
commonflags.Init(healthcheckCmd)
|
||||||
ff := healthcheckCmd.Flags()
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func healthcheck(cmd *cobra.Command, _ []string) {
|
func healthcheck(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -26,8 +26,6 @@ func initListCmd() {
|
||||||
ff := listCmd.Flags()
|
ff := listCmd.Flags()
|
||||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||||
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
|
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func list(cmd *cobra.Command, _ []string) {
|
func list(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -33,8 +33,6 @@ func initMoveCmd() {
|
||||||
|
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func move(cmd *cobra.Command, _ []string) {
|
func move(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -29,8 +29,6 @@ func initRemoveCmd() {
|
||||||
ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
|
ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
|
||||||
|
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func remove(cmd *cobra.Command, _ []string) {
|
func remove(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -34,8 +34,6 @@ func initGetSubtreeCmd() {
|
||||||
|
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
|
||||||
|
|
||||||
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSubTree(cmd *cobra.Command, _ []string) {
|
func getSubTree(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -13,9 +13,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func initAccountingService(ctx context.Context, c *cfg) {
|
func initAccountingService(ctx context.Context, c *cfg) {
|
||||||
if c.cfgMorph.client == nil {
|
c.initMorphComponents(ctx)
|
||||||
initMorphComponents(ctx, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
|
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
|
@ -575,6 +575,9 @@ func (c *cfgGRPC) dropConnection(endpoint string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgMorph struct {
|
type cfgMorph struct {
|
||||||
|
initialized bool
|
||||||
|
guard sync.Mutex
|
||||||
|
|
||||||
client *client.Client
|
client *client.Client
|
||||||
|
|
||||||
notaryEnabled bool
|
notaryEnabled bool
|
||||||
|
@ -1455,10 +1458,7 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||||
|
|
||||||
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
|
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
|
||||||
return container.NewInfoProvider(func() (container.Source, error) {
|
return container.NewInfoProvider(func() (container.Source, error) {
|
||||||
// threadsafe: called on init or on sighup when morph initialized
|
c.initMorphComponents(ctx)
|
||||||
if c.cfgMorph.client == nil {
|
|
||||||
initMorphComponents(ctx, c)
|
|
||||||
}
|
|
||||||
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
|
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -28,7 +28,12 @@ const (
|
||||||
notaryDepositRetriesAmount = 300
|
notaryDepositRetriesAmount = 300
|
||||||
)
|
)
|
||||||
|
|
||||||
func initMorphComponents(ctx context.Context, c *cfg) {
|
func (c *cfg) initMorphComponents(ctx context.Context) {
|
||||||
|
c.cfgMorph.guard.Lock()
|
||||||
|
defer c.cfgMorph.guard.Unlock()
|
||||||
|
if c.cfgMorph.initialized {
|
||||||
|
return
|
||||||
|
}
|
||||||
initMorphClient(ctx, c)
|
initMorphClient(ctx, c)
|
||||||
|
|
||||||
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
||||||
|
@ -70,6 +75,7 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
||||||
|
|
||||||
c.netMapSource = netmapSource
|
c.netMapSource = netmapSource
|
||||||
c.cfgNetmap.wrapper = wrap
|
c.cfgNetmap.wrapper = wrap
|
||||||
|
c.cfgMorph.initialized = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func initMorphClient(ctx context.Context, c *cfg) {
|
func initMorphClient(ctx context.Context, c *cfg) {
|
||||||
|
|
|
@ -143,9 +143,7 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
||||||
parseAttributes(c)
|
parseAttributes(c)
|
||||||
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
|
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
|
||||||
|
|
||||||
if c.cfgMorph.client == nil {
|
c.initMorphComponents(ctx)
|
||||||
initMorphComponents(ctx, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
initNetmapState(c)
|
initNetmapState(c)
|
||||||
|
|
||||||
|
|
1
pkg/network/cache/multi.go
vendored
1
pkg/network/cache/multi.go
vendored
|
@ -70,6 +70,7 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||||
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
}
|
}
|
||||||
|
|
||||||
prmDial := client.PrmDial{
|
prmDial := client.PrmDial{
|
||||||
|
|
|
@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
||||||
|
|
||||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type containerStreamerV2 struct {
|
||||||
|
containerGRPC.ContainerService_ListStreamServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
||||||
|
return s.ContainerService_ListStreamServer.Send(
|
||||||
|
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
||||||
|
// to gRPC stream.
|
||||||
|
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
||||||
|
listReq := new(container.ListStreamRequest)
|
||||||
|
if err := listReq.FromGRPCMessage(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.srv.ListStream(listReq, &containerStreamerV2{
|
||||||
|
ContainerService_ListStreamServer: gStream,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps := map[string]string{
|
||||||
|
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
||||||
|
nativeschema.PropertyKeyActorRole: role,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
|
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
||||||
|
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not get owner namespace: %w", err)
|
||||||
|
}
|
||||||
|
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
request := aperequest.NewRequest(
|
||||||
|
nativeschema.MethodListContainers,
|
||||||
|
aperequest.NewResource(
|
||||||
|
resourceName(namespace, ""),
|
||||||
|
make(map[string]string),
|
||||||
|
),
|
||||||
|
reqProps,
|
||||||
|
)
|
||||||
|
|
||||||
|
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get group ids: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Policy contract keeps group related chains as namespace-group pair.
|
||||||
|
for i := range groups {
|
||||||
|
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
||||||
|
rt.User = &policyengine.Target{
|
||||||
|
Type: policyengine.User,
|
||||||
|
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
||||||
|
}
|
||||||
|
rt.Groups = make([]policyengine.Target, len(groups))
|
||||||
|
for i := range groups {
|
||||||
|
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if found && s == apechain.Allow {
|
||||||
|
return ac.next.ListStream(req, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
return apeErr(nativeschema.MethodListContainers, s)
|
||||||
|
}
|
||||||
|
|
||||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
|
@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
||||||
return &container.ListResponse{}, nil
|
return &container.ListResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
||||||
|
s.calls["ListStream"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||||
s.calls["Put"]++
|
s.calls["Put"]++
|
||||||
return &container.PutResponse{}, nil
|
return &container.PutResponse{}, nil
|
||||||
|
|
|
@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListStream implements Server.
|
||||||
|
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := a.next.ListStream(req, stream)
|
||||||
|
if !a.enabled.Load() {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
audit.LogRequest(a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
||||||
|
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Put implements Server.
|
// Put implements Server.
|
||||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
res, err := a.next.Put(ctx, req)
|
res, err := a.next.Put(ctx, req)
|
||||||
|
|
|
@ -14,6 +14,7 @@ type ServiceExecutor interface {
|
||||||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||||
|
ListStream(*container.ListStreamRequest, ListStream) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
|
@ -93,3 +94,18 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
s.respSvc.SetMeta(resp)
|
s.respSvc.SetMeta(resp)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := s.exec.ListStream(req, stream)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not execute ListStream request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r := new(container.ListStreamResponse)
|
||||||
|
respBody := new(container.ListStreamResponseBody)
|
||||||
|
r.SetBody(respBody)
|
||||||
|
|
||||||
|
s.respSvc.SetMeta(r)
|
||||||
|
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
|
@ -201,3 +201,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *morphExecutor) ListStream(req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||||
|
body := req.GetBody()
|
||||||
|
idV2 := body.GetOwnerID()
|
||||||
|
if idV2 == nil {
|
||||||
|
return errMissingUserID
|
||||||
|
}
|
||||||
|
|
||||||
|
var id user.ID
|
||||||
|
|
||||||
|
err := id.ReadFromV2(*idV2)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrs, err := s.rdr.ContainersOf(&id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cidList := make([]refs.ContainerID, len(cnrs))
|
||||||
|
for i := range cnrs {
|
||||||
|
cnrs[i].WriteToV2(&cidList[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
resBody := new(container.ListStreamResponseBody)
|
||||||
|
resBody.SetContainerIDs(cidList)
|
||||||
|
r := new(container.ListStreamResponse)
|
||||||
|
r.SetBody(resBody)
|
||||||
|
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server is an interface of the FrostFS API Container service server.
|
// Server is an interface of the FrostFS API Container service server.
|
||||||
|
@ -12,4 +13,11 @@ type Server interface {
|
||||||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||||
|
ListStream(*container.ListStreamRequest, ListStream) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
||||||
|
type ListStream interface {
|
||||||
|
util.ServerStream
|
||||||
|
Send(*container.ListStreamResponse) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||||
return resp, s.sigSvc.SignResponse(resp, err)
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||||
|
resp := new(container.ListStreamResponse)
|
||||||
|
_ = s.sigSvc.SignResponse(resp, err)
|
||||||
|
return stream.Send(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
ss := &listStreamSigner{
|
||||||
|
ListStream: stream,
|
||||||
|
sigSvc: s.sigSvc,
|
||||||
|
}
|
||||||
|
err := s.svc.ListStream(req, ss)
|
||||||
|
if err != nil || !ss.nonEmptyResp {
|
||||||
|
return ss.send(new(container.ListStreamResponse), err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type listStreamSigner struct {
|
||||||
|
ListStream
|
||||||
|
sigSvc *util.SignService
|
||||||
|
|
||||||
|
nonEmptyResp bool // set on first Send call
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
||||||
|
s.nonEmptyResp = true
|
||||||
|
return s.send(resp, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
||||||
|
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.ListStream.Send(resp)
|
||||||
|
}
|
||||||
|
|
|
@ -103,6 +103,7 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||||
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
}
|
}
|
||||||
|
|
||||||
if !netAddr.IsTLSEnabled() {
|
if !netAddr.IsTLSEnabled() {
|
||||||
|
|
|
@ -12,10 +12,24 @@ import (
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
||||||
|
|
||||||
|
func relayUnary[Req any, Resp any](ctx context.Context, s *Service, ns []netmapSDK.NodeInfo, req *Req, callback func(TreeServiceClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error)) (*Resp, error) {
|
||||||
|
var resp *Resp
|
||||||
|
var outErr error
|
||||||
|
err := s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
resp, outErr = callback(c, ctx, req)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, outErr
|
||||||
|
}
|
||||||
|
|
||||||
// forEachNode executes callback for each node in the container until true is returned.
|
// forEachNode executes callback for each node in the container until true is returned.
|
||||||
// Returns errNoSuitableNode if there was no successful attempt to dial any node.
|
// Returns errNoSuitableNode if there was no successful attempt to dial any node.
|
||||||
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {
|
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {
|
||||||
|
|
|
@ -122,16 +122,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
var resp *AddResponse
|
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Add)
|
||||||
var outErr error
|
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
||||||
resp, outErr = c.Add(ctx, req)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, outErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
||||||
|
@ -174,16 +165,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
var resp *AddByPathResponse
|
return relayUnary(ctx, s, ns, req, (TreeServiceClient).AddByPath)
|
||||||
var outErr error
|
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
||||||
resp, outErr = c.AddByPath(ctx, req)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, outErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := protoToMeta(b.GetMeta())
|
meta := protoToMeta(b.GetMeta())
|
||||||
|
@ -238,16 +220,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
var resp *RemoveResponse
|
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Remove)
|
||||||
var outErr error
|
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
||||||
resp, outErr = c.Remove(ctx, req)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, outErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.GetNodeId() == pilorama.RootID {
|
if b.GetNodeId() == pilorama.RootID {
|
||||||
|
@ -291,16 +264,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
var resp *MoveResponse
|
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Move)
|
||||||
var outErr error
|
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
||||||
resp, outErr = c.Move(ctx, req)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, outErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.GetNodeId() == pilorama.RootID {
|
if b.GetNodeId() == pilorama.RootID {
|
||||||
|
@ -343,16 +307,7 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
var resp *GetNodeByPathResponse
|
return relayUnary(ctx, s, ns, req, (TreeServiceClient).GetNodeByPath)
|
||||||
var outErr error
|
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
||||||
resp, outErr = c.GetNodeByPath(ctx, req)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, outErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
attr := b.GetPathAttribute()
|
attr := b.GetPathAttribute()
|
||||||
|
@ -763,16 +718,7 @@ func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeList
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
var resp *TreeListResponse
|
return relayUnary(ctx, s, ns, req, (TreeServiceClient).TreeList)
|
||||||
var outErr error
|
|
||||||
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
||||||
resp, outErr = c.TreeList(ctx, req)
|
|
||||||
return outErr == nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, outErr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ids, err := s.forest.TreeList(ctx, cid)
|
ids, err := s.forest.TreeList(ctx, cid)
|
||||||
|
|
|
@ -342,7 +342,9 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing_grpc.NewStreamClientInterceptor(),
|
tracing_grpc.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()))
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrAlreadySyncing is returned when a service synchronization has already
|
// ErrAlreadySyncing is returned when a service synchronization has already
|
||||||
|
|
|
@ -59,6 +59,8 @@ func FlagAndStatus(status string) error {
|
||||||
return fmt.Errorf("clock_gettime: %w", err)
|
return fmt.Errorf("clock_gettime: %w", err)
|
||||||
}
|
}
|
||||||
status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10)
|
status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10)
|
||||||
|
status += "\nSTATUS=RELOADING"
|
||||||
|
return Send(status)
|
||||||
}
|
}
|
||||||
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
|
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
|
||||||
return Send(status)
|
return Send(status)
|
||||||
|
|
Loading…
Reference in a new issue