Compare commits

..

2 commits

Author SHA1 Message Date
Pavel Karpy
543d1dca85 [#73] morph: Rename vars that collide with package names
Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-02-22 19:28:36 +03:00
Pavel Karpy
8579ed4fff [#59] morph: Adopt updated neo-go client API for subs
It does not use deprecated methods anymore but also adds more code that
removes. Future refactor that will affect more components will optimize
usage of the updated API.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
2023-02-22 19:28:33 +03:00
24 changed files with 302 additions and 212 deletions

2
.github/CODEOWNERS vendored
View file

@ -1 +1 @@
* @TrueCloudLab/storage-core @TrueCloudLab/committers
* @carpawell @fyrchik @acid-ant

View file

@ -12,7 +12,6 @@ Changelog for FrostFS Node
- `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246)
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
- Multiple configs support (#44)
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
@ -54,7 +53,6 @@ Changelog for FrostFS Node
- `golang.org/x/term` to `v0.3.0`
- `google.golang.org/grpc` to `v1.51.0`
- `github.com/nats-io/nats.go` to `v1.22.1`
- `github.com/TrueCloudLab/hrw` to `v.1.1.1`
- Minimum go version to v1.18
### Updating from v0.35.0

View file

@ -141,13 +141,13 @@ func setConfigCmd(cmd *cobra.Command, args []string) error {
}
func parseConfigPair(kvStr string, force bool) (key string, val any, err error) {
k, v, found := strings.Cut(kvStr, "=")
if !found {
kv := strings.SplitN(kvStr, "=", 2)
if len(kv) != 2 {
return "", nil, fmt.Errorf("invalid parameter format: must be 'key=val', got: %s", kvStr)
}
key = k
valRaw := v
key = kv[0]
valRaw := kv[1]
switch key {
case netmapAuditFeeKey, netmapBasicIncomeRateKey,
@ -162,7 +162,7 @@ func parseConfigPair(kvStr string, force bool) (key string, val any, err error)
case netmapEigenTrustAlphaKey:
// just check that it could
// be parsed correctly
_, err = strconv.ParseFloat(v, 64)
_, err = strconv.ParseFloat(kv[1], 64)
if err != nil {
err = fmt.Errorf("could not parse %s's value '%s' as float: %w", key, valRaw, err)
}

View file

@ -27,23 +27,23 @@ func setPolicyCmd(cmd *cobra.Command, args []string) error {
bw := io.NewBufBinWriter()
for i := range args {
k, v, found := strings.Cut(args[i], "=")
if !found {
kv := strings.SplitN(args[i], "=", 2)
if len(kv) != 2 {
return fmt.Errorf("invalid parameter format, must be Parameter=Value")
}
switch k {
switch kv[0] {
case execFeeParam, storagePriceParam, setFeeParam:
default:
return fmt.Errorf("parameter must be one of %s, %s and %s", execFeeParam, storagePriceParam, setFeeParam)
}
value, err := strconv.ParseUint(v, 10, 32)
value, err := strconv.ParseUint(kv[1], 10, 32)
if err != nil {
return fmt.Errorf("can't parse parameter value '%s': %w", args[1], err)
}
emit.AppCall(bw.BinWriter, policy.Hash, "set"+k, callflag.All, int64(value))
emit.AppCall(bw.BinWriter, policy.Hash, "set"+kv[0], callflag.All, int64(value))
}
if err := wCtx.sendCommitteeTx(bw.Bytes(), false); err != nil {

View file

@ -7,7 +7,6 @@ import (
"strings"
"time"
containerApi "github.com/TrueCloudLab/frostfs-api-go/v2/container"
internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
@ -27,8 +26,6 @@ var (
containerAttributes []string
containerAwait bool
containerName string
containerNnsName string
containerNnsZone string
containerNoTimestamp bool
containerSubnet string
force bool
@ -163,8 +160,6 @@ func initContainerCreateCmd() {
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")
flags.StringVar(&containerName, "name", "", "Container name attribute")
flags.StringVar(&containerNnsName, "nns-name", "", "Container nns name attribute")
flags.StringVar(&containerNnsZone, "nns-zone", "", "Container nns zone attribute")
flags.BoolVar(&containerNoTimestamp, "disable-timestamp", false, "Disable timestamp container attribute")
flags.StringVar(&containerSubnet, "subnet", "", "String representation of container subnetwork")
flags.BoolVarP(&force, commonflags.ForceFlag, commonflags.ForceFlagShorthand, false,
@ -202,12 +197,12 @@ func parseContainerPolicy(cmd *cobra.Command, policyString string) (*netmap.Plac
func parseAttributes(dst *container.Container, attributes []string) error {
for i := range attributes {
k, v, found := strings.Cut(attributes[i], attributeDelimiter)
if !found {
kvPair := strings.Split(attributes[i], attributeDelimiter)
if len(kvPair) != 2 {
return errors.New("invalid container attribute")
}
dst.SetAttribute(k, v)
dst.SetAttribute(kvPair[0], kvPair[1])
}
if !containerNoTimestamp {
@ -218,12 +213,5 @@ func parseAttributes(dst *container.Container, attributes []string) error {
container.SetName(dst, containerName)
}
if containerNnsName != "" {
dst.SetAttribute(containerApi.SysAttributeName, containerNnsName)
}
if containerNnsZone != "" {
dst.SetAttribute(containerApi.SysAttributeZone, containerNnsZone)
}
return nil
}

View file

@ -179,12 +179,12 @@ func parseObjectAttrs(cmd *cobra.Command) ([]object.Attribute, error) {
attrs := make([]object.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
for i := range rawAttrs {
k, v, found := strings.Cut(rawAttrs[i], "=")
if !found {
kv := strings.SplitN(rawAttrs[i], "=", 2)
if len(kv) != 2 {
return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i])
}
attrs[i].SetKey(k)
attrs[i].SetValue(v)
attrs[i].SetKey(kv[0])
attrs[i].SetValue(kv[1])
}
disableFilename, _ := cmd.Flags().GetBool("disable-filename")
@ -218,26 +218,26 @@ func parseObjectNotifications(cmd *cobra.Command) (*object.NotificationInfo, err
return nil, nil
}
before, after, found := strings.Cut(raw, separator)
if !found {
rawSlice := strings.SplitN(raw, separator, 2)
if len(rawSlice) != 2 {
return nil, fmt.Errorf("notification must be in the form of: *epoch*%s*topic*, got %s", separator, raw)
}
ni := new(object.NotificationInfo)
epoch, err := strconv.ParseUint(before, 10, 64)
epoch, err := strconv.ParseUint(rawSlice[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("could not parse notification epoch %s: %w", before, err)
return nil, fmt.Errorf("could not parse notification epoch %s: %w", rawSlice[0], err)
}
ni.SetEpoch(epoch)
if after == "" {
if rawSlice[1] == "" {
return nil, fmt.Errorf("incorrect empty topic: use %s to force using default topic", useDefaultTopic)
}
if after != useDefaultTopic {
ni.SetTopic(after)
if rawSlice[1] != useDefaultTopic {
ni.SetTopic(rawSlice[1])
}
return ni, nil

View file

@ -154,16 +154,16 @@ func getRangeList(cmd *cobra.Command) ([]*object.Range, error) {
vs := strings.Split(v, ",")
rs := make([]*object.Range, len(vs))
for i := range vs {
before, after, found := strings.Cut(vs[i], rangeSep)
if !found {
r := strings.Split(vs[i], rangeSep)
if len(r) != 2 {
return nil, fmt.Errorf("invalid range specifier: %s", vs[i])
}
offset, err := strconv.ParseUint(before, 10, 64)
offset, err := strconv.ParseUint(r[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid '%s' range offset specifier: %w", vs[i], err)
}
length, err := strconv.ParseUint(after, 10, 64)
length, err := strconv.ParseUint(r[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid '%s' range length specifier: %w", vs[i], err)
}

View file

@ -63,12 +63,12 @@ func parseXHeaders(cmd *cobra.Command) []string {
xs := make([]string, 0, 2*len(xHeaders))
for i := range xHeaders {
k, v, found := strings.Cut(xHeaders[i], "=")
if !found {
kv := strings.SplitN(xHeaders[i], "=", 2)
if len(kv) != 2 {
panic(fmt.Errorf("invalid X-Header format: %s", xHeaders[i]))
}
xs = append(xs, k, v)
xs = append(xs, kv[0], kv[1])
}
return xs

View file

@ -79,14 +79,14 @@ func parseMeta(cmd *cobra.Command) ([]*tree.KeyValue, error) {
pairs := make([]*tree.KeyValue, 0, len(raws))
for i := range raws {
k, v, found := strings.Cut(raws[i], "=")
if !found {
kv := strings.SplitN(raws[i], "=", 2)
if len(kv) != 2 {
return nil, fmt.Errorf("invalid meta pair format: %s", raws[i])
}
var pair tree.KeyValue
pair.Key = k
pair.Value = []byte(v)
pair.Key = kv[0]
pair.Value = []byte(kv[1])
pairs = append(pairs, &pair)
}

View file

@ -226,32 +226,35 @@ func parseEACLTable(tb *eacl.Table, args []string) error {
func parseEACLRecord(args []string) (*eacl.Record, error) {
r := new(eacl.Record)
for _, arg := range args {
before, after, found := strings.Cut(arg, ":")
for i := range args {
ss := strings.SplitN(args[i], ":", 2)
switch prefix := strings.ToLower(before); prefix {
switch prefix := strings.ToLower(ss[0]); prefix {
case "req", "obj": // filters
if !found {
return nil, fmt.Errorf("invalid filter or target: %s", arg)
if len(ss) != 2 {
return nil, fmt.Errorf("invalid filter or target: %s", args[i])
}
i := strings.Index(ss[1], "=")
if i < 0 {
return nil, fmt.Errorf("invalid filter key-value pair: %s", ss[1])
}
var key, value string
var op eacl.Match
var f bool
key, value, f = strings.Cut(after, "!=")
if f {
if 0 < i && ss[1][i-1] == '!' {
key = ss[1][:i-1]
op = eacl.MatchStringNotEqual
} else {
key, value, f = strings.Cut(after, "=")
if !f {
return nil, fmt.Errorf("invalid filter key-value pair: %s", after)
}
key = ss[1][:i]
op = eacl.MatchStringEqual
}
value = ss[1][i+1:]
typ := eacl.HeaderFromRequest
if before == "obj" {
if ss[0] == "obj" {
typ = eacl.HeaderFromObject
}
@ -260,8 +263,8 @@ func parseEACLRecord(args []string) (*eacl.Record, error) {
var err error
var pubs []ecdsa.PublicKey
if found {
pubs, err = parseKeyList(after)
if len(ss) == 2 {
pubs, err = parseKeyList(ss[1])
if err != nil {
return nil, err
}
@ -278,7 +281,7 @@ func parseEACLRecord(args []string) (*eacl.Record, error) {
eacl.AddFormedTarget(r, role, pubs...)
default:
return nil, fmt.Errorf("invalid prefix: %s", before)
return nil, fmt.Errorf("invalid prefix: %s", ss[0])
}
}

View file

@ -65,14 +65,14 @@ func loadEnv(path string) {
scanner := bufio.NewScanner(f)
for scanner.Scan() {
k, v, found := strings.Cut(scanner.Text(), "=")
if !found {
pair := strings.SplitN(scanner.Text(), "=", 2)
if len(pair) != 2 {
continue
}
v = strings.Trim(v, `"`)
pair[1] = strings.Trim(pair[1], `"`)
err = os.Setenv(k, v)
err = os.Setenv(pair[0], pair[1])
if err != nil {
panic("can't set environment variable")
}

2
go.mod
View file

@ -6,7 +6,7 @@ require (
github.com/TrueCloudLab/frostfs-api-go/v2 v2.0.0-20221212144048-1351b6656d68
github.com/TrueCloudLab/frostfs-contract v0.0.0-20221213081248-6c805c1b4e42
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556
github.com/TrueCloudLab/hrw v1.1.1-0.20230227111858-79b208bebf52
github.com/TrueCloudLab/hrw v1.1.0
github.com/TrueCloudLab/tzhash v1.7.0
github.com/cheggaaa/pb v1.0.29
github.com/chzyer/readline v1.5.1

2
go.sum
View file

@ -52,8 +52,6 @@ github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556 h1:Cc1
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556/go.mod h1:4ZiG4jNLzrqeJbmZUrPI7wDZhQVPaf0zEIWa/eBsqBg=
github.com/TrueCloudLab/hrw v1.1.0 h1:2U69PpUX1UtMWgh/RAg6D8mQW+/WsxbLNE+19EUhLhY=
github.com/TrueCloudLab/hrw v1.1.0/go.mod h1:Pzi8Hy3qx12cew+ajVxgbtDVM4sRG9/gJnJLcL/yRyY=
github.com/TrueCloudLab/hrw v1.1.1-0.20230227111858-79b208bebf52 h1:fBeG0EkL7Pa2D0SIiZt3yQYGpP/IvrXg4xEPAZ4Jjys=
github.com/TrueCloudLab/hrw v1.1.1-0.20230227111858-79b208bebf52/go.mod h1:BG6NztCuNc0UFr6MWJ4MM1sUl9lxx6PBRwLmTxdre20=
github.com/TrueCloudLab/rfc6979 v0.3.0 h1:0SYMAfQWh/TjnofqYQHy+s3rmQ5gi0fvOaDbqd60/Ic=
github.com/TrueCloudLab/rfc6979 v0.3.0/go.mod h1:qylxFXFQ/sMvpZC/8JyWp+mfzk5Zj/KDT5FAbekhobc=
github.com/TrueCloudLab/tzhash v1.7.0 h1:btGORepc7Dg+n4MxgJxv73c9eYhwSBI5HqsqUBRmJiw=

View file

@ -73,18 +73,18 @@ func stringifyAddress(addr oid.Address) string {
}
func addressFromString(s string) (oid.Address, error) {
before, after, found := strings.Cut(s, ".")
if !found {
i := strings.IndexByte(s, '.')
if i == -1 {
return oid.Address{}, errors.New("invalid address")
}
var obj oid.ID
if err := obj.DecodeString(before); err != nil {
if err := obj.DecodeString(s[:i]); err != nil {
return oid.Address{}, err
}
var cnr cid.ID
if err := cnr.DecodeString(after); err != nil {
if err := cnr.DecodeString(s[i+1:]); err != nil {
return oid.Address{}, err
}

View file

@ -19,7 +19,7 @@ type StorageEngine struct {
mtx *sync.RWMutex
shards map[string]hashedShard
shards map[string]shardWrapper
shardPools map[string]util.WorkerPool
@ -223,7 +223,7 @@ func New(opts ...Option) *StorageEngine {
return &StorageEngine{
cfg: c,
mtx: new(sync.RWMutex),
shards: make(map[string]hashedShard),
shards: make(map[string]shardWrapper),
shardPools: make(map[string]util.WorkerPool),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),

View file

@ -21,7 +21,6 @@ import (
oidtest "github.com/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "github.com/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/TrueCloudLab/frostfs-sdk-go/version"
"github.com/TrueCloudLab/hrw"
"github.com/TrueCloudLab/tzhash/tz"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
@ -87,12 +86,9 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
panic(err)
}
engine.shards[s.ID().String()] = hashedShard{
shardWrapper: shardWrapper{
engine.shards[s.ID().String()] = shardWrapper{
errorCount: atomic.NewUint32(0),
Shard: s,
},
hash: hrw.Hash([]byte(s.ID().String())),
}
engine.shardPools[s.ID().String()] = pool
}

View file

@ -151,7 +151,7 @@ mainLoop:
return res, err
}
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
for j := range shards {
if _, ok := shardMap[shards[j].ID().String()]; ok {
continue

View file

@ -16,10 +16,7 @@ import (
var errShardNotFound = logicerr.New("shard not found")
type hashedShard struct {
shardWrapper
hash uint64
}
type hashedShard shardWrapper
type metricsWithID struct {
id string
@ -130,12 +127,9 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
return fmt.Errorf("shard with id %s was already added", strID)
}
e.shards[strID] = hashedShard{
shardWrapper: shardWrapper{
e.shards[strID] = shardWrapper{
errorCount: atomic.NewUint32(0),
Shard: sh,
},
hash: hrw.Hash([]byte(strID)),
}
e.shardPools[strID] = pool
@ -150,7 +144,7 @@ func (e *StorageEngine) removeShards(ids ...string) {
return
}
ss := make([]hashedShard, 0, len(ids))
ss := make([]shardWrapper, 0, len(ids))
e.mtx.Lock()
for _, id := range ids {
@ -216,7 +210,7 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s
weights = append(weights, e.shardWeight(sh.Shard))
}
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
return shards
}
@ -282,5 +276,7 @@ func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
}
func (s hashedShard) Hash() uint64 {
return s.hash
return hrw.Hash(
[]byte(s.Shard.ID().String()),
)
}

View file

@ -10,10 +10,13 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
@ -69,17 +72,20 @@ type Client struct {
// on every normal call.
switchLock *sync.RWMutex
// channel for ws notifications
// channels for ws notifications; protected with switchLock
notifications chan rpcclient.Notification
// channel for internal stop
closeChan chan struct{}
blockRcv chan *block.Block
notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
// cached subscription information
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
subscribedToNewBlocks bool
// channel for internal stop
closeChan chan struct{}
// indicates that Client is not able to
// establish connection to any of the
// provided RPC endpoints
@ -385,43 +391,43 @@ func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
//
// Wraps any error to frostfsError.
func toStackParameter(value any) (sc.Parameter, error) {
var result = sc.Parameter{
var res = sc.Parameter{
Value: value,
}
switch v := value.(type) {
case []byte:
result.Type = sc.ByteArrayType
res.Type = sc.ByteArrayType
case int:
result.Type = sc.IntegerType
result.Value = big.NewInt(int64(v))
res.Type = sc.IntegerType
res.Value = big.NewInt(int64(v))
case int64:
result.Type = sc.IntegerType
result.Value = big.NewInt(v)
res.Type = sc.IntegerType
res.Value = big.NewInt(v)
case uint64:
result.Type = sc.IntegerType
result.Value = new(big.Int).SetUint64(v)
res.Type = sc.IntegerType
res.Value = new(big.Int).SetUint64(v)
case [][]byte:
arr := make([]sc.Parameter, 0, len(v))
for i := range v {
elem, err := toStackParameter(v[i])
if err != nil {
return result, err
return res, err
}
arr = append(arr, elem)
}
result.Type = sc.ArrayType
result.Value = arr
res.Type = sc.ArrayType
res.Value = arr
case string:
result.Type = sc.StringType
res.Type = sc.StringType
case util.Uint160:
result.Type = sc.ByteArrayType
result.Value = v.BytesBE()
res.Type = sc.ByteArrayType
res.Value = v.BytesBE()
case noderoles.Role:
result.Type = sc.IntegerType
result.Value = big.NewInt(int64(v))
res.Type = sc.IntegerType
res.Value = big.NewInt(int64(v))
case keys.PublicKeys:
arr := make([][]byte, 0, len(v))
for i := range v {
@ -430,13 +436,13 @@ func toStackParameter(value any) (sc.Parameter, error) {
return toStackParameter(arr)
case bool:
result.Type = sc.BoolType
result.Value = v
res.Type = sc.BoolType
res.Value = v
default:
return result, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
return res, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
}
return result, nil
return res, nil
}
// MagicNumber returns the magic number of the network
@ -480,7 +486,7 @@ func (c *Client) MsPerBlock() (res int64, err error) {
}
// IsValidScript returns true if invocation script executes with HALT state.
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res bool, err error) {
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
@ -488,12 +494,12 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res
return false, ErrConnectionLost
}
result, err := c.client.InvokeScript(script, signers)
res, err := c.client.InvokeScript(script, signers)
if err != nil {
return false, fmt.Errorf("invokeScript: %w", err)
}
return result.State == vmstate.Halt.String(), nil
return res.State == vmstate.Halt.String(), nil
}
// NotificationChannel returns channel than receives subscribed
@ -525,3 +531,14 @@ func (c *Client) setActor(act *actor.Actor) {
c.gasToken = nep17.New(act, gas.Hash)
c.rolemgmt = rolemgmt.New(act)
}
// updateSubs updates subscription information, must be
// protected with switchLock.
func (c *Client) updateSubs(si subsInfo) {
c.blockRcv = si.blockRcv
c.notificationRcv = si.notificationRcv
c.notaryReqRcv = si.notaryReqRcv
c.subscribedEvents = si.subscribedEvents
c.subscribedNotaryEvents = si.subscribedNotaryEvents
}

View file

@ -9,8 +9,11 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -109,6 +112,9 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
cfg: *cfg,
switchLock: &sync.RWMutex{},
notifications: make(chan rpcclient.Notification),
blockRcv: make(chan *block.Block),
notificationRcv: make(chan *state.ContainedNotificationEvent),
notaryReqRcv: make(chan *result.NotaryRequestEvent),
subscribedEvents: make(map[util.Uint160]string),
subscribedNotaryEvents: make(map[util.Uint160]string),
closeChan: make(chan struct{}),

View file

@ -4,6 +4,11 @@ import (
"sort"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"go.uber.org/zap"
)
@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
c.logger.Info("connection to the new RPC node has been established",
zap.String("endpoint", newEndpoint))
if !c.restoreSubscriptions(cli, newEndpoint) {
subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
if !ok {
// new WS client does not allow
// restoring subscription, client
// could not work correctly =>
@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
c.client = cli
c.setActor(act)
c.updateSubs(subs)
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
}
func (c *Client) notificationLoop() {
var e any
var ok bool
for {
c.switchLock.RLock()
nChan := c.client.Notifications
bChan := c.blockRcv
nChan := c.notificationRcv
nrChan := c.notaryReqRcv
c.switchLock.RUnlock()
select {
@ -93,20 +105,55 @@ func (c *Client) notificationLoop() {
c.close()
return
case n, ok := <-nChan:
// notification channel is used as a connection
// state: if it is closed, the connection is
// considered to be lost
if !ok {
case e, ok = <-bChan:
case e, ok = <-nChan:
case e, ok = <-nrChan:
}
if ok {
c.routeEvent(e)
continue
}
if !c.reconnect() {
return
}
}
}
func (c *Client) routeEvent(e any) {
typedNotification := rpcclient.Notification{Value: e}
switch e.(type) {
case *block.Block:
typedNotification.Type = neorpc.BlockEventID
case *state.ContainedNotificationEvent:
typedNotification.Type = neorpc.NotificationEventID
case *result.NotaryRequestEvent:
typedNotification.Type = neorpc.NotaryRequestEventID
}
select {
case c.notifications <- typedNotification:
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
}
}
func (c *Client) reconnect() bool {
if closeErr := c.client.GetError(); closeErr != nil {
c.logger.Warn("switching to the next RPC node",
zap.String("reason", closeErr.Error()),
)
} else {
// neo-go client was closed by calling `Close`
// method that happens only when the client has
// method, that happens only when a client has
// switched to the more prioritized RPC
continue
return true
}
if !c.switchRPC() {
@ -116,32 +163,14 @@ func (c *Client) notificationLoop() {
// switch client to inactive mode
c.inactiveMode()
return
return false
}
// TODO(@carpawell): call here some callback retrieved in constructor
// of the client to allow checking chain state since during switch
// process some notification could be lost
continue
}
select {
case c.notifications <- n:
continue
case <-c.cfg.ctx.Done():
_ = c.UnsubscribeAll()
c.close()
return
case <-c.closeChan:
_ = c.UnsubscribeAll()
c.close()
return
}
}
}
return true
}
func (c *Client) switchToMostPrioritized() {
@ -156,11 +185,12 @@ mainLoop:
return
case <-t.C:
c.switchLock.RLock()
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
copy(endpointsCopy, c.endpoints.list)
currPriority := c.endpoints.list[c.endpoints.curr].Priority
highestPriority := c.endpoints.list[0].Priority
c.switchLock.RUnlock()
if currPriority == highestPriority {
@ -186,7 +216,7 @@ mainLoop:
continue
}
if c.restoreSubscriptions(cli, tryE) {
if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
c.switchLock.Lock()
// higher priority node could have been
@ -201,6 +231,7 @@ mainLoop:
c.cache.invalidate()
c.client = cli
c.setActor(act)
c.updateSubs(subs)
c.endpoints.curr = i
c.switchLock.Unlock()

View file

@ -1,6 +1,10 @@
package client
import (
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error
return nil
}
id, err := c.client.SubscribeForExecutionNotifications(&contract, nil)
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
if err != nil {
return err
}
@ -64,7 +68,7 @@ func (c *Client) SubscribeForNewBlocks() error {
return nil
}
_, err := c.client.SubscribeForNewBlocks(nil)
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
if err != nil {
return err
}
@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
return nil
}
id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner)
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
if err != nil {
return err
}
@ -203,9 +207,25 @@ func (c *Client) UnsubscribeAll() error {
return nil
}
// restoreSubscriptions restores subscriptions according to
// cached information about them.
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool {
type subsInfo struct {
blockRcv chan *block.Block
notificationRcv chan *state.ContainedNotificationEvent
notaryReqRcv chan *result.NotaryRequestEvent
subscribedToBlocks bool
subscribedEvents map[util.Uint160]string
subscribedNotaryEvents map[util.Uint160]string
}
// restoreSubscriptions restores subscriptions according to cached
// information about them.
//
// If it is NOT a background operation switchLock MUST be held.
// Returns a pair: the second is a restoration status and the first
// one contains subscription information applied to the passed cli
// and receivers for the updated subscriptions.
// Does not change Client instance.
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
var (
err error
id string
@ -214,72 +234,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
stopCh := make(chan struct{})
defer close(stopCh)
blockRcv := make(chan *block.Block)
notificationRcv := make(chan *state.ContainedNotificationEvent)
notaryReqRcv := make(chan *result.NotaryRequestEvent)
// neo-go WS client says to _always_ read notifications
// from its channel. Subscribing to any notification
// while not reading them in another goroutine may
// lead to a dead-lock, thus that async side notification
// listening while restoring subscriptions
go func() {
var e any
var ok bool
for {
select {
case <-stopCh:
return
case n, ok := <-cli.Notifications:
case e, ok = <-blockRcv:
case e, ok = <-notificationRcv:
case e, ok = <-notaryReqRcv:
}
if !ok {
return
}
c.notifications <- n
if background {
// background client (test) switch, no need to send
// any notification, just preventing dead-lock
continue
}
c.routeEvent(e)
}
}()
if background {
c.switchLock.RLock()
defer c.switchLock.RUnlock()
}
si.subscribedToBlocks = c.subscribedToNewBlocks
si.subscribedEvents = copySubsMap(c.subscribedEvents)
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
si.blockRcv = blockRcv
si.notificationRcv = notificationRcv
si.notaryReqRcv = notaryReqRcv
// new block events restoration
if c.subscribedToNewBlocks {
_, err = cli.SubscribeForNewBlocks(nil)
if si.subscribedToBlocks {
_, err = cli.ReceiveBlocks(nil, blockRcv)
if err != nil {
c.logger.Error("could not restore block subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return false
return
}
}
// notification events restoration
for contract := range c.subscribedEvents {
for contract := range si.subscribedEvents {
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
if err != nil {
c.logger.Error("could not restore notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return false
return
}
c.subscribedEvents[contract] = id
si.subscribedEvents[contract] = id
}
// notary notification events restoration
if c.notary != nil {
for signer := range c.subscribedNotaryEvents {
for signer := range si.subscribedNotaryEvents {
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
id, err = cli.SubscribeForNotaryRequests(nil, &signer)
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
if err != nil {
c.logger.Error("could not restore notary notification subscription after RPC switch",
zap.String("endpoint", endpoint),
zap.Error(err),
)
return false
return
}
c.subscribedNotaryEvents[signer] = id
si.subscribedNotaryEvents[signer] = id
}
}
return true
return si, true
}
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
newM := make(map[util.Uint160]string, len(m))
for k, v := range m {
newM[k] = v
}
return newM
}

View file

@ -93,7 +93,7 @@ func (mb *managerBuilder) BuildManagers(epoch uint64, p apireputation.PeerID) ([
copy(nodes, nmNodes)
hrw.SortHasherSliceByValue(nodes, epoch)
hrw.SortSliceByValue(nodes, epoch)
for i := range nodes {
if apireputation.ComparePeerKey(p, nodes[i].PublicKey()) {

View file

@ -19,29 +19,29 @@ func ReadNodeAttributes(dst *netmap.NodeInfo, attrs []string) error {
for i := range attrs {
line := replaceEscaping(attrs[i], false) // replaced escaped symbols with non-printable symbols
k, v, found := strings.Cut(line, keyValueSeparator)
if !found {
words := strings.Split(line, keyValueSeparator)
if len(words) != 2 {
return errors.New("missing attribute key and/or value")
}
_, ok := cache[k]
_, ok := cache[words[0]]
if ok {
return fmt.Errorf("duplicated keys %s", k)
return fmt.Errorf("duplicated keys %s", words[0])
}
cache[k] = struct{}{}
cache[words[0]] = struct{}{}
// replace non-printable symbols with escaped symbols without escape character
k = replaceEscaping(k, true)
v = replaceEscaping(v, true)
words[0] = replaceEscaping(words[0], true)
words[1] = replaceEscaping(words[1], true)
if k == "" {
if words[0] == "" {
return errors.New("empty key")
} else if v == "" {
} else if words[1] == "" {
return errors.New("empty value")
}
dst.SetAttribute(k, v)
dst.SetAttribute(words[0], words[1])
}
return nil