Compare commits

...

6 commits

Author SHA1 Message Date
03d5a1d5ad [#1452] container: Add ListStream method
* Added new method for listing containers to container service.
  It opens stream and sends containers in batches.

* Added TransportSplitter wrapper around ExecutionService to
  split container ID list read from contract in parts that are
  smaller than grpc max message size. Batch size can be changed
  in node configuration file (as in example config file).

* Changed `container list` implementaion in cli: now ListStream
  is called by default. Old List is called only if ListStream
  is not implemented.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-13 17:22:13 +03:00
0e11cbe9dd [#1453] container: Replace sort.Slice with slices.SortFunc
* Replaced `sort.Slice` with `slices.SortFunc` in
  `ListContainersRes.SortedIDList()` as it is a bit faster,
  according to 15102e6dfd.

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-12-13 17:22:13 +03:00
d165ac042c
[#1558] morph/client: Reuse notary rpcclient wrapper
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-12 15:30:12 +03:00
7151c71d51
[#1558] morph/client: Remove "could not"/"can't"/"failed to" from error messages
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-12 15:30:12 +03:00
91d9dc2676
[#1558] morph/event: Remove "could not" from error messages
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-12 15:30:12 +03:00
7853dbc315 [#1557] morph/event: Remove embedded structs from scriptHashWithValue
Also, make them public, because otherwise `unused` linter complains.
```
pkg/morph/event/utils.go:25:2  unused  field `typ` is unused
```
This complain is wrong, though: we _use_ `typ` field because the whole
struct is used as a map key.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-12-12 11:55:09 +00:00
44 changed files with 513 additions and 146 deletions

View file

@ -9,7 +9,6 @@ import (
"io" "io"
"os" "os"
"slices" "slices"
"sort"
"strings" "strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
@ -78,13 +77,31 @@ func ListContainers(ctx context.Context, prm ListContainersPrm) (res ListContain
// SortedIDList returns sorted list of identifiers of user's containers. // SortedIDList returns sorted list of identifiers of user's containers.
func (x ListContainersRes) SortedIDList() []cid.ID { func (x ListContainersRes) SortedIDList() []cid.ID {
list := x.cliRes.Containers() list := x.cliRes.Containers()
sort.Slice(list, func(i, j int) bool { slices.SortFunc(list, func(lhs, rhs cid.ID) int {
lhs, rhs := list[i].EncodeToString(), list[j].EncodeToString() return strings.Compare(lhs.EncodeToString(), rhs.EncodeToString())
return strings.Compare(lhs, rhs) < 0
}) })
return list return list
} }
func ListContainersStream(ctx context.Context, prm ListContainersPrm, printCnr func(id cid.ID) bool) (err error) {
cliPrm := &client.PrmContainerListStream{
XHeaders: prm.XHeaders,
OwnerID: prm.OwnerID,
Session: prm.Session,
}
rdr, err := prm.cli.ContainerListInit(ctx, *cliPrm)
if err != nil {
return fmt.Errorf("init container list: %w", err)
}
err = rdr.Iterate(printCnr)
if err != nil {
return fmt.Errorf("read container list: %w", err)
}
return
}
// PutContainerPrm groups parameters of PutContainer operation. // PutContainerPrm groups parameters of PutContainer operation.
type PutContainerPrm struct { type PutContainerPrm struct {
Client *client.Client Client *client.Client

View file

@ -6,8 +6,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
) )
// flags of list command. // flags of list command.
@ -51,16 +54,50 @@ var listContainersCmd = &cobra.Command{
var prm internalclient.ListContainersPrm var prm internalclient.ListContainersPrm
prm.SetClient(cli) prm.SetClient(cli)
prm.Account = idUser prm.OwnerID = idUser
res, err := internalclient.ListContainers(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
prmGet := internalclient.GetContainerPrm{ prmGet := internalclient.GetContainerPrm{
Client: cli, Client: cli,
} }
var containerIDs []cid.ID
err := internalclient.ListContainersStream(cmd.Context(), prm, func(id cid.ID) bool {
if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(id.String())
return false
}
prmGet.ClientParams.ContainerID = &id
res, err := internalclient.GetContainer(cmd.Context(), prmGet)
if err != nil {
cmd.Printf(" failed to read attributes: %v\n", err)
return false
}
cnr := res.Container()
if cnrName := containerSDK.Name(cnr); flagVarListName != "" && cnrName != flagVarListName {
return false
}
cmd.Println(id.String())
if flagVarListPrintAttr {
cnr.IterateUserAttributes(func(key, val string) {
cmd.Printf(" %s: %s\n", key, val)
})
}
return false
})
if err == nil {
return
}
if e, ok := status.FromError(err); ok && e.Code() == codes.Unimplemented {
res, err := internalclient.ListContainers(cmd.Context(), prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
containerIDs = res.SortedIDList()
} else {
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}
containerIDs := res.SortedIDList()
for _, cnrID := range containerIDs { for _, cnrID := range containerIDs {
if flagVarListName == "" && !flagVarListPrintAttr { if flagVarListName == "" && !flagVarListPrintAttr {
cmd.Println(cnrID.String()) cmd.Println(cnrID.String())

View file

@ -609,6 +609,7 @@ type cfgContainer struct {
parsers map[event.Type]event.NotificationParser parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers workerPool util.WorkerPool // pool for asynchronous handlers
containerBatchSize uint32
} }
type cfgFrostfsID struct { type cfgFrostfsID struct {

View file

@ -0,0 +1,27 @@
package containerconfig
import "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
const (
subsection = "container"
listStreamSubsection = "list_stream"
// ContainerBatchSizeDefault represents he maximum amount of containers to send via stream at once.
ContainerBatchSizeDefault = 1000
)
// ContainerBatchSize returns the value of "batch_size" config parameter
// from "list_stream" subsection of "container" section.
//
// Returns ContainerBatchSizeDefault if the value is missing or if
// the value is not positive integer.
func ContainerBatchSize(c *config.Config) uint32 {
if c.Sub(subsection).Sub(listStreamSubsection).Value("batch_size") == nil {
return ContainerBatchSizeDefault
}
size := config.Uint32Safe(c.Sub(subsection).Sub(listStreamSubsection), "batch_size")
if size == 0 {
return ContainerBatchSizeDefault
}
return size
}

View file

@ -0,0 +1,27 @@
package containerconfig_test
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"github.com/stretchr/testify/require"
)
func TestContainerSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
empty := configtest.EmptyConfig()
require.Equal(t, uint32(containerconfig.ContainerBatchSizeDefault), containerconfig.ContainerBatchSize(empty))
})
const path = "../../../../config/example/node"
fileConfigTest := func(c *config.Config) {
require.Equal(t, uint32(1000), containerconfig.ContainerBatchSize(c))
}
configtest.ForEachFileType(path, fileConfigTest)
t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
}

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"net" "net"
containerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/container"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph" morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
@ -47,6 +48,7 @@ func initContainerService(_ context.Context, c *cfg) {
} }
c.shared.frostfsidClient = frostfsIDSubjectProvider c.shared.frostfsidClient = frostfsIDSubjectProvider
c.cfgContainer.containerBatchSize = containerconfig.ContainerBatchSize(c.appCfg)
defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides( defaultChainRouter := engine.NewDefaultChainRouterWithLocalOverrides(
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(), c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.MorphRuleChainStorage(),
@ -56,7 +58,9 @@ func initContainerService(_ context.Context, c *cfg) {
&c.key.PrivateKey, &c.key.PrivateKey,
containerService.NewAPEServer(defaultChainRouter, cnrRdr, containerService.NewAPEServer(defaultChainRouter, cnrRdr,
newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient, newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient,
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), containerService.NewSplitterService(
c.cfgContainer.containerBatchSize, c.respSvc,
containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc)),
), ),
) )
service = containerService.NewAuditService(service, c.log, c.audit) service = containerService.NewAuditService(service, c.log, c.audit)

View file

@ -83,6 +83,9 @@ FROSTFS_POLICER_HEAD_TIMEOUT=15s
FROSTFS_REPLICATOR_PUT_TIMEOUT=15s FROSTFS_REPLICATOR_PUT_TIMEOUT=15s
FROSTFS_REPLICATOR_POOL_SIZE=10 FROSTFS_REPLICATOR_POOL_SIZE=10
# Container service section
FROSTFS_CONTAINER_LIST_STREAM_BATCH_SIZE=1000
# Object service section # Object service section
FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100 FROSTFS_OBJECT_PUT_REMOTE_POOL_SIZE=100
FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200 FROSTFS_OBJECT_PUT_LOCAL_POOL_SIZE=200

View file

@ -124,6 +124,11 @@
"pool_size": 10, "pool_size": 10,
"put_timeout": "15s" "put_timeout": "15s"
}, },
"container": {
"list_stream": {
"batch_size": "1000"
}
},
"object": { "object": {
"delete": { "delete": {
"tombstone_lifetime": 10 "tombstone_lifetime": 10

View file

@ -108,6 +108,10 @@ replicator:
put_timeout: 15s # timeout for the Replicator PUT remote operation put_timeout: 15s # timeout for the Replicator PUT remote operation
pool_size: 10 # maximum amount of concurrent replications pool_size: 10 # maximum amount of concurrent replications
container:
list_stream:
batch_size: 1000 # container_batch_size is the maximum amount of containers to send via stream at once
object: object:
delete: delete:
tombstone_lifetime: 10 # tombstone "local" lifetime in epochs tombstone_lifetime: 10 # tombstone "local" lifetime in epochs

2
go.mod
View file

@ -8,7 +8,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241210104938-c4463df8d467
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88

BIN
go.sum

Binary file not shown.

View file

@ -29,7 +29,7 @@ func (c *Client) BalanceOf(id user.ID) (*big.Int, error) {
amount, err := client.BigIntFromStackItem(prms[0]) amount, err := client.BigIntFromStackItem(prms[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get integer stack item from stack item (%s): %w", balanceOfMethod, err) return nil, fmt.Errorf("get integer stack item from stack item (%s): %w", balanceOfMethod, err)
} }
return amount, nil return amount, nil
} }

View file

@ -21,7 +21,7 @@ func (c *Client) Decimals() (uint32, error) {
decimals, err := client.IntFromStackItem(prms[0]) decimals, err := client.IntFromStackItem(prms[0])
if err != nil { if err != nil {
return 0, fmt.Errorf("could not get integer stack item from stack item (%s): %w", decimalsMethod, err) return 0, fmt.Errorf("get integer stack item from stack item (%s): %w", decimalsMethod, err)
} }
return uint32(decimals), nil return uint32(decimals), nil
} }

View file

@ -39,7 +39,7 @@ func (c *Client) TransferX(ctx context.Context, p TransferPrm) error {
_, err = c.client.Invoke(ctx, prm) _, err = c.client.Invoke(ctx, prm)
if err != nil { if err != nil {
return fmt.Errorf("could not invoke method (%s): %w", transferXMethod, err) return fmt.Errorf("invoke method (%s): %w", transferXMethod, err)
} }
return nil return nil
} }

View file

@ -196,7 +196,7 @@ func (c *Client) Invoke(ctx context.Context, contract util.Uint160, fee fixedn.F
txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...) txHash, vub, err := c.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...)
if err != nil { if err != nil {
return InvokeRes{}, fmt.Errorf("could not invoke %s: %w", method, err) return InvokeRes{}, fmt.Errorf("invoke %s: %w", method, err)
} }
c.logger.Debug(ctx, logs.ClientNeoClientInvoke, c.logger.Debug(ctx, logs.ClientNeoClientInvoke,
@ -509,7 +509,7 @@ func (c *Client) NeoFSAlphabetList() (res keys.PublicKeys, err error) {
list, err := c.roleList(noderoles.NeoFSAlphabet) list, err := c.roleList(noderoles.NeoFSAlphabet)
if err != nil { if err != nil {
return nil, fmt.Errorf("can't get alphabet nodes role list: %w", err) return nil, fmt.Errorf("get alphabet nodes role list: %w", err)
} }
return list, nil return list, nil
@ -523,7 +523,7 @@ func (c *Client) GetDesignateHash() util.Uint160 {
func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) { func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
height, err := c.rpcActor.GetBlockCount() height, err := c.rpcActor.GetBlockCount()
if err != nil { if err != nil {
return nil, fmt.Errorf("can't get chain height: %w", err) return nil, fmt.Errorf("get chain height: %w", err)
} }
return c.rolemgmt.GetDesignatedByRole(r, height) return c.rolemgmt.GetDesignatedByRole(r, height)

View file

@ -26,7 +26,7 @@ func (c *Client) ContainersOf(idUser *user.ID) ([]cid.ID, error) {
cb := func(item stackitem.Item) error { cb := func(item stackitem.Item) error {
rawID, err := client.BytesFromStackItem(item) rawID, err := client.BytesFromStackItem(item)
if err != nil { if err != nil {
return fmt.Errorf("could not get byte array from stack item (%s): %w", containersOfMethod, err) return fmt.Errorf("get byte array from stack item (%s): %w", containersOfMethod, err)
} }
var id cid.ID var id cid.ID

View file

@ -78,7 +78,7 @@ func (c *Client) Delete(ctx context.Context, p DeletePrm) (uint32, error) {
res, err := c.client.Invoke(ctx, prm) res, err := c.client.Invoke(ctx, prm)
if err != nil { if err != nil {
return 0, fmt.Errorf("could not invoke method (%s): %w", deleteMethod, err) return 0, fmt.Errorf("invoke method (%s): %w", deleteMethod, err)
} }
return res.VUB, nil return res.VUB, nil
} }

View file

@ -46,7 +46,7 @@ func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
arr, err := client.ArrayFromStackItem(res[0]) arr, err := client.ArrayFromStackItem(res[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get item array of container (%s): %w", deletionInfoMethod, err) return nil, fmt.Errorf("get item array of container (%s): %w", deletionInfoMethod, err)
} }
if len(arr) != 2 { if len(arr) != 2 {
@ -55,17 +55,17 @@ func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
rawOwner, err := client.BytesFromStackItem(arr[0]) rawOwner, err := client.BytesFromStackItem(arr[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get byte array of container (%s): %w", deletionInfoMethod, err) return nil, fmt.Errorf("get byte array of container (%s): %w", deletionInfoMethod, err)
} }
var owner user.ID var owner user.ID
if err := owner.DecodeString(base58.Encode(rawOwner)); err != nil { if err := owner.DecodeString(base58.Encode(rawOwner)); err != nil {
return nil, fmt.Errorf("could not decode container owner id (%s): %w", deletionInfoMethod, err) return nil, fmt.Errorf("decode container owner id (%s): %w", deletionInfoMethod, err)
} }
epoch, err := client.BigIntFromStackItem(arr[1]) epoch, err := client.BigIntFromStackItem(arr[1])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get byte array of container signature (%s): %w", deletionInfoMethod, err) return nil, fmt.Errorf("get byte array of container signature (%s): %w", deletionInfoMethod, err)
} }
return &containercore.DelInfo{ return &containercore.DelInfo{

View file

@ -60,7 +60,7 @@ func (c *Client) Get(cid []byte) (*containercore.Container, error) {
arr, err := client.ArrayFromStackItem(res[0]) arr, err := client.ArrayFromStackItem(res[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get item array of container (%s): %w", getMethod, err) return nil, fmt.Errorf("get item array of container (%s): %w", getMethod, err)
} }
if len(arr) != 4 { if len(arr) != 4 {
@ -69,29 +69,29 @@ func (c *Client) Get(cid []byte) (*containercore.Container, error) {
cnrBytes, err := client.BytesFromStackItem(arr[0]) cnrBytes, err := client.BytesFromStackItem(arr[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get byte array of container (%s): %w", getMethod, err) return nil, fmt.Errorf("get byte array of container (%s): %w", getMethod, err)
} }
sigBytes, err := client.BytesFromStackItem(arr[1]) sigBytes, err := client.BytesFromStackItem(arr[1])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get byte array of container signature (%s): %w", getMethod, err) return nil, fmt.Errorf("get byte array of container signature (%s): %w", getMethod, err)
} }
pub, err := client.BytesFromStackItem(arr[2]) pub, err := client.BytesFromStackItem(arr[2])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get byte array of public key (%s): %w", getMethod, err) return nil, fmt.Errorf("get byte array of public key (%s): %w", getMethod, err)
} }
tokBytes, err := client.BytesFromStackItem(arr[3]) tokBytes, err := client.BytesFromStackItem(arr[3])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get byte array of session token (%s): %w", getMethod, err) return nil, fmt.Errorf("get byte array of session token (%s): %w", getMethod, err)
} }
var cnr containercore.Container var cnr containercore.Container
if err := cnr.Value.Unmarshal(cnrBytes); err != nil { if err := cnr.Value.Unmarshal(cnrBytes); err != nil {
// use other major version if there any // use other major version if there any
return nil, fmt.Errorf("can't unmarshal container: %w", err) return nil, fmt.Errorf("unmarshal container: %w", err)
} }
if len(tokBytes) > 0 { if len(tokBytes) > 0 {
@ -99,7 +99,7 @@ func (c *Client) Get(cid []byte) (*containercore.Container, error) {
err = cnr.Session.Unmarshal(tokBytes) err = cnr.Session.Unmarshal(tokBytes)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not unmarshal session token: %w", err) return nil, fmt.Errorf("unmarshal session token: %w", err)
} }
} }

View file

@ -34,14 +34,14 @@ func (c *Client) list(idUser *user.ID) ([]cid.ID, error) {
res, err = client.ArrayFromStackItem(res[0]) res, err = client.ArrayFromStackItem(res[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get stack item array from stack item (%s): %w", listMethod, err) return nil, fmt.Errorf("get stack item array from stack item (%s): %w", listMethod, err)
} }
cidList := make([]cid.ID, 0, len(res)) cidList := make([]cid.ID, 0, len(res))
for i := range res { for i := range res {
rawID, err := client.BytesFromStackItem(res[i]) rawID, err := client.BytesFromStackItem(res[i])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get byte array from stack item (%s): %w", listMethod, err) return nil, fmt.Errorf("get byte array from stack item (%s): %w", listMethod, err)
} }
var id cid.ID var id cid.ID

View file

@ -117,7 +117,7 @@ func (c *Client) Put(ctx context.Context, p PutPrm) error {
_, err := c.client.Invoke(ctx, prm) _, err := c.client.Invoke(ctx, prm)
if err != nil { if err != nil {
return fmt.Errorf("could not invoke method (%s): %w", method, err) return fmt.Errorf("invoke method (%s): %w", method, err)
} }
return nil return nil
} }

View file

@ -31,7 +31,7 @@ func (c *Client) GetSubject(addr util.Uint160) (*frostfsidclient.Subject, error)
subj, err := frostfsidclient.ParseSubject(structArr) subj, err := frostfsidclient.ParseSubject(structArr)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not parse test invocation result (%s): %w", methodGetSubject, err) return nil, fmt.Errorf("parse test invocation result (%s): %w", methodGetSubject, err)
} }
return subj, nil return subj, nil
@ -54,7 +54,7 @@ func (c *Client) GetSubjectExtended(addr util.Uint160) (*frostfsidclient.Subject
subj, err := frostfsidclient.ParseSubjectExtended(structArr) subj, err := frostfsidclient.ParseSubjectExtended(structArr)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not parse test invocation result (%s): %w", methodGetSubject, err) return nil, fmt.Errorf("parse test invocation result (%s): %w", methodGetSubject, err)
} }
return subj, nil return subj, nil
@ -67,7 +67,7 @@ func checkStackItem(res []stackitem.Item) (structArr []stackitem.Item, err error
structArr, err = client.ArrayFromStackItem(res[0]) structArr, err = client.ArrayFromStackItem(res[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get item array of container (%s): %w", methodGetSubject, err) return nil, fmt.Errorf("get item array of container (%s): %w", methodGetSubject, err)
} }
return return
} }

View file

@ -25,7 +25,7 @@ func (c *Client) Epoch() (uint64, error) {
num, err := client.IntFromStackItem(items[0]) num, err := client.IntFromStackItem(items[0])
if err != nil { if err != nil {
return 0, fmt.Errorf("could not get number from stack item (%s): %w", epochMethod, err) return 0, fmt.Errorf("get number from stack item (%s): %w", epochMethod, err)
} }
return uint64(num), nil return uint64(num), nil
} }
@ -49,7 +49,7 @@ func (c *Client) LastEpochBlock() (uint32, error) {
block, err := client.IntFromStackItem(items[0]) block, err := client.IntFromStackItem(items[0])
if err != nil { if err != nil {
return 0, fmt.Errorf("could not get number from stack item (%s): %w", return 0, fmt.Errorf("get number from stack item (%s): %w",
lastEpochBlockMethod, err) lastEpochBlockMethod, err)
} }
return uint32(block), nil return uint32(block), nil

View file

@ -59,7 +59,7 @@ func irKeysFromStackItem(stack []stackitem.Item, method string) (keys.PublicKeys
irs, err := client.ArrayFromStackItem(stack[0]) irs, err := client.ArrayFromStackItem(stack[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get stack item array from stack item (%s): %w", method, err) return nil, fmt.Errorf("get stack item array from stack item (%s): %w", method, err)
} }
irKeys := make(keys.PublicKeys, len(irs)) irKeys := make(keys.PublicKeys, len(irs))
@ -79,7 +79,7 @@ const irNodeFixedPrmNumber = 1
func irKeyFromStackItem(prm stackitem.Item) (*keys.PublicKey, error) { func irKeyFromStackItem(prm stackitem.Item) (*keys.PublicKey, error) {
prms, err := client.ArrayFromStackItem(prm) prms, err := client.ArrayFromStackItem(prm)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get stack item array (IRNode): %w", err) return nil, fmt.Errorf("get stack item array (IRNode): %w", err)
} else if ln := len(prms); ln != irNodeFixedPrmNumber { } else if ln := len(prms); ln != irNodeFixedPrmNumber {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"unexpected stack item count (IRNode): expected %d, has %d", "unexpected stack item count (IRNode): expected %d, has %d",
@ -90,7 +90,7 @@ func irKeyFromStackItem(prm stackitem.Item) (*keys.PublicKey, error) {
byteKey, err := client.BytesFromStackItem(prms[0]) byteKey, err := client.BytesFromStackItem(prms[0])
if err != nil { if err != nil {
return nil, fmt.Errorf("could not parse bytes from stack item (IRNode): %w", err) return nil, fmt.Errorf("parse bytes from stack item (IRNode): %w", err)
} }
return keys.NewPublicKeyFromBytes(byteKey, elliptic.P256()) return keys.NewPublicKeyFromBytes(byteKey, elliptic.P256())

View file

@ -16,7 +16,7 @@ func (c *Client) NewEpoch(ctx context.Context, epoch uint64) error {
_, err := c.client.Invoke(ctx, prm) _, err := c.client.Invoke(ctx, prm)
if err != nil { if err != nil {
return fmt.Errorf("could not invoke method (%s): %w", newEpochMethod, err) return fmt.Errorf("invoke method (%s): %w", newEpochMethod, err)
} }
return nil return nil
} }
@ -34,7 +34,7 @@ func (c *Client) NewEpochControl(ctx context.Context, epoch uint64, vub uint32)
res, err := c.client.Invoke(ctx, prm) res, err := c.client.Invoke(ctx, prm)
if err != nil { if err != nil {
return 0, fmt.Errorf("could not invoke method (%s): %w", newEpochMethod, err) return 0, fmt.Errorf("invoke method (%s): %w", newEpochMethod, err)
} }
return res.VUB, nil return res.VUB, nil
} }

View file

@ -41,7 +41,7 @@ func (c *Client) AddPeer(ctx context.Context, p AddPeerPrm) error {
prm.InvokePrmOptional = p.InvokePrmOptional prm.InvokePrmOptional = p.InvokePrmOptional
if _, err := c.client.Invoke(ctx, prm); err != nil { if _, err := c.client.Invoke(ctx, prm); err != nil {
return fmt.Errorf("could not invoke method (%s): %w", method, err) return fmt.Errorf("invoke method (%s): %w", method, err)
} }
return nil return nil
} }

View file

@ -107,7 +107,7 @@ func (c *Client) NNSHash() (util.Uint160, error) {
func nnsResolveItem(c *rpcclient.WSClient, nnsHash util.Uint160, domain string) (stackitem.Item, error) { func nnsResolveItem(c *rpcclient.WSClient, nnsHash util.Uint160, domain string) (stackitem.Item, error) {
found, err := exists(c, nnsHash, domain) found, err := exists(c, nnsHash, domain)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not check presence in NNS contract for %s: %w", domain, err) return nil, fmt.Errorf("check presence in NNS contract for %s: %w", domain, err)
} }
if !found { if !found {

View file

@ -58,16 +58,11 @@ const (
defaultNotaryValidTime = 50 defaultNotaryValidTime = 50
defaultNotaryRoundTime = 100 defaultNotaryRoundTime = 100
notaryBalanceOfMethod = "balanceOf"
notaryExpirationOfMethod = "expirationOf"
setDesignateMethod = "designateAsRole" setDesignateMethod = "designateAsRole"
notaryBalanceErrMsg = "can't fetch notary balance"
notaryNotEnabledPanicMsg = "notary support was not enabled on this client" notaryNotEnabledPanicMsg = "notary support was not enabled on this client"
) )
var errUnexpectedItems = errors.New("invalid number of NEO VM arguments on stack")
func defaultNotaryConfig(c *Client) *notaryCfg { func defaultNotaryConfig(c *Client) *notaryCfg {
return &notaryCfg{ return &notaryCfg{
txValidTime: defaultNotaryValidTime, txValidTime: defaultNotaryValidTime,
@ -155,15 +150,16 @@ func (c *Client) DepositNotary(ctx context.Context, amount fixedn.Fixed8, delta
bc, err := c.rpcActor.GetBlockCount() bc, err := c.rpcActor.GetBlockCount()
if err != nil { if err != nil {
return util.Uint256{}, fmt.Errorf("can't get blockchain height: %w", err) return util.Uint256{}, fmt.Errorf("get blockchain height: %w", err)
} }
currentTill, err := c.depositExpirationOf() r := notary.NewReader(c.rpcActor)
currentTill, err := r.ExpirationOf(c.acc.PrivateKey().GetScriptHash())
if err != nil { if err != nil {
return util.Uint256{}, fmt.Errorf("can't get previous expiration value: %w", err) return util.Uint256{}, fmt.Errorf("get previous expiration value: %w", err)
} }
till := max(int64(bc+delta), currentTill) till := max(int64(bc+delta), int64(currentTill))
res, _, err := c.depositNotary(ctx, amount, till) res, _, err := c.depositNotary(ctx, amount, till)
return res, err return res, err
} }
@ -197,7 +193,7 @@ func (c *Client) depositNotary(ctx context.Context, amount fixedn.Fixed8, till i
[]any{c.acc.PrivateKey().GetScriptHash(), till}) []any{c.acc.PrivateKey().GetScriptHash(), till})
if err != nil { if err != nil {
if !errors.Is(err, neorpc.ErrAlreadyExists) { if !errors.Is(err, neorpc.ErrAlreadyExists) {
return util.Uint256{}, 0, fmt.Errorf("can't make notary deposit: %w", err) return util.Uint256{}, 0, fmt.Errorf("make notary deposit: %w", err)
} }
// Transaction is already in mempool waiting to be processed. // Transaction is already in mempool waiting to be processed.
@ -237,18 +233,10 @@ func (c *Client) GetNotaryDeposit() (res int64, err error) {
sh := c.acc.PrivateKey().PublicKey().GetScriptHash() sh := c.acc.PrivateKey().PublicKey().GetScriptHash()
items, err := c.TestInvoke(c.notary.notary, notaryBalanceOfMethod, sh) r := notary.NewReader(c.rpcActor)
bigIntDeposit, err := r.BalanceOf(sh)
if err != nil { if err != nil {
return 0, fmt.Errorf("test invoke (%s): %w", notaryBalanceOfMethod, err) return 0, fmt.Errorf("get notary deposit: %w", err)
}
if len(items) != 1 {
return 0, wrapFrostFSError(fmt.Errorf("%v: %w", notaryBalanceErrMsg, errUnexpectedItems))
}
bigIntDeposit, err := items[0].TryInteger()
if err != nil {
return 0, wrapFrostFSError(fmt.Errorf("%v: %w", notaryBalanceErrMsg, err))
} }
return bigIntDeposit.Int64(), nil return bigIntDeposit.Int64(), nil
@ -289,7 +277,7 @@ func (c *Client) UpdateNotaryList(ctx context.Context, prm UpdateNotaryListPrm)
nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash) nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash)
if err != nil { if err != nil {
return fmt.Errorf("could not calculate nonce and `valicUntilBlock` values: %w", err) return fmt.Errorf("calculate nonce and `valicUntilBlock` values: %w", err)
} }
return c.notaryInvokeAsCommittee( return c.notaryInvokeAsCommittee(
@ -338,7 +326,7 @@ func (c *Client) UpdateNeoFSAlphabetList(ctx context.Context, prm UpdateAlphabet
nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash) nonce, vub, err := c.CalculateNonceAndVUB(&prm.hash)
if err != nil { if err != nil {
return fmt.Errorf("could not calculate nonce and `valicUntilBlock` values: %w", err) return fmt.Errorf("calculate nonce and `valicUntilBlock` values: %w", err)
} }
return c.notaryInvokeAsCommittee( return c.notaryInvokeAsCommittee(
@ -407,7 +395,7 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error {
alphabetList, err := c.notary.alphabetSource() alphabetList, err := c.notary.alphabetSource()
if err != nil { if err != nil {
return fmt.Errorf("could not fetch current alphabet keys: %w", err) return fmt.Errorf("fetch current alphabet keys: %w", err)
} }
cosigners, err := c.notaryCosignersFromTx(mainTx, alphabetList) cosigners, err := c.notaryCosignersFromTx(mainTx, alphabetList)
@ -529,24 +517,24 @@ func (c *Client) notaryCosignersFromTx(mainTx *transaction.Transaction, alphabet
if ok { if ok {
pub, err := keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256()) pub, err := keys.NewPublicKeyFromBytes(pubBytes, elliptic.P256())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key: %w", err) return nil, fmt.Errorf("parse verification script of signer #2: invalid public key: %w", err)
} }
acc = notary.FakeSimpleAccount(pub) acc = notary.FakeSimpleAccount(pub)
} else { } else {
m, pubsBytes, ok := vm.ParseMultiSigContract(script) m, pubsBytes, ok := vm.ParseMultiSigContract(script)
if !ok { if !ok {
return nil, errors.New("failed to parse verification script of signer #2: unknown witness type") return nil, errors.New("parse verification script of signer #2: unknown witness type")
} }
pubs := make(keys.PublicKeys, len(pubsBytes)) pubs := make(keys.PublicKeys, len(pubsBytes))
for i := range pubs { for i := range pubs {
pubs[i], err = keys.NewPublicKeyFromBytes(pubsBytes[i], elliptic.P256()) pubs[i], err = keys.NewPublicKeyFromBytes(pubsBytes[i], elliptic.P256())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse verification script of signer #2: invalid public key #%d: %w", i, err) return nil, fmt.Errorf("parse verification script of signer #2: invalid public key #%d: %w", i, err)
} }
} }
acc, err = notary.FakeMultisigAccount(m, pubs) acc, err = notary.FakeMultisigAccount(m, pubs)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create fake account for signer #2: %w", err) return nil, fmt.Errorf("create fake account for signer #2: %w", err)
} }
} }
} }
@ -623,7 +611,7 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB
err := multisigAccount.ConvertMultisig(m, ir) err := multisigAccount.ConvertMultisig(m, ir)
if err != nil { if err != nil {
// wrap error as FrostFS-specific since the call is not related to any client // wrap error as FrostFS-specific since the call is not related to any client
return nil, wrapFrostFSError(fmt.Errorf("can't convert account to inner ring multisig wallet: %w", err)) return nil, wrapFrostFSError(fmt.Errorf("convert account to inner ring multisig wallet: %w", err))
} }
} else { } else {
// alphabet multisig redeem script is // alphabet multisig redeem script is
@ -632,7 +620,7 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB
multisigAccount, err = notary.FakeMultisigAccount(m, ir) multisigAccount, err = notary.FakeMultisigAccount(m, ir)
if err != nil { if err != nil {
// wrap error as FrostFS-specific since the call is not related to any client // wrap error as FrostFS-specific since the call is not related to any client
return nil, wrapFrostFSError(fmt.Errorf("can't make inner ring multisig wallet: %w", err)) return nil, wrapFrostFSError(fmt.Errorf("make inner ring multisig wallet: %w", err))
} }
} }
@ -642,7 +630,7 @@ func (c *Client) notaryMultisigAccount(ir []*keys.PublicKey, committee, invokedB
func (c *Client) notaryTxValidationLimit() (uint32, error) { func (c *Client) notaryTxValidationLimit() (uint32, error) {
bc, err := c.rpcActor.GetBlockCount() bc, err := c.rpcActor.GetBlockCount()
if err != nil { if err != nil {
return 0, fmt.Errorf("can't get current blockchain height: %w", err) return 0, fmt.Errorf("get current blockchain height: %w", err)
} }
minTime := bc + c.notary.txValidTime minTime := bc + c.notary.txValidTime
@ -651,24 +639,6 @@ func (c *Client) notaryTxValidationLimit() (uint32, error) {
return rounded, nil return rounded, nil
} }
func (c *Client) depositExpirationOf() (int64, error) {
expirationRes, err := c.TestInvoke(c.notary.notary, notaryExpirationOfMethod, c.acc.PrivateKey().GetScriptHash())
if err != nil {
return 0, fmt.Errorf("test invoke (%s): %w", notaryExpirationOfMethod, err)
}
if len(expirationRes) != 1 {
return 0, fmt.Errorf("method returned unexpected item count: %d", len(expirationRes))
}
currentTillBig, err := expirationRes[0].TryInteger()
if err != nil {
return 0, fmt.Errorf("can't parse deposit till value: %w", err)
}
return currentTillBig.Int64(), nil
}
// sigCount returns the number of required signature. // sigCount returns the number of required signature.
// For FrostFS Alphabet M is a 2/3+1 of it (like in dBFT). // For FrostFS Alphabet M is a 2/3+1 of it (like in dBFT).
// If committee is true, returns M as N/2+1. // If committee is true, returns M as N/2+1.
@ -742,12 +712,12 @@ func alreadyOnChainError(err error) bool {
func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed8, error) { func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed8, error) {
notaryBalance, err := c.GetNotaryDeposit() notaryBalance, err := c.GetNotaryDeposit()
if err != nil { if err != nil {
return 0, fmt.Errorf("could not get notary balance: %w", err) return 0, fmt.Errorf("get notary balance: %w", err)
} }
gasBalance, err := c.GasBalance() gasBalance, err := c.GasBalance()
if err != nil { if err != nil {
return 0, fmt.Errorf("could not get GAS balance: %w", err) return 0, fmt.Errorf("get GAS balance: %w", err)
} }
if gasBalance == 0 { if gasBalance == 0 {
@ -796,12 +766,12 @@ func (c *Client) calculateNonceAndVUB(hash *util.Uint256, roundBlockHeight bool)
if hash != nil { if hash != nil {
height, err = c.getTransactionHeight(*hash) height, err = c.getTransactionHeight(*hash)
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("could not get transaction height: %w", err) return 0, 0, fmt.Errorf("get transaction height: %w", err)
} }
} else { } else {
height, err = c.rpcActor.GetBlockCount() height, err = c.rpcActor.GetBlockCount()
if err != nil { if err != nil {
return 0, 0, fmt.Errorf("could not get chain height: %w", err) return 0, 0, fmt.Errorf("get chain height: %w", err)
} }
} }

View file

@ -159,7 +159,7 @@ func (s StaticClient) Invoke(ctx context.Context, prm InvokePrm) (InvokeRes, err
nonce, vub, err = s.client.CalculateNonceAndVUB(prm.hash) nonce, vub, err = s.client.CalculateNonceAndVUB(prm.hash)
} }
if err != nil { if err != nil {
return InvokeRes{}, fmt.Errorf("could not calculate nonce and VUB for notary alphabet invoke: %w", err) return InvokeRes{}, fmt.Errorf("calculate nonce and VUB for notary alphabet invoke: %w", err)
} }
vubP = &vub vubP = &vub

View file

@ -53,7 +53,7 @@ func BytesFromStackItem(param stackitem.Item) ([]byte, error) {
case stackitem.IntegerT: case stackitem.IntegerT:
n, err := param.TryInteger() n, err := param.TryInteger()
if err != nil { if err != nil {
return nil, fmt.Errorf("can't parse integer bytes: %w", err) return nil, fmt.Errorf("parse integer bytes: %w", err)
} }
return n.Bytes(), nil return n.Bytes(), nil

View file

@ -111,7 +111,7 @@ type listener struct {
pool *ants.Pool pool *ants.Pool
} }
const newListenerFailMsg = "could not instantiate Listener" const newListenerFailMsg = "instantiate Listener"
var ( var (
errNilLogger = errors.New("nil logger") errNilLogger = errors.New("nil logger")
@ -180,7 +180,7 @@ func (l *listener) subscribe(errCh chan error) {
// fill the list with the contracts with set event parsers. // fill the list with the contracts with set event parsers.
l.mtx.RLock() l.mtx.RLock()
for hashType := range l.notificationParsers { for hashType := range l.notificationParsers {
scHash := hashType.ScriptHash() scHash := hashType.Hash
// prevent repetitions // prevent repetitions
for _, hash := range hashes { for _, hash := range hashes {
@ -189,26 +189,26 @@ func (l *listener) subscribe(errCh chan error) {
} }
} }
hashes = append(hashes, hashType.ScriptHash()) hashes = append(hashes, hashType.Hash)
} }
l.mtx.RUnlock() l.mtx.RUnlock()
err := l.subscriber.SubscribeForNotification(hashes...) err := l.subscriber.SubscribeForNotification(hashes...)
if err != nil { if err != nil {
errCh <- fmt.Errorf("could not subscribe for notifications: %w", err) errCh <- fmt.Errorf("subscribe for notifications: %w", err)
return return
} }
if len(l.blockHandlers) > 0 { if len(l.blockHandlers) > 0 {
if err = l.subscriber.BlockNotifications(); err != nil { if err = l.subscriber.BlockNotifications(); err != nil {
errCh <- fmt.Errorf("could not subscribe for blocks: %w", err) errCh <- fmt.Errorf("subscribe for blocks: %w", err)
return return
} }
} }
if l.listenNotary { if l.listenNotary {
if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil { if err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err) errCh <- fmt.Errorf("subscribe for notary requests: %w", err)
return return
} }
} }
@ -326,9 +326,7 @@ func (l *listener) parseAndHandleNotification(ctx context.Context, notifyEvent *
) )
// get the event parser // get the event parser
keyEvent := scriptHashWithType{} keyEvent := scriptHashWithType{Hash: notifyEvent.ScriptHash, Type: typEvent}
keyEvent.SetScriptHash(notifyEvent.ScriptHash)
keyEvent.SetType(typEvent)
l.mtx.RLock() l.mtx.RLock()
parser, ok := l.notificationParsers[keyEvent] parser, ok := l.notificationParsers[keyEvent]
@ -451,9 +449,7 @@ func (l *listener) RegisterNotificationHandler(hi NotificationHandlerInfo) {
l.mtx.Lock() l.mtx.Lock()
defer l.mtx.Unlock() defer l.mtx.Unlock()
var k scriptHashWithType k := scriptHashWithType{Hash: hi.Contract, Type: hi.Type}
k.hash = hi.Contract
k.typ = hi.Type
l.notificationParsers[k] = hi.Parser l.notificationParsers[k] = hi.Parser
l.notificationHandlers[k] = append( l.notificationHandlers[k] = append(
@ -570,7 +566,7 @@ func NewListener(p ListenerParams) (Listener, error) {
// The default capacity is 0, which means "infinite". // The default capacity is 0, which means "infinite".
pool, err := ants.NewPool(p.WorkerPoolCapacity) pool, err := ants.NewPool(p.WorkerPoolCapacity)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not init worker pool: %w", err) return nil, fmt.Errorf("init worker pool: %w", err)
} }
return &listener{ return &listener{

View file

@ -10,7 +10,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/opcode"
) )
var errNilPubKey = errors.New("could not parse public key: public key is nil") var errNilPubKey = errors.New("public key is nil")
func (s *UpdatePeer) setPublicKey(v []byte) (err error) { func (s *UpdatePeer) setPublicKey(v []byte) (err error) {
if v == nil { if v == nil {
@ -19,7 +19,7 @@ func (s *UpdatePeer) setPublicKey(v []byte) (err error) {
s.PubKey, err = keys.NewPublicKeyFromBytes(v, elliptic.P256()) s.PubKey, err = keys.NewPublicKeyFromBytes(v, elliptic.P256())
if err != nil { if err != nil {
return fmt.Errorf("could not parse public key: %w", err) return fmt.Errorf("parse public key: %w", err)
} }
return return

View file

@ -127,7 +127,7 @@ func (p Preparator) Prepare(nr *payload.P2PNotaryRequest) (NotaryEvent, error) {
for { for {
opCode, param, err = ctx.Next() opCode, param, err = ctx.Next()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get next opcode in script: %w", err) return nil, fmt.Errorf("get next opcode in script: %w", err)
} }
if opCode == opcode.RET { if opCode == opcode.RET {
@ -147,7 +147,7 @@ func (p Preparator) Prepare(nr *payload.P2PNotaryRequest) (NotaryEvent, error) {
// retrieve contract's script hash // retrieve contract's script hash
contractHash, err := util.Uint160DecodeBytesBE(ops[opsLen-2].param) contractHash, err := util.Uint160DecodeBytesBE(ops[opsLen-2].param)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not decode contract hash: %w", err) return nil, fmt.Errorf("decode contract hash: %w", err)
} }
// retrieve contract's method // retrieve contract's method
@ -164,7 +164,7 @@ func (p Preparator) Prepare(nr *payload.P2PNotaryRequest) (NotaryEvent, error) {
if len(args) != 0 { if len(args) != 0 {
err = p.validateParameterOpcodes(args) err = p.validateParameterOpcodes(args)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not validate arguments: %w", err) return nil, fmt.Errorf("validate arguments: %w", err)
} }
// without args packing opcodes // without args packing opcodes
@ -206,7 +206,7 @@ func (p Preparator) validateNotaryRequest(nr *payload.P2PNotaryRequest) error {
currentAlphabet, err := p.alphaKeys() currentAlphabet, err := p.alphaKeys()
if err != nil { if err != nil {
return fmt.Errorf("could not fetch Alphabet public keys: %w", err) return fmt.Errorf("fetch Alphabet public keys: %w", err)
} }
err = p.validateCosigners(ln, nr.MainTransaction.Signers, currentAlphabet) err = p.validateCosigners(ln, nr.MainTransaction.Signers, currentAlphabet)
@ -239,7 +239,7 @@ func (p Preparator) validateParameterOpcodes(ops []Op) error {
argsLen, err := IntFromOpcode(ops[l-2]) argsLen, err := IntFromOpcode(ops[l-2])
if err != nil { if err != nil {
return fmt.Errorf("could not parse argument len: %w", err) return fmt.Errorf("parse argument len: %w", err)
} }
err = validateNestedArgs(argsLen, ops[:l-2]) err = validateNestedArgs(argsLen, ops[:l-2])
@ -273,7 +273,7 @@ func validateNestedArgs(expArgLen int64, ops []Op) error {
argsLen, err := IntFromOpcode(ops[i-1]) argsLen, err := IntFromOpcode(ops[i-1])
if err != nil { if err != nil {
return fmt.Errorf("could not parse argument len: %w", err) return fmt.Errorf("parse argument len: %w", err)
} }
expArgLen += argsLen + 1 expArgLen += argsLen + 1
@ -307,7 +307,7 @@ func (p Preparator) validateExpiration(fbTX *transaction.Transaction) error {
currBlock, err := p.blockCounter.BlockCount() currBlock, err := p.blockCounter.BlockCount()
if err != nil { if err != nil {
return fmt.Errorf("could not fetch current chain height: %w", err) return fmt.Errorf("fetch current chain height: %w", err)
} }
if currBlock >= nvb.Height { if currBlock >= nvb.Height {
@ -327,7 +327,7 @@ func (p Preparator) validateCosigners(expected int, s []transaction.Signer, alph
alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys) alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
if err != nil { if err != nil {
return fmt.Errorf("could not get Alphabet verification script: %w", err) return fmt.Errorf("get Alphabet verification script: %w", err)
} }
if !s[1].Account.Equals(hash.Hash160(alphaVerificationScript)) { if !s[1].Account.Equals(hash.Hash160(alphaVerificationScript)) {
@ -346,7 +346,7 @@ func (p Preparator) validateWitnesses(w []transaction.Witness, alphaKeys keys.Pu
alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys) alphaVerificationScript, err := smartcontract.CreateMultiSigRedeemScript(len(alphaKeys)*2/3+1, alphaKeys)
if err != nil { if err != nil {
return fmt.Errorf("could not get Alphabet verification script: %w", err) return fmt.Errorf("get Alphabet verification script: %w", err)
} }
// the second one must be witness of the current // the second one must be witness of the current

View file

@ -26,7 +26,7 @@ func (Designate) MorphEvent() {}
func ParseDesignate(e *state.ContainedNotificationEvent) (event.Event, error) { func ParseDesignate(e *state.ContainedNotificationEvent) (event.Event, error) {
params, err := event.ParseStackArray(e) params, err := event.ParseStackArray(e)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not parse stack items from notify event: %w", err) return nil, fmt.Errorf("parse stack items from notify event: %w", err)
} }
if len(params) != 2 { if len(params) != 2 {

View file

@ -20,13 +20,9 @@ type scriptHashValue struct {
hash util.Uint160 hash util.Uint160
} }
type typeValue struct {
typ Type
}
type scriptHashWithType struct { type scriptHashWithType struct {
scriptHashValue Hash util.Uint160
typeValue Type Type
} }
type notaryRequestTypes struct { type notaryRequestTypes struct {
@ -73,16 +69,6 @@ func (s scriptHashValue) ScriptHash() util.Uint160 {
return s.hash return s.hash
} }
// SetType is an event type setter.
func (s *typeValue) SetType(v Type) {
s.typ = v
}
// GetType is an event type getter.
func (s typeValue) GetType() Type {
return s.typ
}
// WorkerPoolHandler sets closure over worker pool w with passed handler h. // WorkerPoolHandler sets closure over worker pool w with passed handler h.
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler { func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *logger.Logger) Handler {
return func(ctx context.Context, e Event) { return func(ctx context.Context, e Event) {

View file

@ -80,3 +80,26 @@ func (s *Server) List(ctx context.Context, req *containerGRPC.ListRequest) (*con
return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil return resp.ToGRPCMessage().(*containerGRPC.ListResponse), nil
} }
type containerStreamerV2 struct {
containerGRPC.ContainerService_ListStreamServer
}
func (s *containerStreamerV2) Send(resp *container.ListStreamResponse) error {
return s.ContainerService_ListStreamServer.Send(
resp.ToGRPCMessage().(*containerGRPC.ListStreamResponse),
)
}
// ListStream converts gRPC ListRequest message and server-side stream and overtakes its data
// to gRPC stream.
func (s *Server) ListStream(req *containerGRPC.ListStreamRequest, gStream containerGRPC.ContainerService_ListStreamServer) error {
listReq := new(container.ListStreamRequest)
if err := listReq.FromGRPCMessage(req); err != nil {
return err
}
return s.srv.ListStream(listReq, &containerStreamerV2{
ContainerService_ListStreamServer: gStream,
})
}

View file

@ -175,6 +175,79 @@ func (ac *apeChecker) List(ctx context.Context, req *container.ListRequest) (*co
return nil, apeErr(nativeschema.MethodListContainers, s) return nil, apeErr(nativeschema.MethodListContainers, s)
} }
func (ac *apeChecker) ListStream(req *container.ListStreamRequest, stream ListStream) error {
ctx, span := tracing.StartSpanFromContext(stream.Context(), "apeChecker.ListStream")
defer span.End()
role, pk, err := ac.getRoleWithoutContainerID(req.GetBody().GetOwnerID(), req.GetMetaHeader(), req.GetVerificationHeader())
if err != nil {
return err
}
reqProps := map[string]string{
nativeschema.PropertyKeyActorPublicKey: hex.EncodeToString(pk.Bytes()),
nativeschema.PropertyKeyActorRole: role,
}
reqProps, err = ac.fillWithUserClaimTags(reqProps, pk)
if err != nil {
return err
}
if p, ok := peer.FromContext(ctx); ok {
if tcpAddr, ok := p.Addr.(*net.TCPAddr); ok {
reqProps[commonschema.PropertyKeyFrostFSSourceIP] = tcpAddr.IP.String()
}
}
namespace, err := ac.namespaceByOwner(req.GetBody().GetOwnerID())
if err != nil {
return fmt.Errorf("could not get owner namespace: %w", err)
}
if err := ac.validateNamespaceByPublicKey(pk, namespace); err != nil {
return err
}
request := aperequest.NewRequest(
nativeschema.MethodListContainers,
aperequest.NewResource(
resourceName(namespace, ""),
make(map[string]string),
),
reqProps,
)
groups, err := aperequest.Groups(ac.frostFSIDClient, pk)
if err != nil {
return fmt.Errorf("failed to get group ids: %w", err)
}
// Policy contract keeps group related chains as namespace-group pair.
for i := range groups {
groups[i] = fmt.Sprintf("%s:%s", namespace, groups[i])
}
rt := policyengine.NewRequestTargetWithNamespace(namespace)
rt.User = &policyengine.Target{
Type: policyengine.User,
Name: fmt.Sprintf("%s:%s", namespace, pk.Address()),
}
rt.Groups = make([]policyengine.Target, len(groups))
for i := range groups {
rt.Groups[i] = policyengine.GroupTarget(groups[i])
}
s, found, err := ac.router.IsAllowed(apechain.Ingress, rt, request)
if err != nil {
return err
}
if found && s == apechain.Allow {
return ac.next.ListStream(req, stream)
}
return apeErr(nativeschema.MethodListContainers, s)
}
func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { func (ac *apeChecker) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put") ctx, span := tracing.StartSpanFromContext(ctx, "apeChecker.Put")
defer span.End() defer span.End()

View file

@ -1079,6 +1079,11 @@ func (s *srvStub) List(context.Context, *container.ListRequest) (*container.List
return &container.ListResponse{}, nil return &container.ListResponse{}, nil
} }
func (s *srvStub) ListStream(*container.ListStreamRequest, ListStream) error {
s.calls["ListStream"]++
return nil
}
func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) { func (s *srvStub) Put(context.Context, *container.PutRequest) (*container.PutResponse, error) {
s.calls["Put"]++ s.calls["Put"]++
return &container.PutResponse{}, nil return &container.PutResponse{}, nil

View file

@ -63,6 +63,17 @@ func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*c
return res, err return res, err
} }
// ListStream implements Server.
func (a *auditService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := a.next.ListStream(req, stream)
if !a.enabled.Load() {
return err
}
audit.LogRequest(stream.Context(), a.log, container_grpc.ContainerService_ListStream_FullMethodName, req,
audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil)
return err
}
// Put implements Server. // Put implements Server.
func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
res, err := a.next.Put(ctx, req) res, err := a.next.Put(ctx, req)

View file

@ -14,6 +14,7 @@ type ServiceExecutor interface {
Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error) Delete(context.Context, *session.Token, *container.DeleteRequestBody) (*container.DeleteResponseBody, error)
Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error) Get(context.Context, *container.GetRequestBody) (*container.GetResponseBody, error)
List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error) List(context.Context, *container.ListRequestBody) (*container.ListResponseBody, error)
ListStream(context.Context, *container.ListStreamRequest, ListStream) error
} }
type executorSvc struct { type executorSvc struct {
@ -93,3 +94,11 @@ func (s *executorSvc) List(ctx context.Context, req *container.ListRequest) (*co
s.respSvc.SetMeta(resp) s.respSvc.SetMeta(resp)
return resp, nil return resp, nil
} }
func (s *executorSvc) ListStream(req *container.ListStreamRequest, stream ListStream) error {
err := s.exec.ListStream(stream.Context(), req, stream)
if err != nil {
return fmt.Errorf("could not execute ListStream request: %w", err)
}
return nil
}

View file

@ -200,3 +200,35 @@ func (s *morphExecutor) List(_ context.Context, body *container.ListRequestBody)
return res, nil return res, nil
} }
func (s *morphExecutor) ListStream(_ context.Context, req *container.ListStreamRequest, stream containerSvc.ListStream) error {
body := req.GetBody()
idV2 := body.GetOwnerID()
if idV2 == nil {
return errMissingUserID
}
var id user.ID
err := id.ReadFromV2(*idV2)
if err != nil {
return fmt.Errorf("invalid user ID: %w", err)
}
cnrs, err := s.rdr.ContainersOf(&id)
if err != nil {
return err
}
cidList := make([]refs.ContainerID, len(cnrs))
for i := range cnrs {
cnrs[i].WriteToV2(&cidList[i])
}
resBody := new(container.ListStreamResponseBody)
resBody.SetContainerIDs(cidList)
r := new(container.ListStreamResponse)
r.SetBody(resBody)
return stream.Send(r)
}

View file

@ -3,6 +3,7 @@ package container
import ( import (
"context" "context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
) )
@ -12,4 +13,11 @@ type Server interface {
Get(context.Context, *container.GetRequest) (*container.GetResponse, error) Get(context.Context, *container.GetRequest) (*container.GetResponse, error)
Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error) Delete(context.Context, *container.DeleteRequest) (*container.DeleteResponse, error)
List(context.Context, *container.ListRequest) (*container.ListResponse, error) List(context.Context, *container.ListRequest) (*container.ListResponse, error)
ListStream(*container.ListStreamRequest, ListStream) error
}
// ListStream is an interface of FrostFS API v2 compatible search streamer.
type ListStream interface {
util.ServerStream
Send(*container.ListStreamResponse) error
} }

View file

@ -56,3 +56,40 @@ func (s *signService) List(ctx context.Context, req *container.ListRequest) (*co
resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req)) resp, err := util.EnsureNonNilResponse(s.svc.List(ctx, req))
return resp, s.sigSvc.SignResponse(resp, err) return resp, s.sigSvc.SignResponse(resp, err)
} }
func (s *signService) ListStream(req *container.ListStreamRequest, stream ListStream) error {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(container.ListStreamResponse)
_ = s.sigSvc.SignResponse(resp, err)
return stream.Send(resp)
}
ss := &listStreamSigner{
ListStream: stream,
sigSvc: s.sigSvc,
}
err := s.svc.ListStream(req, ss)
if err != nil || !ss.nonEmptyResp {
return ss.send(new(container.ListStreamResponse), err)
}
return nil
}
type listStreamSigner struct {
ListStream
sigSvc *util.SignService
nonEmptyResp bool // set on first Send call
}
func (s *listStreamSigner) Send(resp *container.ListStreamResponse) error {
s.nonEmptyResp = true
return s.send(resp, nil)
}
func (s *listStreamSigner) send(resp *container.ListStreamResponse, err error) error {
if err := s.sigSvc.SignResponse(resp, err); err != nil {
return err
}
return s.ListStream.Send(resp)
}

View file

@ -0,0 +1,92 @@
package container
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
)
type (
TransportSplitter struct {
next Server
respSvc *response.Service
cnrAmount uint32
}
listStreamMsgSizeCtrl struct {
util.ServerStream
stream ListStream
respSvc *response.Service
cnrAmount uint32
}
)
func NewSplitterService(cnrAmount uint32, respSvc *response.Service, next Server) Server {
return &TransportSplitter{
next: next,
respSvc: respSvc,
cnrAmount: cnrAmount,
}
}
func (s *TransportSplitter) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) {
return s.next.Put(ctx, req)
}
func (s *TransportSplitter) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) {
return s.next.Delete(ctx, req)
}
func (s *TransportSplitter) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) {
return s.next.Get(ctx, req)
}
func (s *TransportSplitter) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) {
return s.next.List(ctx, req)
}
func (s *TransportSplitter) ListStream(req *container.ListStreamRequest, stream ListStream) error {
return s.next.ListStream(req, &listStreamMsgSizeCtrl{
ServerStream: stream,
stream: stream,
respSvc: s.respSvc,
cnrAmount: s.cnrAmount,
})
}
func (s *listStreamMsgSizeCtrl) Send(resp *container.ListStreamResponse) error {
s.respSvc.SetMeta(resp)
body := resp.GetBody()
ids := body.GetContainerIDs()
var newResp *container.ListStreamResponse
for {
if newResp == nil {
newResp = new(container.ListStreamResponse)
newResp.SetBody(body)
}
cut := min(s.cnrAmount, uint32(len(ids)))
body.SetContainerIDs(ids[:cut])
newResp.SetMetaHeader(resp.GetMetaHeader())
newResp.SetVerificationHeader(resp.GetVerificationHeader())
if err := s.stream.Send(newResp); err != nil {
return fmt.Errorf("TransportSplitter: %w", err)
}
ids = ids[cut:]
if len(ids) == 0 {
break
}
}
return nil
}