forked from TrueCloudLab/frostfs-node
Compare commits
6 commits
7ffc82a066
...
03d5a1d5ad
Author | SHA1 | Date | |
---|---|---|---|
03d5a1d5ad | |||
0e11cbe9dd | |||
d165ac042c | |||
7151c71d51 | |||
91d9dc2676 | |||
7853dbc315 |
44 changed files with 513 additions and 146 deletions
|
@ -9,7 +9,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"slices"
|
"slices"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
|
||||||
|
@ -78,13 +77,31 @@ func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContain
|
||||||
// SortedIDList returns sorted list of identifiers of user's containers.
|
// SortedIDList returns sorted list of identifiers of user's containers.
|
||||||
func (x ListContainersRes) SortedIDList() []cid.ID {
|
func (x ListContainersRes) SortedIDList() []cid.ID {
|
||||||
list := x.cliRes.Containers()
|
list := x.cliRes.Containers()
|
||||||
sort.Slice(list, func(i, j int) bool {
|
slices.SortFunc(list, func(lhs, rhs cid.ID) int {
|
||||||
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString()
|
return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString())
|
||||||
return strings.Compare(lhs, rhs) < 0
|
|
||||||
})
|
})
|
||||||
return list
|
return list
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ListContainersStream(ctx context.Context, prm ListContainersPrm, printCnr func(id cid.ID) bool) (err error) {
|
||||||
|
cliPrm := &client.PrmContainerListStream{
|
||||||
|
XHeaders: prm.XHeaders,
|
||||||
|
OwnerID: prm.OwnerID,
|
||||||
|
Session: prm.Session,
|
||||||
|
}
|
||||||
|
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("init container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rdr.Iterate(printCnr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("read container list: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// PutContainerPrm groups parameters of PutContainer operation.
|
// PutContainerPrm groups parameters of PutContainer operation.
|
||||||
type PutContainerPrm struct {
|
type PutContainerPrm struct {
|
||||||
Client *client.Client
|
Client *client.Client
|
||||||
|
|
|
@ -6,8 +6,11 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// flags of list command.
|
// flags of list command.
|
||||||
|
@ -51,16 +54,50 @@ var listContainersCmd = &cobra.Command{
|
||||||
|
|
||||||
var prm internalclient.ListContainersPrm
|
var prm internalclient.ListContainersPrm
|
||||||
prm.SetClient(cli)
|
prm.SetClient(cli)
|
||||||
prm.Account = idUser
|
prm.OwnerID = idUser
|
||||||
|
|
||||||
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
|
||||||
|
|
||||||
prmGet := internalclient.GetContainerPrm{
|
prmGet := internalclient.GetContainerPrm{
|
||||||
Client: cli,
|
Client: cli,
|
||||||
}
|
}
|
||||||
|
var containerIDs []cid.ID
|
||||||
|
|
||||||
|
err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) bool {
|
||||||
|
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||||
|
cmd.Println(id.String())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
prmGet.ClientParams.ContainerID = &id
|
||||||
|
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
|
||||||
|
if err != nil {
|
||||||
|
cmd.Printf(" failed to read attributes: %v\n", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr := res.Container()
|
||||||
|
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
cmd.Println(id.String())
|
||||||
|
|
||||||
|
if flagVarListPrintAttr {
|
||||||
|
cnr.IterateUserAttributes(func(key, val string) {
|
||||||
|
cmd.Printf(" %s: %s\n", key, val)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, ok := status.FromError(err); ok && e.Code() == codes.Unimplemented {
|
||||||
|
res, err := internalclient.ListContainers(cmd.Context(), prm)
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
containerIDs = res.SortedIDList()
|
||||||
|
} else {
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
containerIDs := res.SortedIDList()
|
|
||||||
for _, cnrID := range containerIDs {
|
for _, cnrID := range containerIDs {
|
||||||
if flagVarListName == "" && !flagVarListPrintAttr {
|
if flagVarListName == "" && !flagVarListPrintAttr {
|
||||||
cmd.Println(cnrID.String())
|
cmd.Println(cnrID.String())
|
||||||
|
|
|
@ -609,6 +609,7 @@ type cfgContainer struct {
|
||||||
parsers map[event.Type]event.NotificationParser
|
parsers map[event.Type]event.NotificationParser
|
||||||
subscribers map[event.Type][]event.Handler
|
subscribers map[event.Type][]event.Handler
|
||||||
workerPool util.WorkerPool // pool for asynchronous handlers
|
workerPool util.WorkerPool // pool for asynchronous handlers
|
||||||
|
containerBatchSize uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgFrostfsID struct {
|
type cfgFrostfsID struct {
|
||||||
|
|
27
cmd/frostfs-node/config/container/container.go
Normal file
27
cmd/frostfs-node/config/container/container.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package containerconfig
|
||||||
|
|
||||||
|
import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
|
||||||
|
const (
|
||||||
|
subsection = "container"
|
||||||
|
listStreamSubsection = "list_stream"
|
||||||
|
|
||||||
|
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once.
|
||||||
|
ContainerBatchSizeDefault = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
// ContainerBatchSize returns the value of "batch_size" config parameter
|
||||||
|
// from "list_stream" subsection of "container" section.
|
||||||
|
//
|
||||||
|
// Returns ContainerBatchSizeDefault if the value is missing or if
|
||||||
|
// the value is not positive integer.
|
||||||
|
func ContainerBatchSize(c *config.Config) uint32 {
|
||||||
|
if c.Sub(subsection).Sub(listStreamSubsection).Value("batch_size") == nil {
|
||||||
|
return ContainerBatchSizeDefault
|
||||||
|
}
|
||||||
|
size := config.Uint32Safe(c.Sub(subsection).Sub(listStreamSubsection), "batch_size")
|
||||||
|
if size == 0 {
|
||||||
|
return ContainerBatchSizeDefault
|
||||||
|
}
|
||||||
|
return size
|
||||||
|
}
|
27
cmd/frostfs-node/config/container/container_test.go
Normal file
27
cmd/frostfs-node/config/container/container_test.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package containerconfig_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
|
||||||
|
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestContainerSection(t *testing.T) {
|
||||||
|
t.Run("defaults", func(t *testing.T) {
|
||||||
|
empty := configtest.EmptyConfig()
|
||||||
|
require.Equal(t, uint32(containerconfig.ContainerBatchSizeDefault), containerconfig.ContainerBatchSize(empty))
|
||||||
|
})
|
||||||
|
|
||||||
|
const path = "../../../../config/example/node"
|
||||||
|
fileConfigTest := func(c *config.Config) {
|
||||||
|
require.Equal(t, uint32(1000), containerconfig.ContainerBatchSize(c))
|
||||||
|
}
|
||||||
|
|
||||||
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
t.Run("ENV", func(t *testing.T) {
|
||||||
|
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||||
|
})
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
|
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
|
||||||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||||
|
@ -47,6 +48,7 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
c.shared.frostfsidClient = frostfsIDSubjectProvider
|
||||||
|
c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)
|
||||||
|
|
||||||
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
|
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
|
||||||
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
|
||||||
|
@ -56,7 +58,9 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
&c.key.PrivateKey,
|
&c.key.PrivateKey,
|
||||||
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
containerService.NewAPEServer(defaultChainRouter, cnrRdr,
|
||||||
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
|
||||||
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc),
|
containerService.NewSplitterService(
|
||||||
|
c.cfgContainer.containerBatchSize, c.respSvc,
|
||||||
|
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
service = containerService.NewAuditService(service, c.log, c.audit)
|
service = containerService.NewAuditService(service, c.log, c.audit)
|
||||||
|
|
|
@ -83,6 +83,9 @@ FROSTFS_POLICER_HEAD_TIMEOUT=15s
|
||||||
FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
|
FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
|
||||||
FROSTFS_REPLICATOR_POOL_SIZE=10
|
FROSTFS_REPLICATOR_POOL_SIZE=10
|
||||||
|
|
||||||
|
# Container service section
|
||||||
|
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=1000
|
||||||
|
|
||||||
# Object service section
|
# Object service section
|
||||||
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
|
||||||
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200
|
||||||
|
|
|
@ -124,6 +124,11 @@
|
||||||
"pool_size": 10,
|
"pool_size": 10,
|
||||||
"put_timeout": "15s"
|
"put_timeout": "15s"
|
||||||
},
|
},
|
||||||
|
"container": {
|
||||||
|
"list_stream": {
|
||||||
|
"batch_size": "1000"
|
||||||
|
}
|
||||||
|
},
|
||||||
"object": {
|
"object": {
|
||||||
"delete": {
|
"delete": {
|
||||||
"tombstone_lifetime": 10
|
"tombstone_lifetime": 10
|
||||||
|
|
|
@ -108,6 +108,10 @@ replicator:
|
||||||
put_timeout: 15s # timeout for the Replicator PUT remote operation
|
put_timeout: 15s # timeout for the Replicator PUT remote operation
|
||||||
pool_size: 10 # maximum amount of concurrent replications
|
pool_size: 10 # maximum amount of concurrent replications
|
||||||
|
|
||||||
|
container:
|
||||||
|
list_stream:
|
||||||
|
batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once
|
||||||
|
|
||||||
object:
|
object:
|
||||||
delete:
|
delete:
|
||||||
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -8,7 +8,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -29,7 +29,7 @@ func (c *Client) BalanceOf(id user.ID) (*big.Int, error) {
|
||||||
|
|
||||||
amount, err := client.BigIntFromStackItem(prms[0])
|
amount, err := client.BigIntFromStackItem(prms[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get integer stack item from stack item (%s): %w", balanceOfMethod, err)
|
return nil, fmt.Errorf("get integer stack item from stack item (%s): %w", balanceOfMethod, err)
|
||||||
}
|
}
|
||||||
return amount, nil
|
return amount, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ func (c *Client) Decimals() (uint32, error) {
|
||||||
|
|
||||||
decimals, err := client.IntFromStackItem(prms[0])
|
decimals, err := client.IntFromStackItem(prms[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not get integer stack item from stack item (%s): %w", decimalsMethod, err)
|
return 0, fmt.Errorf("get integer stack item from stack item (%s): %w", decimalsMethod, err)
|
||||||
}
|
}
|
||||||
return uint32(decimals), nil
|
return uint32(decimals), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ func (c *Client) TransferX(ctx context.Context, p TransferPrm) error {
|
||||||
|
|
||||||
_, err = c.client.Invoke(ctx, prm)
|
_, err = c.client.Invoke(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not invoke method (%s): %w", transferXMethod, err)
|
return fmt.Errorf("invoke method (%s): %w", transferXMethod, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -196,7 +196,7 @@ func (c *Client) Invoke(ctx context.Context, contract util.Uint160, fee fixedn.F
|
||||||
|
|
||||||
txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...)
|
txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return InvokeRes{}, fmt.Errorf("could not invoke %s: %w", method, err)
|
return InvokeRes{}, fmt.Errorf("invoke %s: %w", method, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Debug(ctx, logs.ClientNeoClientInvoke,
|
c.logger.Debug(ctx, logs.ClientNeoClientInvoke,
|
||||||
|
@ -509,7 +509,7 @@ func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) {
|
||||||
|
|
||||||
list, err := c.roleList(noderoles.NeoFSAlphabet)
|
list, err := c.roleList(noderoles.NeoFSAlphabet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err)
|
return nil, fmt.Errorf("get alphabet nodes role list: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return list, nil
|
return list, nil
|
||||||
|
@ -523,7 +523,7 @@ func (c *Client) GetDesignateHash() util.Uint160 {
|
||||||
func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
|
func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
|
||||||
height, err := c.rpcActor.GetBlockCount()
|
height, err := c.rpcActor.GetBlockCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't get chain height: %w", err)
|
return nil, fmt.Errorf("get chain height: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.rolemgmt.GetDesignatedByRole(r, height)
|
return c.rolemgmt.GetDesignatedByRole(r, height)
|
||||||
|
|
|
@ -26,7 +26,7 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
|
||||||
cb := func(item stackitem.Item) error {
|
cb := func(item stackitem.Item) error {
|
||||||
rawID, err := client.BytesFromStackItem(item)
|
rawID, err := client.BytesFromStackItem(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get byte array from stack item (%s): %w", containersOfMethod, err)
|
return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var id cid.ID
|
var id cid.ID
|
||||||
|
|
|
@ -78,7 +78,7 @@ func (c *Client) Delete(ctx context.Context, p DeletePrm) (uint32, error) {
|
||||||
|
|
||||||
res, err := c.client.Invoke(ctx, prm)
|
res, err := c.client.Invoke(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not invoke method (%s): %w", deleteMethod, err)
|
return 0, fmt.Errorf("invoke method (%s): %w", deleteMethod, err)
|
||||||
}
|
}
|
||||||
return res.VUB, nil
|
return res.VUB, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
|
||||||
|
|
||||||
arr, err := client.ArrayFromStackItem(res[0])
|
arr, err := client.ArrayFromStackItem(res[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get item array of container (%s): %w", deletionInfoMethod, err)
|
return nil, fmt.Errorf("get item array of container (%s): %w", deletionInfoMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(arr) != 2 {
|
if len(arr) != 2 {
|
||||||
|
@ -55,17 +55,17 @@ func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
|
||||||
|
|
||||||
rawOwner, err := client.BytesFromStackItem(arr[0])
|
rawOwner, err := client.BytesFromStackItem(arr[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get byte array of container (%s): %w", deletionInfoMethod, err)
|
return nil, fmt.Errorf("get byte array of container (%s): %w", deletionInfoMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var owner user.ID
|
var owner user.ID
|
||||||
if err := owner.DecodeString(base58.Encode(rawOwner)); err != nil {
|
if err := owner.DecodeString(base58.Encode(rawOwner)); err != nil {
|
||||||
return nil, fmt.Errorf("could not decode container owner id (%s): %w", deletionInfoMethod, err)
|
return nil, fmt.Errorf("decode container owner id (%s): %w", deletionInfoMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
epoch, err := client.BigIntFromStackItem(arr[1])
|
epoch, err := client.BigIntFromStackItem(arr[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get byte array of container signature (%s): %w", deletionInfoMethod, err)
|
return nil, fmt.Errorf("get byte array of container signature (%s): %w", deletionInfoMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &containercore.DelInfo{
|
return &containercore.DelInfo{
|
||||||
|
|
|
@ -60,7 +60,7 @@ func (c *Client) Get(cid []byte) (*containercore.Container, error) {
|
||||||
|
|
||||||
arr, err := client.ArrayFromStackItem(res[0])
|
arr, err := client.ArrayFromStackItem(res[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get item array of container (%s): %w", getMethod, err)
|
return nil, fmt.Errorf("get item array of container (%s): %w", getMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(arr) != 4 {
|
if len(arr) != 4 {
|
||||||
|
@ -69,29 +69,29 @@ func (c *Client) Get(cid []byte) (*containercore.Container, error) {
|
||||||
|
|
||||||
cnrBytes, err := client.BytesFromStackItem(arr[0])
|
cnrBytes, err := client.BytesFromStackItem(arr[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get byte array of container (%s): %w", getMethod, err)
|
return nil, fmt.Errorf("get byte array of container (%s): %w", getMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sigBytes, err := client.BytesFromStackItem(arr[1])
|
sigBytes, err := client.BytesFromStackItem(arr[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get byte array of container signature (%s): %w", getMethod, err)
|
return nil, fmt.Errorf("get byte array of container signature (%s): %w", getMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub, err := client.BytesFromStackItem(arr[2])
|
pub, err := client.BytesFromStackItem(arr[2])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get byte array of public key (%s): %w", getMethod, err)
|
return nil, fmt.Errorf("get byte array of public key (%s): %w", getMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tokBytes, err := client.BytesFromStackItem(arr[3])
|
tokBytes, err := client.BytesFromStackItem(arr[3])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get byte array of session token (%s): %w", getMethod, err)
|
return nil, fmt.Errorf("get byte array of session token (%s): %w", getMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var cnr containercore.Container
|
var cnr containercore.Container
|
||||||
|
|
||||||
if err := cnr.Value.Unmarshal(cnrBytes); err != nil {
|
if err := cnr.Value.Unmarshal(cnrBytes); err != nil {
|
||||||
// use other major version if there any
|
// use other major version if there any
|
||||||
return nil, fmt.Errorf("can't unmarshal container: %w", err)
|
return nil, fmt.Errorf("unmarshal container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(tokBytes) > 0 {
|
if len(tokBytes) > 0 {
|
||||||
|
@ -99,7 +99,7 @@ func (c *Client) Get(cid []byte) (*containercore.Container, error) {
|
||||||
|
|
||||||
err = cnr.Session.Unmarshal(tokBytes)
|
err = cnr.Session.Unmarshal(tokBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not unmarshal session token: %w", err)
|
return nil, fmt.Errorf("unmarshal session token: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,14 +34,14 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
|
||||||
|
|
||||||
res, err = client.ArrayFromStackItem(res[0])
|
res, err = client.ArrayFromStackItem(res[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get stack item array from stack item (%s): %w", listMethod, err)
|
return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cidList := make([]cid.ID, 0, len(res))
|
cidList := make([]cid.ID, 0, len(res))
|
||||||
for i := range res {
|
for i := range res {
|
||||||
rawID, err := client.BytesFromStackItem(res[i])
|
rawID, err := client.BytesFromStackItem(res[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get byte array from stack item (%s): %w", listMethod, err)
|
return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var id cid.ID
|
var id cid.ID
|
||||||
|
|
|
@ -117,7 +117,7 @@ func (c *Client) Put(ctx context.Context, p PutPrm) error {
|
||||||
|
|
||||||
_, err := c.client.Invoke(ctx, prm)
|
_, err := c.client.Invoke(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not invoke method (%s): %w", method, err)
|
return fmt.Errorf("invoke method (%s): %w", method, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ func (c *Client) GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error)
|
||||||
|
|
||||||
subj, err := frostfsidclient.ParseSubject(structArr)
|
subj, err := frostfsidclient.ParseSubject(structArr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not parse test invocation result (%s): %w", methodGetSubject, err)
|
return nil, fmt.Errorf("parse test invocation result (%s): %w", methodGetSubject, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return subj, nil
|
return subj, nil
|
||||||
|
@ -54,7 +54,7 @@ func (c *Client) GetSubjectExtended(addr util.Uint160) (*frostfsidclient.Subject
|
||||||
|
|
||||||
subj, err := frostfsidclient.ParseSubjectExtended(structArr)
|
subj, err := frostfsidclient.ParseSubjectExtended(structArr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not parse test invocation result (%s): %w", methodGetSubject, err)
|
return nil, fmt.Errorf("parse test invocation result (%s): %w", methodGetSubject, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return subj, nil
|
return subj, nil
|
||||||
|
@ -67,7 +67,7 @@ func checkStackItem(res []stackitem.Item) (structArr []stackitem.Item, err error
|
||||||
|
|
||||||
structArr, err = client.ArrayFromStackItem(res[0])
|
structArr, err = client.ArrayFromStackItem(res[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get item array of container (%s): %w", methodGetSubject, err)
|
return nil, fmt.Errorf("get item array of container (%s): %w", methodGetSubject, err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ func (c *Client) Epoch() (uint64, error) {
|
||||||
|
|
||||||
num, err := client.IntFromStackItem(items[0])
|
num, err := client.IntFromStackItem(items[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not get number from stack item (%s): %w", epochMethod, err)
|
return 0, fmt.Errorf("get number from stack item (%s): %w", epochMethod, err)
|
||||||
}
|
}
|
||||||
return uint64(num), nil
|
return uint64(num), nil
|
||||||
}
|
}
|
||||||
|
@ -49,7 +49,7 @@ func (c *Client) LastEpochBlock() (uint32, error) {
|
||||||
|
|
||||||
block, err := client.IntFromStackItem(items[0])
|
block, err := client.IntFromStackItem(items[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not get number from stack item (%s): %w",
|
return 0, fmt.Errorf("get number from stack item (%s): %w",
|
||||||
lastEpochBlockMethod, err)
|
lastEpochBlockMethod, err)
|
||||||
}
|
}
|
||||||
return uint32(block), nil
|
return uint32(block), nil
|
||||||
|
|
|
@ -59,7 +59,7 @@ func irKeysFromStackItem(stack []stackitem.Item, method string) (keys.PublicKeys
|
||||||
|
|
||||||
irs, err := client.ArrayFromStackItem(stack[0])
|
irs, err := client.ArrayFromStackItem(stack[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get stack item array from stack item (%s): %w", method, err)
|
return nil, fmt.Errorf("get stack item array from stack item (%s): %w", method, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
irKeys := make(keys.PublicKeys, len(irs))
|
irKeys := make(keys.PublicKeys, len(irs))
|
||||||
|
@ -79,7 +79,7 @@ const irNodeFixedPrmNumber = 1
|
||||||
func irKeyFromStackItem(prm stackitem.Item) (*keys.PublicKey, error) {
|
func irKeyFromStackItem(prm stackitem.Item) (*keys.PublicKey, error) {
|
||||||
prms, err := client.ArrayFromStackItem(prm)
|
prms, err := client.ArrayFromStackItem(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get stack item array (IRNode): %w", err)
|
return nil, fmt.Errorf("get stack item array (IRNode): %w", err)
|
||||||
} else if ln := len(prms); ln != irNodeFixedPrmNumber {
|
} else if ln := len(prms); ln != irNodeFixedPrmNumber {
|
||||||
return nil, fmt.Errorf(
|
return nil, fmt.Errorf(
|
||||||
"unexpected stack item count (IRNode): expected %d, has %d",
|
"unexpected stack item count (IRNode): expected %d, has %d",
|
||||||
|
@ -90,7 +90,7 @@ func irKeyFromStackItem(prm stackitem.Item) (*keys.PublicKey, error) {
|
||||||
|
|
||||||
byteKey, err := client.BytesFromStackItem(prms[0])
|
byteKey, err := client.BytesFromStackItem(prms[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not parse bytes from stack item (IRNode): %w", err)
|
return nil, fmt.Errorf("parse bytes from stack item (IRNode): %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return keys.NewPublicKeyFromBytes(byteKey, elliptic.P256())
|
return keys.NewPublicKeyFromBytes(byteKey, elliptic.P256())
|
||||||
|
|
|
@ -16,7 +16,7 @@ func (c *Client) NewEpoch(ctx context.Context, epoch uint64) error {
|
||||||
|
|
||||||
_, err := c.client.Invoke(ctx, prm)
|
_, err := c.client.Invoke(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not invoke method (%s): %w", newEpochMethod, err)
|
return fmt.Errorf("invoke method (%s): %w", newEpochMethod, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,7 @@ func (c *Client) NewEpochControl(ctx context.Context, epoch uint64, vub uint32)
|
||||||
|
|
||||||
res, err := c.client.Invoke(ctx, prm)
|
res, err := c.client.Invoke(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not invoke method (%s): %w", newEpochMethod, err)
|
return 0, fmt.Errorf("invoke method (%s): %w", newEpochMethod, err)
|
||||||
}
|
}
|
||||||
return res.VUB, nil
|
return res.VUB, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ func (c *Client) AddPeer(ctx context.Context, p AddPeerPrm) error {
|
||||||
prm.InvokePrmOptional = p.InvokePrmOptional
|
prm.InvokePrmOptional = p.InvokePrmOptional
|
||||||
|
|
||||||
if _, err := c.client.Invoke(ctx, prm); err != nil {
|
if _, err := c.client.Invoke(ctx, prm); err != nil {
|
||||||
return fmt.Errorf("could not invoke method (%s): %w", method, err)
|
return fmt.Errorf("invoke method (%s): %w", method, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,7 +107,7 @@ func (c *Client) NNSHash() (util.Uint160, error) {
|
||||||
func nnsResolveItem(c *rpcclient.WSClient, nnsHash util.Uint160, domain string) (stackitem.Item, error) {
|
func nnsResolveItem(c *rpcclient.WSClient, nnsHash util.Uint160, domain string) (stackitem.Item, error) {
|
||||||
found, err := exists(c, nnsHash, domain)
|
found, err := exists(c, nnsHash, domain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not check presence in NNS contract for %s: %w", domain, err)
|
return nil, fmt.Errorf("check presence in NNS contract for %s: %w", domain, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !found {
|
if !found {
|
||||||
|
|
|
@ -58,16 +58,11 @@ const (
|
||||||
defaultNotaryValidTime = 50
|
defaultNotaryValidTime = 50
|
||||||
defaultNotaryRoundTime = 100
|
defaultNotaryRoundTime = 100
|
||||||
|
|
||||||
notaryBalanceOfMethod = "balanceOf"
|
|
||||||
notaryExpirationOfMethod = "expirationOf"
|
|
||||||
setDesignateMethod = "designateAsRole"
|
setDesignateMethod = "designateAsRole"
|
||||||
|
|
||||||
notaryBalanceErrMsg = "can't fetch notary balance"
|
|
||||||
notaryNotEnabledPanicMsg = "notary support was not enabled on this client"
|
notaryNotEnabledPanicMsg = "notary support was not enabled on this client"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errUnexpectedItems = errors.New("invalid number of NEO VM arguments on stack")
|
|
||||||
|
|
||||||
func defaultNotaryConfig(c *Client) *notaryCfg {
|
func defaultNotaryConfig(c *Client) *notaryCfg {
|
||||||
return ¬aryCfg{
|
return ¬aryCfg{
|
||||||
txValidTime: defaultNotaryValidTime,
|
txValidTime: defaultNotaryValidTime,
|
||||||
|
@ -155,15 +150,16 @@ func (c *Client) DepositNotary(ctx context.Context, amount fixedn.Fixed8, delta
|
||||||
|
|
||||||
bc, err := c.rpcActor.GetBlockCount()
|
bc, err := c.rpcActor.GetBlockCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.Uint256{}, fmt.Errorf("can't get blockchain height: %w", err)
|
return util.Uint256{}, fmt.Errorf("get blockchain height: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentTill, err := c.depositExpirationOf()
|
r := notary.NewReader(c.rpcActor)
|
||||||
|
currentTill, err := r.ExpirationOf(c.acc.PrivateKey().GetScriptHash())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.Uint256{}, fmt.Errorf("can't get previous expiration value: %w", err)
|
return util.Uint256{}, fmt.Errorf("get previous expiration value: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
till := max(int64(bc+delta), currentTill)
|
till := max(int64(bc+delta), int64(currentTill))
|
||||||
res, _, err := c.depositNotary(ctx, amount, till)
|
res, _, err := c.depositNotary(ctx, amount, till)
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
@ -197,7 +193,7 @@ func (c *Client) depositNotary(ctx context.Context, amount fixedn.Fixed8, till i
|
||||||
[]any{c.acc.PrivateKey().GetScriptHash(), till})
|
[]any{c.acc.PrivateKey().GetScriptHash(), till})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, neorpc.ErrAlreadyExists) {
|
if !errors.Is(err, neorpc.ErrAlreadyExists) {
|
||||||
return util.Uint256{}, 0, fmt.Errorf("can't make notary deposit: %w", err)
|
return util.Uint256{}, 0, fmt.Errorf("make notary deposit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transaction is already in mempool waiting to be processed.
|
// Transaction is already in mempool waiting to be processed.
|
||||||
|
@ -237,18 +233,10 @@ func (c *Client) GetNotaryDeposit() (res int64, err error) {
|
||||||
|
|
||||||
sh := c.acc.PrivateKey().PublicKey().GetScriptHash()
|
sh := c.acc.PrivateKey().PublicKey().GetScriptHash()
|
||||||
|
|
||||||
items, err := c.TestInvoke(c.notary.notary, notaryBalanceOfMethod, sh)
|
r := notary.NewReader(c.rpcActor)
|
||||||
|
bigIntDeposit, err := r.BalanceOf(sh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("test invoke (%s): %w", notaryBalanceOfMethod, err)
|
return 0, fmt.Errorf("get notary deposit: %w", err)
|
||||||
}
|
|
||||||
|
|
||||||
if len(items) != 1 {
|
|
||||||
return 0, wrapFrostFSError(fmt.Errorf("%v: %w", notaryBalanceErrMsg, errUnexpectedItems))
|
|
||||||
}
|
|
||||||
|
|
||||||
bigIntDeposit, err := items[0].TryInteger()
|
|
||||||
if err != nil {
|
|
||||||
return 0, wrapFrostFSError(fmt.Errorf("%v: %w", notaryBalanceErrMsg, err))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return bigIntDeposit.Int64(), nil
|
return bigIntDeposit.Int64(), nil
|
||||||
|
@ -289,7 +277,7 @@ func (c *Client) UpdateNotaryList(ctx context.Context, prm UpdateNotaryListPrm)
|
||||||
|
|
||||||
nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash)
|
nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not calculate nonce and `valicUntilBlock` values: %w", err)
|
return fmt.Errorf("calculate nonce and `valicUntilBlock` values: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.notaryInvokeAsCommittee(
|
return c.notaryInvokeAsCommittee(
|
||||||
|
@ -338,7 +326,7 @@ func (c *Client) UpdateNeoFSAlphabetList(ctx context.Context, prm UpdateAlphabet
|
||||||
|
|
||||||
nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash)
|
nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not calculate nonce and `valicUntilBlock` values: %w", err)
|
return fmt.Errorf("calculate nonce and `valicUntilBlock` values: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.notaryInvokeAsCommittee(
|
return c.notaryInvokeAsCommittee(
|
||||||
|
@ -407,7 +395,7 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error {
|
||||||
|
|
||||||
alphabetList, err := c.notary.alphabetSource()
|
alphabetList, err := c.notary.alphabetSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not fetch current alphabet keys: %w", err)
|
return fmt.Errorf("fetch current alphabet keys: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cosigners, err := c.notaryCosignersFromTx(mainTx, alphabetList)
|
cosigners, err := c.notaryCosignersFromTx(mainTx, alphabetList)
|
||||||
|
@ -529,24 +517,24 @@ func (c *Client) notaryCosignersFromTx(mainTx *transaction.Transaction, alphabet
|
||||||
if ok {
|
if ok {
|
||||||
pub, err := keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256())
|
pub, err := keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key: %w", err)
|
return nil, fmt.Errorf("parse verification script of signer #2: invalid public key: %w", err)
|
||||||
}
|
}
|
||||||
acc = notary.FakeSimpleAccount(pub)
|
acc = notary.FakeSimpleAccount(pub)
|
||||||
} else {
|
} else {
|
||||||
m, pubsBytes, ok := vm.ParseMultiSigContract(script)
|
m, pubsBytes, ok := vm.ParseMultiSigContract(script)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("failed to parse verification script of signer #2: unknown witness type")
|
return nil, errors.New("parse verification script of signer #2: unknown witness type")
|
||||||
}
|
}
|
||||||
pubs := make(keys.PublicKeys, len(pubsBytes))
|
pubs := make(keys.PublicKeys, len(pubsBytes))
|
||||||
for i := range pubs {
|
for i := range pubs {
|
||||||
pubs[i], err = keys.NewPublicKeyFromBytes(pubsBytes[i], elliptic.P256())
|
pubs[i], err = keys.NewPublicKeyFromBytes(pubsBytes[i], elliptic.P256())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key #%d: %w", i, err)
|
return nil, fmt.Errorf("parse verification script of signer #2: invalid public key #%d: %w", i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
acc, err = notary.FakeMultisigAccount(m, pubs)
|
acc, err = notary.FakeMultisigAccount(m, pubs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create fake account for signer #2: %w", err)
|
return nil, fmt.Errorf("create fake account for signer #2: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -623,7 +611,7 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB
|
||||||
err := multisigAccount.ConvertMultisig(m, ir)
|
err := multisigAccount.ConvertMultisig(m, ir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// wrap error as FrostFS-specific since the call is not related to any client
|
// wrap error as FrostFS-specific since the call is not related to any client
|
||||||
return nil, wrapFrostFSError(fmt.Errorf("can't convert account to inner ring multisig wallet: %w", err))
|
return nil, wrapFrostFSError(fmt.Errorf("convert account to inner ring multisig wallet: %w", err))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// alphabet multisig redeem script is
|
// alphabet multisig redeem script is
|
||||||
|
@ -632,7 +620,7 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB
|
||||||
multisigAccount, err = notary.FakeMultisigAccount(m, ir)
|
multisigAccount, err = notary.FakeMultisigAccount(m, ir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// wrap error as FrostFS-specific since the call is not related to any client
|
// wrap error as FrostFS-specific since the call is not related to any client
|
||||||
return nil, wrapFrostFSError(fmt.Errorf("can't make inner ring multisig wallet: %w", err))
|
return nil, wrapFrostFSError(fmt.Errorf("make inner ring multisig wallet: %w", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,7 +630,7 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB
|
||||||
func (c *Client) notaryTxValidationLimit() (uint32, error) {
|
func (c *Client) notaryTxValidationLimit() (uint32, error) {
|
||||||
bc, err := c.rpcActor.GetBlockCount()
|
bc, err := c.rpcActor.GetBlockCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("can't get current blockchain height: %w", err)
|
return 0, fmt.Errorf("get current blockchain height: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
minTime := bc + c.notary.txValidTime
|
minTime := bc + c.notary.txValidTime
|
||||||
|
@ -651,24 +639,6 @@ func (c *Client) notaryTxValidationLimit() (uint32, error) {
|
||||||
return rounded, nil
|
return rounded, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) depositExpirationOf() (int64, error) {
|
|
||||||
expirationRes, err := c.TestInvoke(c.notary.notary, notaryExpirationOfMethod, c.acc.PrivateKey().GetScriptHash())
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("test invoke (%s): %w", notaryExpirationOfMethod, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(expirationRes) != 1 {
|
|
||||||
return 0, fmt.Errorf("method returned unexpected item count: %d", len(expirationRes))
|
|
||||||
}
|
|
||||||
|
|
||||||
currentTillBig, err := expirationRes[0].TryInteger()
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("can't parse deposit till value: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return currentTillBig.Int64(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// sigCount returns the number of required signature.
|
// sigCount returns the number of required signature.
|
||||||
// For FrostFS Alphabet M is a 2/3+1 of it (like in dBFT).
|
// For FrostFS Alphabet M is a 2/3+1 of it (like in dBFT).
|
||||||
// If committee is true, returns M as N/2+1.
|
// If committee is true, returns M as N/2+1.
|
||||||
|
@ -742,12 +712,12 @@ func alreadyOnChainError(err error) bool {
|
||||||
func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed8, error) {
|
func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed8, error) {
|
||||||
notaryBalance, err := c.GetNotaryDeposit()
|
notaryBalance, err := c.GetNotaryDeposit()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not get notary balance: %w", err)
|
return 0, fmt.Errorf("get notary balance: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
gasBalance, err := c.GasBalance()
|
gasBalance, err := c.GasBalance()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not get GAS balance: %w", err)
|
return 0, fmt.Errorf("get GAS balance: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if gasBalance == 0 {
|
if gasBalance == 0 {
|
||||||
|
@ -796,12 +766,12 @@ func (c *Client) calculateNonceAndVUB(hash *util.Uint256, roundBlockHeight bool)
|
||||||
if hash != nil {
|
if hash != nil {
|
||||||
height, err = c.getTransactionHeight(*hash)
|
height, err = c.getTransactionHeight(*hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, fmt.Errorf("could not get transaction height: %w", err)
|
return 0, 0, fmt.Errorf("get transaction height: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
height, err = c.rpcActor.GetBlockCount()
|
height, err = c.rpcActor.GetBlockCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, fmt.Errorf("could not get chain height: %w", err)
|
return 0, 0, fmt.Errorf("get chain height: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,7 @@ func (s StaticClient) Invoke(ctx context.Context, prm InvokePrm) (InvokeRes, err
|
||||||
nonce, vub, err = s.client.CalculateNonceAndVUB(prm.hash)
|
nonce, vub, err = s.client.CalculateNonceAndVUB(prm.hash)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return InvokeRes{}, fmt.Errorf("could not calculate nonce and VUB for notary alphabet invoke: %w", err)
|
return InvokeRes{}, fmt.Errorf("calculate nonce and VUB for notary alphabet invoke: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
vubP = &vub
|
vubP = &vub
|
||||||
|
|
|
@ -53,7 +53,7 @@ func BytesFromStackItem(param stackitem.Item) ([]byte, error) {
|
||||||
case stackitem.IntegerT:
|
case stackitem.IntegerT:
|
||||||
n, err := param.TryInteger()
|
n, err := param.TryInteger()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't parse integer bytes: %w", err)
|
return nil, fmt.Errorf("parse integer bytes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.Bytes(), nil
|
return n.Bytes(), nil
|
||||||
|
|
|
@ -111,7 +111,7 @@ type listener struct {
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
const newListenerFailMsg = "could not instantiate Listener"
|
const newListenerFailMsg = "instantiate Listener"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errNilLogger = errors.New("nil logger")
|
errNilLogger = errors.New("nil logger")
|
||||||
|
@ -180,7 +180,7 @@ func (l *listener) subscribe(errCh chan error) {
|
||||||
// fill the list with the contracts with set event parsers.
|
// fill the list with the contracts with set event parsers.
|
||||||
l.mtx.RLock()
|
l.mtx.RLock()
|
||||||
for hashType := range l.notificationParsers {
|
for hashType := range l.notificationParsers {
|
||||||
scHash := hashType.ScriptHash()
|
scHash := hashType.Hash
|
||||||
|
|
||||||
// prevent repetitions
|
// prevent repetitions
|
||||||
for _, hash := range hashes {
|
for _, hash := range hashes {
|
||||||
|
@ -189,26 +189,26 @@ func (l *listener) subscribe(errCh chan error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
hashes = append(hashes, hashType.ScriptHash())
|
hashes = append(hashes, hashType.Hash)
|
||||||
}
|
}
|
||||||
l.mtx.RUnlock()
|
l.mtx.RUnlock()
|
||||||
|
|
||||||
err := l.subscriber.SubscribeForNotification(hashes...)
|
err := l.subscriber.SubscribeForNotification(hashes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errCh <- fmt.Errorf("could not subscribe for notifications: %w", err)
|
errCh <- fmt.Errorf("subscribe for notifications: %w", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(l.blockHandlers) > 0 {
|
if len(l.blockHandlers) > 0 {
|
||||||
if err = l.subscriber.BlockNotifications(); err != nil {
|
if err = l.subscriber.BlockNotifications(); err != nil {
|
||||||
errCh <- fmt.Errorf("could not subscribe for blocks: %w", err)
|
errCh <- fmt.Errorf("subscribe for blocks: %w", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.listenNotary {
|
if l.listenNotary {
|
||||||
if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
|
if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
|
||||||
errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err)
|
errCh <- fmt.Errorf("subscribe for notary requests: %w", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -326,9 +326,7 @@ func (l *listener) parseAndHandleNotification(ctx context.Context, notifyEvent *
|
||||||
)
|
)
|
||||||
|
|
||||||
// get the event parser
|
// get the event parser
|
||||||
keyEvent := scriptHashWithType{}
|
keyEvent := scriptHashWithType{Hash: notifyEvent.ScriptHash, Type: typEvent}
|
||||||
keyEvent.SetScriptHash(notifyEvent.ScriptHash)
|
|
||||||
keyEvent.SetType(typEvent)
|
|
||||||
|
|
||||||
l.mtx.RLock()
|
l.mtx.RLock()
|
||||||
parser, ok := l.notificationParsers[keyEvent]
|
parser, ok := l.notificationParsers[keyEvent]
|
||||||
|
@ -451,9 +449,7 @@ func (l *listener) RegisterNotificationHandler(hi NotificationHandlerInfo) {
|
||||||
l.mtx.Lock()
|
l.mtx.Lock()
|
||||||
defer l.mtx.Unlock()
|
defer l.mtx.Unlock()
|
||||||
|
|
||||||
var k scriptHashWithType
|
k := scriptHashWithType{Hash: hi.Contract, Type: hi.Type}
|
||||||
k.hash = hi.Contract
|
|
||||||
k.typ = hi.Type
|
|
||||||
|
|
||||||
l.notificationParsers[k] = hi.Parser
|
l.notificationParsers[k] = hi.Parser
|
||||||
l.notificationHandlers[k] = append(
|
l.notificationHandlers[k] = append(
|
||||||
|
@ -570,7 +566,7 @@ func NewListener(p ListenerParams) (Listener, error) {
|
||||||
// The default capacity is 0, which means "infinite".
|
// The default capacity is 0, which means "infinite".
|
||||||
pool, err := ants.NewPool(p.WorkerPoolCapacity)
|
pool, err := ants.NewPool(p.WorkerPoolCapacity)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not init worker pool: %w", err)
|
return nil, fmt.Errorf("init worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &listener{
|
return &listener{
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
|
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNilPubKey = errors.New("could not parse public key: public key is nil")
|
var errNilPubKey = errors.New("public key is nil")
|
||||||
|
|
||||||
func (s *UpdatePeer) setPublicKey(v []byte) (err error) {
|
func (s *UpdatePeer) setPublicKey(v []byte) (err error) {
|
||||||
if v == nil {
|
if v == nil {
|
||||||
|
@ -19,7 +19,7 @@ func (s *UpdatePeer) setPublicKey(v []byte) (err error) {
|
||||||
|
|
||||||
s.PubKey, err = keys.NewPublicKeyFromBytes(v, elliptic.P256())
|
s.PubKey, err = keys.NewPublicKeyFromBytes(v, elliptic.P256())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse public key: %w", err)
|
return fmt.Errorf("parse public key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
@ -127,7 +127,7 @@ func (p Preparator) Prepare(nr *payload.P2PNotaryRequest) (NotaryEvent, error) {
|
||||||
for {
|
for {
|
||||||
opCode, param, err = ctx.Next()
|
opCode, param, err = ctx.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get next opcode in script: %w", err)
|
return nil, fmt.Errorf("get next opcode in script: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if opCode == opcode.RET {
|
if opCode == opcode.RET {
|
||||||
|
@ -147,7 +147,7 @@ func (p Preparator) Prepare(nr *payload.P2PNotaryRequest) (NotaryEvent, error) {
|
||||||
// retrieve contract's script hash
|
// retrieve contract's script hash
|
||||||
contractHash, err := util.Uint160DecodeBytesBE(ops[opsLen-2].param)
|
contractHash, err := util.Uint160DecodeBytesBE(ops[opsLen-2].param)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not decode contract hash: %w", err)
|
return nil, fmt.Errorf("decode contract hash: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// retrieve contract's method
|
// retrieve contract's method
|
||||||
|
@ -164,7 +164,7 @@ func (p Preparator) Prepare(nr *payload.P2PNotaryRequest) (NotaryEvent, error) {
|
||||||
if len(args) != 0 {
|
if len(args) != 0 {
|
||||||
err = p.validateParameterOpcodes(args)
|
err = p.validateParameterOpcodes(args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not validate arguments: %w", err)
|
return nil, fmt.Errorf("validate arguments: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// without args packing opcodes
|
// without args packing opcodes
|
||||||
|
@ -206,7 +206,7 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
|
||||||
|
|
||||||
currentAlphabet, err := p.alphaKeys()
|
currentAlphabet, err := p.alphaKeys()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not fetch Alphabet public keys: %w", err)
|
return fmt.Errorf("fetch Alphabet public keys: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = p.validateCosigners(ln, nr.MainTransaction.Signers, currentAlphabet)
|
err = p.validateCosigners(ln, nr.MainTransaction.Signers, currentAlphabet)
|
||||||
|
@ -239,7 +239,7 @@ func (p Preparator) validateParameterOpcodes(ops []Op) error {
|
||||||
|
|
||||||
argsLen, err := IntFromOpcode(ops[l-2])
|
argsLen, err := IntFromOpcode(ops[l-2])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse argument len: %w", err)
|
return fmt.Errorf("parse argument len: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = validateNestedArgs(argsLen, ops[:l-2])
|
err = validateNestedArgs(argsLen, ops[:l-2])
|
||||||
|
@ -273,7 +273,7 @@ func validateNestedArgs(expArgLen int64, ops []Op) error {
|
||||||
|
|
||||||
argsLen, err := IntFromOpcode(ops[i-1])
|
argsLen, err := IntFromOpcode(ops[i-1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse argument len: %w", err)
|
return fmt.Errorf("parse argument len: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
expArgLen += argsLen + 1
|
expArgLen += argsLen + 1
|
||||||
|
@ -307,7 +307,7 @@ func (p Preparator) validateExpiration(fbTX *transaction.Transaction) error {
|
||||||
|
|
||||||
currBlock, err := p.blockCounter.BlockCount()
|
currBlock, err := p.blockCounter.BlockCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not fetch current chain height: %w", err)
|
return fmt.Errorf("fetch current chain height: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if currBlock >= nvb.Height {
|
if currBlock >= nvb.Height {
|
||||||
|
@ -327,7 +327,7 @@ func (p Preparator) validateCosigners(expected int, s []transaction.Signer, alph
|
||||||
|
|
||||||
alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
|
alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get Alphabet verification script: %w", err)
|
return fmt.Errorf("get Alphabet verification script: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s[1].Account.Equals(hash.Hash160(alphaVerificationScript)) {
|
if !s[1].Account.Equals(hash.Hash160(alphaVerificationScript)) {
|
||||||
|
@ -346,7 +346,7 @@ func (p Preparator) validateWitnesses(w []transaction.Witness, alphaKeys keys.Pu
|
||||||
|
|
||||||
alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
|
alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not get Alphabet verification script: %w", err)
|
return fmt.Errorf("get Alphabet verification script: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the second one must be witness of the current
|
// the second one must be witness of the current
|
||||||
|
|
|
@ -26,7 +26,7 @@ func (Designate) MorphEvent() {}
|
||||||
func ParseDesignate(e *state.ContainedNotificationEvent) (event.Event, error) {
|
func ParseDesignate(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||||
params, err := event.ParseStackArray(e)
|
params, err := event.ParseStackArray(e)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err)
|
return nil, fmt.Errorf("parse stack items from notify event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(params) != 2 {
|
if len(params) != 2 {
|
||||||
|
|
|
@ -20,13 +20,9 @@ type scriptHashValue struct {
|
||||||
hash util.Uint160
|
hash util.Uint160
|
||||||
}
|
}
|
||||||
|
|
||||||
type typeValue struct {
|
|
||||||
typ Type
|
|
||||||
}
|
|
||||||
|
|
||||||
type scriptHashWithType struct {
|
type scriptHashWithType struct {
|
||||||
scriptHashValue
|
Hash util.Uint160
|
||||||
typeValue
|
Type Type
|
||||||
}
|
}
|
||||||
|
|
||||||
type notaryRequestTypes struct {
|
type notaryRequestTypes struct {
|
||||||
|
@ -73,16 +69,6 @@ func (s scriptHashValue) ScriptHash() util.Uint160 {
|
||||||
return s.hash
|
return s.hash
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetType is an event type setter.
|
|
||||||
func (s *typeValue) SetType(v Type) {
|
|
||||||
s.typ = v
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetType is an event type getter.
|
|
||||||
func (s typeValue) GetType() Type {
|
|
||||||
return s.typ
|
|
||||||
}
|
|
||||||
|
|
||||||
// WorkerPoolHandler sets closure over worker pool w with passed handler h.
|
// WorkerPoolHandler sets closure over worker pool w with passed handler h.
|
||||||
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
|
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
|
||||||
return func(ctx context.Context, e Event) {
|
return func(ctx context.Context, e Event) {
|
||||||
|
|
|
@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
|
||||||
|
|
||||||
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type containerStreamerV2 struct {
|
||||||
|
containerGRPC.ContainerService_ListStreamServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
|
||||||
|
return s.ContainerService_ListStreamServer.Send(
|
||||||
|
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
|
||||||
|
// to gRPC stream.
|
||||||
|
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
|
||||||
|
listReq := new(container.ListStreamRequest)
|
||||||
|
if err := listReq.FromGRPCMessage(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.srv.ListStream(listReq, &containerStreamerV2{
|
||||||
|
ContainerService_ListStreamServer: gStream,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
return nil, apeErr(nativeschema.MethodListContainers, s)
|
return nil, apeErr(nativeschema.MethodListContainers, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps := map[string]string{
|
||||||
|
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
|
||||||
|
nativeschema.PropertyKeyActorRole: role,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if p, ok := peer.FromContext(ctx); ok {
|
||||||
|
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
|
||||||
|
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not get owner namespace: %w", err)
|
||||||
|
}
|
||||||
|
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
request := aperequest.NewRequest(
|
||||||
|
nativeschema.MethodListContainers,
|
||||||
|
aperequest.NewResource(
|
||||||
|
resourceName(namespace, ""),
|
||||||
|
make(map[string]string),
|
||||||
|
),
|
||||||
|
reqProps,
|
||||||
|
)
|
||||||
|
|
||||||
|
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get group ids: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Policy contract keeps group related chains as namespace-group pair.
|
||||||
|
for i := range groups {
|
||||||
|
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
rt := policyengine.NewRequestTargetWithNamespace(namespace)
|
||||||
|
rt.User = &policyengine.Target{
|
||||||
|
Type: policyengine.User,
|
||||||
|
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
|
||||||
|
}
|
||||||
|
rt.Groups = make([]policyengine.Target, len(groups))
|
||||||
|
for i := range groups {
|
||||||
|
rt.Groups[i] = policyengine.GroupTarget(groups[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if found && s == apechain.Allow {
|
||||||
|
return ac.next.ListStream(req, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
return apeErr(nativeschema.MethodListContainers, s)
|
||||||
|
}
|
||||||
|
|
||||||
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
|
@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
|
||||||
return &container.ListResponse{}, nil
|
return &container.ListResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
|
||||||
|
s.calls["ListStream"]++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
|
||||||
s.calls["Put"]++
|
s.calls["Put"]++
|
||||||
return &container.PutResponse{}, nil
|
return &container.PutResponse{}, nil
|
||||||
|
|
|
@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListStream implements Server.
|
||||||
|
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := a.next.ListStream(req, stream)
|
||||||
|
if !a.enabled.Load() {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
|
||||||
|
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Put implements Server.
|
// Put implements Server.
|
||||||
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
res, err := a.next.Put(ctx, req)
|
res, err := a.next.Put(ctx, req)
|
||||||
|
|
|
@ -14,6 +14,7 @@ type ServiceExecutor interface {
|
||||||
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
|
||||||
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
|
||||||
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
|
||||||
|
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type executorSvc struct {
|
type executorSvc struct {
|
||||||
|
@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
s.respSvc.SetMeta(resp)
|
s.respSvc.SetMeta(resp)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
err := s.exec.ListStream(stream.Context(), req, stream)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not execute ListStream request: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
|
||||||
|
body := req.GetBody()
|
||||||
|
idV2 := body.GetOwnerID()
|
||||||
|
if idV2 == nil {
|
||||||
|
return errMissingUserID
|
||||||
|
}
|
||||||
|
|
||||||
|
var id user.ID
|
||||||
|
|
||||||
|
err := id.ReadFromV2(*idV2)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid user ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrs, err := s.rdr.ContainersOf(&id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cidList := make([]refs.ContainerID, len(cnrs))
|
||||||
|
for i := range cnrs {
|
||||||
|
cnrs[i].WriteToV2(&cidList[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
resBody := new(container.ListStreamResponseBody)
|
||||||
|
resBody.SetContainerIDs(cidList)
|
||||||
|
r := new(container.ListStreamResponse)
|
||||||
|
r.SetBody(resBody)
|
||||||
|
|
||||||
|
return stream.Send(r)
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package container
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,4 +13,11 @@ type Server interface {
|
||||||
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
|
||||||
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
|
||||||
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
List(context.Context, *container.ListRequest) (*container.ListResponse, error)
|
||||||
|
ListStream(*container.ListStreamRequest, ListStream) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListStream is an interface of FrostFS API v2 compatible search streamer.
|
||||||
|
type ListStream interface {
|
||||||
|
util.ServerStream
|
||||||
|
Send(*container.ListStreamResponse) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
|
||||||
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
|
||||||
return resp, s.sigSvc.SignResponse(resp, err)
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||||
|
resp := new(container.ListStreamResponse)
|
||||||
|
_ = s.sigSvc.SignResponse(resp, err)
|
||||||
|
return stream.Send(resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
ss := &listStreamSigner{
|
||||||
|
ListStream: stream,
|
||||||
|
sigSvc: s.sigSvc,
|
||||||
|
}
|
||||||
|
err := s.svc.ListStream(req, ss)
|
||||||
|
if err != nil || !ss.nonEmptyResp {
|
||||||
|
return ss.send(new(container.ListStreamResponse), err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type listStreamSigner struct {
|
||||||
|
ListStream
|
||||||
|
sigSvc *util.SignService
|
||||||
|
|
||||||
|
nonEmptyResp bool // set on first Send call
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
|
||||||
|
s.nonEmptyResp = true
|
||||||
|
return s.send(resp, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
|
||||||
|
if err := s.sigSvc.SignResponse(resp, err); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return s.ListStream.Send(resp)
|
||||||
|
}
|
||||||
|
|
92
pkg/services/container/transport_splitter.go
Normal file
92
pkg/services/container/transport_splitter.go
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
TransportSplitter struct {
|
||||||
|
next Server
|
||||||
|
|
||||||
|
respSvc *response.Service
|
||||||
|
cnrAmount uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
listStreamMsgSizeCtrl struct {
|
||||||
|
util.ServerStream
|
||||||
|
stream ListStream
|
||||||
|
respSvc *response.Service
|
||||||
|
cnrAmount uint32
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewSplitterService(cnrAmount uint32, respSvc *response.Service, next Server) Server {
|
||||||
|
return &TransportSplitter{
|
||||||
|
next: next,
|
||||||
|
respSvc: respSvc,
|
||||||
|
cnrAmount: cnrAmount,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
|
||||||
|
return s.next.Put(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
|
||||||
|
return s.next.Delete(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
|
||||||
|
return s.next.Get(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
|
||||||
|
return s.next.List(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error {
|
||||||
|
return s.next.ListStream(req, &listStreamMsgSizeCtrl{
|
||||||
|
ServerStream: stream,
|
||||||
|
stream: stream,
|
||||||
|
respSvc: s.respSvc,
|
||||||
|
cnrAmount: s.cnrAmount,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error {
|
||||||
|
s.respSvc.SetMeta(resp)
|
||||||
|
body := resp.GetBody()
|
||||||
|
ids := body.GetContainerIDs()
|
||||||
|
|
||||||
|
var newResp *container.ListStreamResponse
|
||||||
|
|
||||||
|
for {
|
||||||
|
if newResp == nil {
|
||||||
|
newResp = new(container.ListStreamResponse)
|
||||||
|
newResp.SetBody(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
cut := min(s.cnrAmount, uint32(len(ids)))
|
||||||
|
|
||||||
|
body.SetContainerIDs(ids[:cut])
|
||||||
|
newResp.SetMetaHeader(resp.GetMetaHeader())
|
||||||
|
newResp.SetVerificationHeader(resp.GetVerificationHeader())
|
||||||
|
|
||||||
|
if err := s.stream.Send(newResp); err != nil {
|
||||||
|
return fmt.Errorf("TransportSplitter: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ids = ids[cut:]
|
||||||
|
|
||||||
|
if len(ids) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in a new issue