forked from TrueCloudLab/frostfs-node
Compare commits
No commits in common. "989336df371ba6df2d2daae4f16bc27d83d20c6a" and "5afea62ec0c29b8b3422412e4cd8a5452785e6fd" have entirely different histories.
989336df37
...
5afea62ec0
37 changed files with 95 additions and 404 deletions
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.23 AS builder
|
FROM golang:1.22 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.23
|
FROM golang:1.22
|
||||||
|
|
||||||
WORKDIR /tmp
|
WORKDIR /tmp
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.23 AS builder
|
FROM golang:1.22 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.23 AS builder
|
FROM golang:1.22 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
FROM golang:1.23 AS builder
|
FROM golang:1.22 AS builder
|
||||||
ARG BUILD=now
|
ARG BUILD=now
|
||||||
ARG VERSION=dev
|
ARG VERSION=dev
|
||||||
ARG REPO=repository
|
ARG REPO=repository
|
||||||
|
|
|
@ -85,57 +85,6 @@ func (x ListContainersRes) SortedIDList() []cid.ID {
|
||||||
return list
|
return list
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchObjectsRes groups the resulting values of SearchObjects operation.
|
|
||||||
type ContainerListStreamRes struct {
|
|
||||||
ids []cid.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
// IDList returns identifiers of the matched objects.
|
|
||||||
func (x ContainerListStreamRes) IDList() []cid.ID {
|
|
||||||
return x.ids
|
|
||||||
}
|
|
||||||
|
|
||||||
func ListContainersStream(ctx context.Context, prm ListContainersPrm) (res ContainerListStreamRes, err error) {
|
|
||||||
cliPrm := &client.PrmContainerListStream{
|
|
||||||
XHeaders: prm.XHeaders,
|
|
||||||
Account: prm.Account,
|
|
||||||
Session: prm.Session,
|
|
||||||
}
|
|
||||||
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
|
||||||
if err != nil {
|
|
||||||
return res, fmt.Errorf("init container list: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := make([]cid.ID, 10)
|
|
||||||
var list []cid.ID
|
|
||||||
var n int
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
for {
|
|
||||||
n, ok = rdr.Read(buf)
|
|
||||||
for i := range n {
|
|
||||||
list = append(list, buf[i])
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = rdr.Close()
|
|
||||||
if err != nil {
|
|
||||||
return res, fmt.Errorf("read container list: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Slice(list, func(i, j int) bool {
|
|
||||||
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
|
||||||
return strings.Compare(lhs, rhs) < 0
|
|
||||||
})
|
|
||||||
|
|
||||||
res.ids = list
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutContainerPrm groups parameters of PutContainer operation.
|
// PutContainerPrm groups parameters of PutContainer operation.
|
||||||
type PutContainerPrm struct {
|
type PutContainerPrm struct {
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
|
|
|
@ -58,7 +58,6 @@ func GetSDKClient(ctx context.Context, cmd *cobra.Command, key *ecdsa.PrivateKey
|
||||||
GRPCDialOptions: []grpc.DialOption{
|
GRPCDialOptions: []grpc.DialOption{
|
||||||
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
|
grpc.WithChainUnaryInterceptor(tracing.NewUnaryClientInteceptor()),
|
||||||
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
grpc.WithChainStreamInterceptor(tracing.NewStreamClientInterceptor()),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
|
if timeout := viper.GetDuration(commonflags.Timeout); timeout > 0 {
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
package container
|
|
||||||
|
|
||||||
import (
|
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
)
|
|
||||||
|
|
||||||
var listContainersStreamCmd = &cobra.Command{
|
|
||||||
Use: "list-stream",
|
|
||||||
Short: "List all created containers",
|
|
||||||
Long: "List all created containers",
|
|
||||||
Run: func(cmd *cobra.Command, _ []string) {
|
|
||||||
var idUser user.ID
|
|
||||||
|
|
||||||
generateKey, _ := cmd.Flags().GetBool(commonflags.GenerateKey)
|
|
||||||
if flagVarListContainerOwner == "" && generateKey {
|
|
||||||
cmd.PrintErrln("WARN: using -g without --owner - output will be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
key := key.GetOrGenerate(cmd)
|
|
||||||
|
|
||||||
if flagVarListContainerOwner == "" {
|
|
||||||
user.IDFromKey(&idUser, key.PublicKey)
|
|
||||||
} else {
|
|
||||||
err := idUser.DecodeString(flagVarListContainerOwner)
|
|
||||||
commonCmd.ExitOnErr(cmd, "invalid user ID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC)
|
|
||||||
|
|
||||||
var prm internalclient.ListContainersPrm
|
|
||||||
prm.SetClient(cli)
|
|
||||||
prm.Account = idUser
|
|
||||||
|
|
||||||
res, err := internalclient.ListContainersStream(cmd.Context(), prm)
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
|
||||||
|
|
||||||
prmGet := internalclient.GetContainerPrm{
|
|
||||||
Client: cli,
|
|
||||||
}
|
|
||||||
|
|
||||||
containerIDs := res.IDList()
|
|
||||||
for _, cnrID := range containerIDs {
|
|
||||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
|
||||||
cmd.Println(cnrID.String())
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
prmGet.ClientParams.ContainerID = &cnrID
|
|
||||||
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
|
|
||||||
if err != nil {
|
|
||||||
cmd.Printf(" failed to read attributes: %v\n", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
cnr := res.Container()
|
|
||||||
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cmd.Println(cnrID.String())
|
|
||||||
|
|
||||||
if flagVarListPrintAttr {
|
|
||||||
cnr.IterateUserAttributes(func(key, val string) {
|
|
||||||
cmd.Printf(" %s: %s\n", key, val)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
func initContainerListStreamContainersCmd() {
|
|
||||||
commonflags.Init(listContainersStreamCmd)
|
|
||||||
|
|
||||||
flags := listContainersStreamCmd.Flags()
|
|
||||||
|
|
||||||
flags.StringVar(&flagVarListName, flagListName, "",
|
|
||||||
"List containers by the attribute name",
|
|
||||||
)
|
|
||||||
flags.StringVar(&flagVarListContainerOwner, flagListContainerOwner, "",
|
|
||||||
"Owner of containers (omit to use owner from private key)",
|
|
||||||
)
|
|
||||||
flags.BoolVar(&flagVarListPrintAttr, flagListPrintAttr, false,
|
|
||||||
"Request and print attributes of each container",
|
|
||||||
)
|
|
||||||
flags.Lookup(commonflags.GenerateKey).Usage = generateKeyContainerUsage
|
|
||||||
}
|
|
|
@ -21,7 +21,6 @@ var Cmd = &cobra.Command{
|
||||||
func init() {
|
func init() {
|
||||||
containerChildCommand := []*cobra.Command{
|
containerChildCommand := []*cobra.Command{
|
||||||
listContainersCmd,
|
listContainersCmd,
|
||||||
listContainersStreamCmd,
|
|
||||||
createContainerCmd,
|
createContainerCmd,
|
||||||
deleteContainerCmd,
|
deleteContainerCmd,
|
||||||
listContainerObjectsCmd,
|
listContainerObjectsCmd,
|
||||||
|
@ -33,7 +32,6 @@ func init() {
|
||||||
Cmd.AddCommand(containerChildCommand...)
|
Cmd.AddCommand(containerChildCommand...)
|
||||||
|
|
||||||
initContainerListContainersCmd()
|
initContainerListContainersCmd()
|
||||||
initContainerListStreamContainersCmd()
|
|
||||||
initContainerCreateCmd()
|
initContainerCreateCmd()
|
||||||
initContainerDeleteCmd()
|
initContainerDeleteCmd()
|
||||||
initContainerListObjectsCmd()
|
initContainerListObjectsCmd()
|
||||||
|
|
|
@ -30,6 +30,8 @@ func initAddCmd() {
|
||||||
ff := addCmd.Flags()
|
ff := addCmd.Flags()
|
||||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||||
ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
|
ff.Uint64(parentIDFlagKey, 0, "Parent node ID")
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func add(cmd *cobra.Command, _ []string) {
|
func add(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -36,6 +36,7 @@ func initAddByPathCmd() {
|
||||||
ff.String(pathFlagKey, "", "Path to a node")
|
ff.String(pathFlagKey, "", "Path to a node")
|
||||||
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
ff.StringSlice(metaFlagKey, nil, "Meta pairs in the form of Key1=[0x]Value1,Key2=[0x]Value2")
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
_ = cobra.MarkFlagRequired(ff, pathFlagKey)
|
_ = cobra.MarkFlagRequired(ff, pathFlagKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package tree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||||
|
@ -21,13 +20,7 @@ import (
|
||||||
// after making Tree API public.
|
// after making Tree API public.
|
||||||
func _client() (tree.TreeServiceClient, error) {
|
func _client() (tree.TreeServiceClient, error) {
|
||||||
var netAddr network.Address
|
var netAddr network.Address
|
||||||
|
err := netAddr.FromString(viper.GetString(commonflags.RPC))
|
||||||
rpcEndpoint := viper.GetString(commonflags.RPC)
|
|
||||||
if rpcEndpoint == "" {
|
|
||||||
return nil, fmt.Errorf("%s is not defined", commonflags.RPC)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := netAddr.FromString(rpcEndpoint)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -41,7 +34,6 @@ func _client() (tree.TreeServiceClient, error) {
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
|
if !strings.HasPrefix(netAddr.URIAddr(), "grpcs:") {
|
||||||
|
|
|
@ -36,6 +36,8 @@ func initGetByPathCmd() {
|
||||||
ff.String(pathFlagKey, "", "Path to a node")
|
ff.String(pathFlagKey, "", "Path to a node")
|
||||||
|
|
||||||
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
|
ff.Bool(latestOnlyFlagKey, false, "Look only for the latest version of a node")
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getByPath(cmd *cobra.Command, _ []string) {
|
func getByPath(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -30,6 +30,8 @@ func initGetOpLogCmd() {
|
||||||
ff := getOpLogCmd.Flags()
|
ff := getOpLogCmd.Flags()
|
||||||
ff.Uint64(heightFlagKey, 0, "Height to start with")
|
ff.Uint64(heightFlagKey, 0, "Height to start with")
|
||||||
ff.Uint64(countFlagKey, 10, "Logged operations count")
|
ff.Uint64(countFlagKey, 10, "Logged operations count")
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOpLog(cmd *cobra.Command, _ []string) {
|
func getOpLog(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -20,6 +20,8 @@ var healthcheckCmd = &cobra.Command{
|
||||||
|
|
||||||
func initHealthcheckCmd() {
|
func initHealthcheckCmd() {
|
||||||
commonflags.Init(healthcheckCmd)
|
commonflags.Init(healthcheckCmd)
|
||||||
|
ff := healthcheckCmd.Flags()
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func healthcheck(cmd *cobra.Command, _ []string) {
|
func healthcheck(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -26,6 +26,8 @@ func initListCmd() {
|
||||||
ff := listCmd.Flags()
|
ff := listCmd.Flags()
|
||||||
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
ff.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||||
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
|
_ = listCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func list(cmd *cobra.Command, _ []string) {
|
func list(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -33,6 +33,8 @@ func initMoveCmd() {
|
||||||
|
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(parentIDFlagKey)
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func move(cmd *cobra.Command, _ []string) {
|
func move(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -29,6 +29,8 @@ func initRemoveCmd() {
|
||||||
ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
|
ff.Uint64(nodeIDFlagKey, 0, "Node ID.")
|
||||||
|
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(nodeIDFlagKey)
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func remove(cmd *cobra.Command, _ []string) {
|
func remove(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -34,6 +34,8 @@ func initGetSubtreeCmd() {
|
||||||
|
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
_ = getSubtreeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||||
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
|
_ = getSubtreeCmd.MarkFlagRequired(treeIDFlagKey)
|
||||||
|
|
||||||
|
_ = cobra.MarkFlagRequired(ff, commonflags.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSubTree(cmd *cobra.Command, _ []string) {
|
func getSubTree(cmd *cobra.Command, _ []string) {
|
||||||
|
|
|
@ -13,7 +13,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func initAccountingService(ctx context.Context, c *cfg) {
|
func initAccountingService(ctx context.Context, c *cfg) {
|
||||||
c.initMorphComponents(ctx)
|
if c.cfgMorph.client == nil {
|
||||||
|
initMorphComponents(ctx, c)
|
||||||
|
}
|
||||||
|
|
||||||
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
|
balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
|
|
@ -575,9 +575,6 @@ func (c *cfgGRPC) dropConnection(endpoint string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgMorph struct {
|
type cfgMorph struct {
|
||||||
initialized bool
|
|
||||||
guard sync.Mutex
|
|
||||||
|
|
||||||
client *client.Client
|
client *client.Client
|
||||||
|
|
||||||
notaryEnabled bool
|
notaryEnabled bool
|
||||||
|
@ -1458,7 +1455,10 @@ func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||||
|
|
||||||
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
|
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
|
||||||
return container.NewInfoProvider(func() (container.Source, error) {
|
return container.NewInfoProvider(func() (container.Source, error) {
|
||||||
c.initMorphComponents(ctx)
|
// threadsafe: called on init or on sighup when morph initialized
|
||||||
|
if c.cfgMorph.client == nil {
|
||||||
|
initMorphComponents(ctx, c)
|
||||||
|
}
|
||||||
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
|
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -28,12 +28,7 @@ const (
|
||||||
notaryDepositRetriesAmount = 300
|
notaryDepositRetriesAmount = 300
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cfg) initMorphComponents(ctx context.Context) {
|
func initMorphComponents(ctx context.Context, c *cfg) {
|
||||||
c.cfgMorph.guard.Lock()
|
|
||||||
defer c.cfgMorph.guard.Unlock()
|
|
||||||
if c.cfgMorph.initialized {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
initMorphClient(ctx, c)
|
initMorphClient(ctx, c)
|
||||||
|
|
||||||
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
||||||
|
@ -75,7 +70,6 @@ func (c *cfg) initMorphComponents(ctx context.Context) {
|
||||||
|
|
||||||
c.netMapSource = netmapSource
|
c.netMapSource = netmapSource
|
||||||
c.cfgNetmap.wrapper = wrap
|
c.cfgNetmap.wrapper = wrap
|
||||||
c.cfgMorph.initialized = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func initMorphClient(ctx context.Context, c *cfg) {
|
func initMorphClient(ctx context.Context, c *cfg) {
|
||||||
|
|
|
@ -143,7 +143,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
||||||
parseAttributes(c)
|
parseAttributes(c)
|
||||||
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
|
c.cfgNodeInfo.localInfo.SetStatus(netmapSDK.Offline)
|
||||||
|
|
||||||
c.initMorphComponents(ctx)
|
if c.cfgMorph.client == nil {
|
||||||
|
initMorphComponents(ctx, c)
|
||||||
|
}
|
||||||
|
|
||||||
initNetmapState(c)
|
initNetmapState(c)
|
||||||
|
|
||||||
|
|
1
pkg/network/cache/multi.go
vendored
1
pkg/network/cache/multi.go
vendored
|
@ -70,7 +70,6 @@ func (x *multiClient) createForAddress(ctx context.Context, addr network.Address
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
grpc.WithContextDialer(x.opts.DialerSource.GrpcContextDialer()),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
prmDial := client.PrmDial{
|
prmDial := client.PrmDial{
|
||||||
|
|
|
@ -80,26 +80,3 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
||||||
|
|
||||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type containerStreamerV2 struct {
|
|
||||||
containerGRPC.ContainerService_ListStreamServer
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
|
||||||
return s.ContainerService_ListStreamServer.Send(
|
|
||||||
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
|
||||||
// to gRPC stream.
|
|
||||||
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
|
||||||
listReq := new(container.ListStreamRequest)
|
|
||||||
if err := listReq.FromGRPCMessage(req); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.srv.ListStream(listReq, &containerStreamerV2{
|
|
||||||
ContainerService_ListStreamServer: gStream,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
|
@ -175,79 +175,6 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
|
||||||
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
reqProps := map[string]string{
|
|
||||||
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
|
||||||
nativeschema.PropertyKeyActorRole: role,
|
|
||||||
}
|
|
||||||
|
|
||||||
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if p, ok := peer.FromContext(ctx); ok {
|
|
||||||
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
|
||||||
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not get owner namespace: %w", err)
|
|
||||||
}
|
|
||||||
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
request := aperequest.NewRequest(
|
|
||||||
nativeschema.MethodListContainers,
|
|
||||||
aperequest.NewResource(
|
|
||||||
resourceName(namespace, ""),
|
|
||||||
make(map[string]string),
|
|
||||||
),
|
|
||||||
reqProps,
|
|
||||||
)
|
|
||||||
|
|
||||||
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get group ids: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Policy contract keeps group related chains as namespace-group pair.
|
|
||||||
for i := range groups {
|
|
||||||
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
|
||||||
rt.User = &policyengine.Target{
|
|
||||||
Type: policyengine.User,
|
|
||||||
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
|
||||||
}
|
|
||||||
rt.Groups = make([]policyengine.Target, len(groups))
|
|
||||||
for i := range groups {
|
|
||||||
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if found && s == apechain.Allow {
|
|
||||||
return ac.next.ListStream(req, stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
return apeErr(nativeschema.MethodListContainers, s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
|
@ -1079,11 +1079,6 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
||||||
return &container.ListResponse{}, nil
|
return &container.ListResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
|
||||||
s.calls["ListStream"]++
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||||
s.calls["Put"]++
|
s.calls["Put"]++
|
||||||
return &container.PutResponse{}, nil
|
return &container.PutResponse{}, nil
|
||||||
|
|
|
@ -63,17 +63,6 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListStream implements Server.
|
|
||||||
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
|
||||||
err := a.next.ListStream(req, stream)
|
|
||||||
if !a.enabled.Load() {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
audit.LogRequest(a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
|
||||||
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put implements Server.
|
// Put implements Server.
|
||||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
res, err := a.next.Put(ctx, req)
|
res, err := a.next.Put(ctx, req)
|
||||||
|
|
|
@ -14,7 +14,6 @@ type ServiceExecutor interface {
|
||||||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||||
ListStream(*container.ListStreamRequest, ListStream) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
|
@ -94,18 +93,3 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
s.respSvc.SetMeta(resp)
|
s.respSvc.SetMeta(resp)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
|
||||||
err := s.exec.ListStream(req, stream)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not execute ListStream request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
r := new(container.ListStreamResponse)
|
|
||||||
respBody := new(container.ListStreamResponseBody)
|
|
||||||
r.SetBody(respBody)
|
|
||||||
|
|
||||||
s.respSvc.SetMeta(r)
|
|
||||||
|
|
||||||
return stream.Send(r)
|
|
||||||
}
|
|
||||||
|
|
|
@ -201,35 +201,3 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *morphExecutor) ListStream(req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
|
||||||
body := req.GetBody()
|
|
||||||
idV2 := body.GetOwnerID()
|
|
||||||
if idV2 == nil {
|
|
||||||
return errMissingUserID
|
|
||||||
}
|
|
||||||
|
|
||||||
var id user.ID
|
|
||||||
|
|
||||||
err := id.ReadFromV2(*idV2)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("invalid user ID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cnrs, err := s.rdr.ContainersOf(&id)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cidList := make([]refs.ContainerID, len(cnrs))
|
|
||||||
for i := range cnrs {
|
|
||||||
cnrs[i].WriteToV2(&cidList[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
resBody := new(container.ListStreamResponseBody)
|
|
||||||
resBody.SetContainerIDs(cidList)
|
|
||||||
r := new(container.ListStreamResponse)
|
|
||||||
r.SetBody(resBody)
|
|
||||||
|
|
||||||
return stream.Send(r)
|
|
||||||
}
|
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Server is an interface of the FrostFS API Container service server.
|
// Server is an interface of the FrostFS API Container service server.
|
||||||
|
@ -13,11 +12,4 @@ type Server interface {
|
||||||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||||
ListStream(*container.ListStreamRequest, ListStream) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
|
||||||
type ListStream interface {
|
|
||||||
util.ServerStream
|
|
||||||
Send(*container.ListStreamResponse) error
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,40 +56,3 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||||
return resp, s.sigSvc.SignResponse(resp, err)
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
|
||||||
resp := new(container.ListStreamResponse)
|
|
||||||
_ = s.sigSvc.SignResponse(resp, err)
|
|
||||||
return stream.Send(resp)
|
|
||||||
}
|
|
||||||
|
|
||||||
ss := &listStreamSigner{
|
|
||||||
ListStream: stream,
|
|
||||||
sigSvc: s.sigSvc,
|
|
||||||
}
|
|
||||||
err := s.svc.ListStream(req, ss)
|
|
||||||
if err != nil || !ss.nonEmptyResp {
|
|
||||||
return ss.send(new(container.ListStreamResponse), err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type listStreamSigner struct {
|
|
||||||
ListStream
|
|
||||||
sigSvc *util.SignService
|
|
||||||
|
|
||||||
nonEmptyResp bool // set on first Send call
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
|
||||||
s.nonEmptyResp = true
|
|
||||||
return s.send(resp, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
|
||||||
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return s.ListStream.Send(resp)
|
|
||||||
}
|
|
||||||
|
|
|
@ -103,7 +103,6 @@ func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*
|
||||||
tracing.NewStreamClientInterceptor(),
|
tracing.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
grpc.WithContextDialer(c.ds.GrpcContextDialer()),
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !netAddr.IsTLSEnabled() {
|
if !netAddr.IsTLSEnabled() {
|
||||||
|
|
|
@ -12,24 +12,10 @@ import (
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
||||||
|
|
||||||
func relayUnary[Req any, Resp any](ctx context.Context, s *Service, ns []netmapSDK.NodeInfo, req *Req, callback func(TreeServiceClient, context.Context, *Req, ...grpc.CallOption) (*Resp, error)) (*Resp, error) {
|
|
||||||
var resp *Resp
|
|
||||||
var outErr error
|
|
||||||
err := s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
|
||||||
resp, outErr = callback(c, ctx, req)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, outErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// forEachNode executes callback for each node in the container until true is returned.
|
// forEachNode executes callback for each node in the container until true is returned.
|
||||||
// Returns errNoSuitableNode if there was no successful attempt to dial any node.
|
// Returns errNoSuitableNode if there was no successful attempt to dial any node.
|
||||||
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {
|
func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo, f func(c TreeServiceClient) bool) error {
|
||||||
|
|
|
@ -122,7 +122,16 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Add)
|
var resp *AddResponse
|
||||||
|
var outErr error
|
||||||
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
resp, outErr = c.Add(ctx, req)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: len(ns)}
|
||||||
|
@ -165,7 +174,16 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).AddByPath)
|
var resp *AddByPathResponse
|
||||||
|
var outErr error
|
||||||
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
resp, outErr = c.AddByPath(ctx, req)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := protoToMeta(b.GetMeta())
|
meta := protoToMeta(b.GetMeta())
|
||||||
|
@ -220,7 +238,16 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Remove)
|
var resp *RemoveResponse
|
||||||
|
var outErr error
|
||||||
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
resp, outErr = c.Remove(ctx, req)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.GetNodeId() == pilorama.RootID {
|
if b.GetNodeId() == pilorama.RootID {
|
||||||
|
@ -264,7 +291,16 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).Move)
|
var resp *MoveResponse
|
||||||
|
var outErr error
|
||||||
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
resp, outErr = c.Move(ctx, req)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.GetNodeId() == pilorama.RootID {
|
if b.GetNodeId() == pilorama.RootID {
|
||||||
|
@ -307,7 +343,16 @@ func (s *Service) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).GetNodeByPath)
|
var resp *GetNodeByPathResponse
|
||||||
|
var outErr error
|
||||||
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
resp, outErr = c.GetNodeByPath(ctx, req)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
attr := b.GetPathAttribute()
|
attr := b.GetPathAttribute()
|
||||||
|
@ -718,7 +763,16 @@ func (s *Service) TreeList(ctx context.Context, req *TreeListRequest) (*TreeList
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if pos < 0 {
|
if pos < 0 {
|
||||||
return relayUnary(ctx, s, ns, req, (TreeServiceClient).TreeList)
|
var resp *TreeListResponse
|
||||||
|
var outErr error
|
||||||
|
err = s.forEachNode(ctx, ns, func(c TreeServiceClient) bool {
|
||||||
|
resp, outErr = c.TreeList(ctx, req)
|
||||||
|
return outErr == nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, outErr
|
||||||
}
|
}
|
||||||
|
|
||||||
ids, err := s.forest.TreeList(ctx, cid)
|
ids, err := s.forest.TreeList(ctx, cid)
|
||||||
|
|
|
@ -342,9 +342,7 @@ func (*Service) createConnection(a network.Address) (*grpc.ClientConn, error) {
|
||||||
metrics.NewStreamClientInterceptor(),
|
metrics.NewStreamClientInterceptor(),
|
||||||
tracing_grpc.NewStreamClientInterceptor(),
|
tracing_grpc.NewStreamClientInterceptor(),
|
||||||
),
|
),
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrAlreadySyncing is returned when a service synchronization has already
|
// ErrAlreadySyncing is returned when a service synchronization has already
|
||||||
|
|
|
@ -59,8 +59,6 @@ func FlagAndStatus(status string) error {
|
||||||
return fmt.Errorf("clock_gettime: %w", err)
|
return fmt.Errorf("clock_gettime: %w", err)
|
||||||
}
|
}
|
||||||
status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10)
|
status += "\nMONOTONIC_USEC=" + strconv.FormatInt(ts.Nano()/1000, 10)
|
||||||
status += "\nSTATUS=RELOADING"
|
|
||||||
return Send(status)
|
|
||||||
}
|
}
|
||||||
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
|
status += "\nSTATUS=" + strings.TrimSuffix(status, "=1")
|
||||||
return Send(status)
|
return Send(status)
|
||||||
|
|
Loading…
Reference in a new issue