forked from TrueCloudLab/frostfs-node
Compare commits
2 commits
master
...
carpawell/
Author | SHA1 | Date | |
---|---|---|---|
|
543d1dca85 | ||
|
8579ed4fff |
24 changed files with 302 additions and 212 deletions
2
.github/CODEOWNERS
vendored
2
.github/CODEOWNERS
vendored
|
@ -1 +1 @@
|
||||||
* @TrueCloudLab/storage-core @TrueCloudLab/committers
|
* @carpawell @fyrchik @acid-ant
|
||||||
|
|
|
@ -12,7 +12,6 @@ Changelog for FrostFS Node
|
||||||
- `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246)
|
- `object.delete.tombstone_lifetime` config parameter to set tombstone lifetime in the DELETE service (#2246)
|
||||||
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
|
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
|
||||||
- Multiple configs support (#44)
|
- Multiple configs support (#44)
|
||||||
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
|
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
|
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
|
||||||
|
@ -54,7 +53,6 @@ Changelog for FrostFS Node
|
||||||
- `golang.org/x/term` to `v0.3.0`
|
- `golang.org/x/term` to `v0.3.0`
|
||||||
- `google.golang.org/grpc` to `v1.51.0`
|
- `google.golang.org/grpc` to `v1.51.0`
|
||||||
- `github.com/nats-io/nats.go` to `v1.22.1`
|
- `github.com/nats-io/nats.go` to `v1.22.1`
|
||||||
- `github.com/TrueCloudLab/hrw` to `v.1.1.1`
|
|
||||||
- Minimum go version to v1.18
|
- Minimum go version to v1.18
|
||||||
|
|
||||||
### Updating from v0.35.0
|
### Updating from v0.35.0
|
||||||
|
|
|
@ -141,13 +141,13 @@ func setConfigCmd(cmd *cobra.Command, args []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseConfigPair(kvStr string, force bool) (key string, val any, err error) {
|
func parseConfigPair(kvStr string, force bool) (key string, val any, err error) {
|
||||||
k, v, found := strings.Cut(kvStr, "=")
|
kv := strings.SplitN(kvStr, "=", 2)
|
||||||
if !found {
|
if len(kv) != 2 {
|
||||||
return "", nil, fmt.Errorf("invalid parameter format: must be 'key=val', got: %s", kvStr)
|
return "", nil, fmt.Errorf("invalid parameter format: must be 'key=val', got: %s", kvStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
key = k
|
key = kv[0]
|
||||||
valRaw := v
|
valRaw := kv[1]
|
||||||
|
|
||||||
switch key {
|
switch key {
|
||||||
case netmapAuditFeeKey, netmapBasicIncomeRateKey,
|
case netmapAuditFeeKey, netmapBasicIncomeRateKey,
|
||||||
|
@ -162,7 +162,7 @@ func parseConfigPair(kvStr string, force bool) (key string, val any, err error)
|
||||||
case netmapEigenTrustAlphaKey:
|
case netmapEigenTrustAlphaKey:
|
||||||
// just check that it could
|
// just check that it could
|
||||||
// be parsed correctly
|
// be parsed correctly
|
||||||
_, err = strconv.ParseFloat(v, 64)
|
_, err = strconv.ParseFloat(kv[1], 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("could not parse %s's value '%s' as float: %w", key, valRaw, err)
|
err = fmt.Errorf("could not parse %s's value '%s' as float: %w", key, valRaw, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,23 +27,23 @@ func setPolicyCmd(cmd *cobra.Command, args []string) error {
|
||||||
|
|
||||||
bw := io.NewBufBinWriter()
|
bw := io.NewBufBinWriter()
|
||||||
for i := range args {
|
for i := range args {
|
||||||
k, v, found := strings.Cut(args[i], "=")
|
kv := strings.SplitN(args[i], "=", 2)
|
||||||
if !found {
|
if len(kv) != 2 {
|
||||||
return fmt.Errorf("invalid parameter format, must be Parameter=Value")
|
return fmt.Errorf("invalid parameter format, must be Parameter=Value")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch k {
|
switch kv[0] {
|
||||||
case execFeeParam, storagePriceParam, setFeeParam:
|
case execFeeParam, storagePriceParam, setFeeParam:
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("parameter must be one of %s, %s and %s", execFeeParam, storagePriceParam, setFeeParam)
|
return fmt.Errorf("parameter must be one of %s, %s and %s", execFeeParam, storagePriceParam, setFeeParam)
|
||||||
}
|
}
|
||||||
|
|
||||||
value, err := strconv.ParseUint(v, 10, 32)
|
value, err := strconv.ParseUint(kv[1], 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't parse parameter value '%s': %w", args[1], err)
|
return fmt.Errorf("can't parse parameter value '%s': %w", args[1], err)
|
||||||
}
|
}
|
||||||
|
|
||||||
emit.AppCall(bw.BinWriter, policy.Hash, "set"+k, callflag.All, int64(value))
|
emit.AppCall(bw.BinWriter, policy.Hash, "set"+kv[0], callflag.All, int64(value))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := wCtx.sendCommitteeTx(bw.Bytes(), false); err != nil {
|
if err := wCtx.sendCommitteeTx(bw.Bytes(), false); err != nil {
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
containerApi "github.com/TrueCloudLab/frostfs-api-go/v2/container"
|
|
||||||
internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||||
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||||
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||||
|
@ -27,8 +26,6 @@ var (
|
||||||
containerAttributes []string
|
containerAttributes []string
|
||||||
containerAwait bool
|
containerAwait bool
|
||||||
containerName string
|
containerName string
|
||||||
containerNnsName string
|
|
||||||
containerNnsZone string
|
|
||||||
containerNoTimestamp bool
|
containerNoTimestamp bool
|
||||||
containerSubnet string
|
containerSubnet string
|
||||||
force bool
|
force bool
|
||||||
|
@ -163,8 +160,6 @@ func initContainerCreateCmd() {
|
||||||
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
|
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
|
||||||
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")
|
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")
|
||||||
flags.StringVar(&containerName, "name", "", "Container name attribute")
|
flags.StringVar(&containerName, "name", "", "Container name attribute")
|
||||||
flags.StringVar(&containerNnsName, "nns-name", "", "Container nns name attribute")
|
|
||||||
flags.StringVar(&containerNnsZone, "nns-zone", "", "Container nns zone attribute")
|
|
||||||
flags.BoolVar(&containerNoTimestamp, "disable-timestamp", false, "Disable timestamp container attribute")
|
flags.BoolVar(&containerNoTimestamp, "disable-timestamp", false, "Disable timestamp container attribute")
|
||||||
flags.StringVar(&containerSubnet, "subnet", "", "String representation of container subnetwork")
|
flags.StringVar(&containerSubnet, "subnet", "", "String representation of container subnetwork")
|
||||||
flags.BoolVarP(&force, commonflags.ForceFlag, commonflags.ForceFlagShorthand, false,
|
flags.BoolVarP(&force, commonflags.ForceFlag, commonflags.ForceFlagShorthand, false,
|
||||||
|
@ -202,12 +197,12 @@ func parseContainerPolicy(cmd *cobra.Command, policyString string) (*netmap.Plac
|
||||||
|
|
||||||
func parseAttributes(dst *container.Container, attributes []string) error {
|
func parseAttributes(dst *container.Container, attributes []string) error {
|
||||||
for i := range attributes {
|
for i := range attributes {
|
||||||
k, v, found := strings.Cut(attributes[i], attributeDelimiter)
|
kvPair := strings.Split(attributes[i], attributeDelimiter)
|
||||||
if !found {
|
if len(kvPair) != 2 {
|
||||||
return errors.New("invalid container attribute")
|
return errors.New("invalid container attribute")
|
||||||
}
|
}
|
||||||
|
|
||||||
dst.SetAttribute(k, v)
|
dst.SetAttribute(kvPair[0], kvPair[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
if !containerNoTimestamp {
|
if !containerNoTimestamp {
|
||||||
|
@ -218,12 +213,5 @@ func parseAttributes(dst *container.Container, attributes []string) error {
|
||||||
container.SetName(dst, containerName)
|
container.SetName(dst, containerName)
|
||||||
}
|
}
|
||||||
|
|
||||||
if containerNnsName != "" {
|
|
||||||
dst.SetAttribute(containerApi.SysAttributeName, containerNnsName)
|
|
||||||
}
|
|
||||||
if containerNnsZone != "" {
|
|
||||||
dst.SetAttribute(containerApi.SysAttributeZone, containerNnsZone)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -179,12 +179,12 @@ func parseObjectAttrs(cmd *cobra.Command) ([]object.Attribute, error) {
|
||||||
|
|
||||||
attrs := make([]object.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
|
attrs := make([]object.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
|
||||||
for i := range rawAttrs {
|
for i := range rawAttrs {
|
||||||
k, v, found := strings.Cut(rawAttrs[i], "=")
|
kv := strings.SplitN(rawAttrs[i], "=", 2)
|
||||||
if !found {
|
if len(kv) != 2 {
|
||||||
return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i])
|
return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i])
|
||||||
}
|
}
|
||||||
attrs[i].SetKey(k)
|
attrs[i].SetKey(kv[0])
|
||||||
attrs[i].SetValue(v)
|
attrs[i].SetValue(kv[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
disableFilename, _ := cmd.Flags().GetBool("disable-filename")
|
disableFilename, _ := cmd.Flags().GetBool("disable-filename")
|
||||||
|
@ -218,26 +218,26 @@ func parseObjectNotifications(cmd *cobra.Command) (*object.NotificationInfo, err
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
before, after, found := strings.Cut(raw, separator)
|
rawSlice := strings.SplitN(raw, separator, 2)
|
||||||
if !found {
|
if len(rawSlice) != 2 {
|
||||||
return nil, fmt.Errorf("notification must be in the form of: *epoch*%s*topic*, got %s", separator, raw)
|
return nil, fmt.Errorf("notification must be in the form of: *epoch*%s*topic*, got %s", separator, raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
ni := new(object.NotificationInfo)
|
ni := new(object.NotificationInfo)
|
||||||
|
|
||||||
epoch, err := strconv.ParseUint(before, 10, 64)
|
epoch, err := strconv.ParseUint(rawSlice[0], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not parse notification epoch %s: %w", before, err)
|
return nil, fmt.Errorf("could not parse notification epoch %s: %w", rawSlice[0], err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ni.SetEpoch(epoch)
|
ni.SetEpoch(epoch)
|
||||||
|
|
||||||
if after == "" {
|
if rawSlice[1] == "" {
|
||||||
return nil, fmt.Errorf("incorrect empty topic: use %s to force using default topic", useDefaultTopic)
|
return nil, fmt.Errorf("incorrect empty topic: use %s to force using default topic", useDefaultTopic)
|
||||||
}
|
}
|
||||||
|
|
||||||
if after != useDefaultTopic {
|
if rawSlice[1] != useDefaultTopic {
|
||||||
ni.SetTopic(after)
|
ni.SetTopic(rawSlice[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
return ni, nil
|
return ni, nil
|
||||||
|
|
|
@ -154,16 +154,16 @@ func getRangeList(cmd *cobra.Command) ([]*object.Range, error) {
|
||||||
vs := strings.Split(v, ",")
|
vs := strings.Split(v, ",")
|
||||||
rs := make([]*object.Range, len(vs))
|
rs := make([]*object.Range, len(vs))
|
||||||
for i := range vs {
|
for i := range vs {
|
||||||
before, after, found := strings.Cut(vs[i], rangeSep)
|
r := strings.Split(vs[i], rangeSep)
|
||||||
if !found {
|
if len(r) != 2 {
|
||||||
return nil, fmt.Errorf("invalid range specifier: %s", vs[i])
|
return nil, fmt.Errorf("invalid range specifier: %s", vs[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
offset, err := strconv.ParseUint(before, 10, 64)
|
offset, err := strconv.ParseUint(r[0], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid '%s' range offset specifier: %w", vs[i], err)
|
return nil, fmt.Errorf("invalid '%s' range offset specifier: %w", vs[i], err)
|
||||||
}
|
}
|
||||||
length, err := strconv.ParseUint(after, 10, 64)
|
length, err := strconv.ParseUint(r[1], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid '%s' range length specifier: %w", vs[i], err)
|
return nil, fmt.Errorf("invalid '%s' range length specifier: %w", vs[i], err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,12 +63,12 @@ func parseXHeaders(cmd *cobra.Command) []string {
|
||||||
xs := make([]string, 0, 2*len(xHeaders))
|
xs := make([]string, 0, 2*len(xHeaders))
|
||||||
|
|
||||||
for i := range xHeaders {
|
for i := range xHeaders {
|
||||||
k, v, found := strings.Cut(xHeaders[i], "=")
|
kv := strings.SplitN(xHeaders[i], "=", 2)
|
||||||
if !found {
|
if len(kv) != 2 {
|
||||||
panic(fmt.Errorf("invalid X-Header format: %s", xHeaders[i]))
|
panic(fmt.Errorf("invalid X-Header format: %s", xHeaders[i]))
|
||||||
}
|
}
|
||||||
|
|
||||||
xs = append(xs, k, v)
|
xs = append(xs, kv[0], kv[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
return xs
|
return xs
|
||||||
|
|
|
@ -79,14 +79,14 @@ func parseMeta(cmd *cobra.Command) ([]*tree.KeyValue, error) {
|
||||||
|
|
||||||
pairs := make([]*tree.KeyValue, 0, len(raws))
|
pairs := make([]*tree.KeyValue, 0, len(raws))
|
||||||
for i := range raws {
|
for i := range raws {
|
||||||
k, v, found := strings.Cut(raws[i], "=")
|
kv := strings.SplitN(raws[i], "=", 2)
|
||||||
if !found {
|
if len(kv) != 2 {
|
||||||
return nil, fmt.Errorf("invalid meta pair format: %s", raws[i])
|
return nil, fmt.Errorf("invalid meta pair format: %s", raws[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
var pair tree.KeyValue
|
var pair tree.KeyValue
|
||||||
pair.Key = k
|
pair.Key = kv[0]
|
||||||
pair.Value = []byte(v)
|
pair.Value = []byte(kv[1])
|
||||||
|
|
||||||
pairs = append(pairs, &pair)
|
pairs = append(pairs, &pair)
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,32 +226,35 @@ func parseEACLTable(tb *eacl.Table, args []string) error {
|
||||||
|
|
||||||
func parseEACLRecord(args []string) (*eacl.Record, error) {
|
func parseEACLRecord(args []string) (*eacl.Record, error) {
|
||||||
r := new(eacl.Record)
|
r := new(eacl.Record)
|
||||||
for _, arg := range args {
|
for i := range args {
|
||||||
before, after, found := strings.Cut(arg, ":")
|
ss := strings.SplitN(args[i], ":", 2)
|
||||||
|
|
||||||
switch prefix := strings.ToLower(before); prefix {
|
switch prefix := strings.ToLower(ss[0]); prefix {
|
||||||
case "req", "obj": // filters
|
case "req", "obj": // filters
|
||||||
if !found {
|
if len(ss) != 2 {
|
||||||
return nil, fmt.Errorf("invalid filter or target: %s", arg)
|
return nil, fmt.Errorf("invalid filter or target: %s", args[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
i := strings.Index(ss[1], "=")
|
||||||
|
if i < 0 {
|
||||||
|
return nil, fmt.Errorf("invalid filter key-value pair: %s", ss[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
var key, value string
|
var key, value string
|
||||||
var op eacl.Match
|
var op eacl.Match
|
||||||
var f bool
|
|
||||||
|
|
||||||
key, value, f = strings.Cut(after, "!=")
|
if 0 < i && ss[1][i-1] == '!' {
|
||||||
if f {
|
key = ss[1][:i-1]
|
||||||
op = eacl.MatchStringNotEqual
|
op = eacl.MatchStringNotEqual
|
||||||
} else {
|
} else {
|
||||||
key, value, f = strings.Cut(after, "=")
|
key = ss[1][:i]
|
||||||
if !f {
|
|
||||||
return nil, fmt.Errorf("invalid filter key-value pair: %s", after)
|
|
||||||
}
|
|
||||||
op = eacl.MatchStringEqual
|
op = eacl.MatchStringEqual
|
||||||
}
|
}
|
||||||
|
|
||||||
|
value = ss[1][i+1:]
|
||||||
|
|
||||||
typ := eacl.HeaderFromRequest
|
typ := eacl.HeaderFromRequest
|
||||||
if before == "obj" {
|
if ss[0] == "obj" {
|
||||||
typ = eacl.HeaderFromObject
|
typ = eacl.HeaderFromObject
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,8 +263,8 @@ func parseEACLRecord(args []string) (*eacl.Record, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
var pubs []ecdsa.PublicKey
|
var pubs []ecdsa.PublicKey
|
||||||
if found {
|
if len(ss) == 2 {
|
||||||
pubs, err = parseKeyList(after)
|
pubs, err = parseKeyList(ss[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -278,7 +281,7 @@ func parseEACLRecord(args []string) (*eacl.Record, error) {
|
||||||
eacl.AddFormedTarget(r, role, pubs...)
|
eacl.AddFormedTarget(r, role, pubs...)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("invalid prefix: %s", before)
|
return nil, fmt.Errorf("invalid prefix: %s", ss[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,14 +65,14 @@ func loadEnv(path string) {
|
||||||
|
|
||||||
scanner := bufio.NewScanner(f)
|
scanner := bufio.NewScanner(f)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
k, v, found := strings.Cut(scanner.Text(), "=")
|
pair := strings.SplitN(scanner.Text(), "=", 2)
|
||||||
if !found {
|
if len(pair) != 2 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
v = strings.Trim(v, `"`)
|
pair[1] = strings.Trim(pair[1], `"`)
|
||||||
|
|
||||||
err = os.Setenv(k, v)
|
err = os.Setenv(pair[0], pair[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("can't set environment variable")
|
panic("can't set environment variable")
|
||||||
}
|
}
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
||||||
github.com/TrueCloudLab/frostfs-api-go/v2 v2.0.0-20221212144048-1351b6656d68
|
github.com/TrueCloudLab/frostfs-api-go/v2 v2.0.0-20221212144048-1351b6656d68
|
||||||
github.com/TrueCloudLab/frostfs-contract v0.0.0-20221213081248-6c805c1b4e42
|
github.com/TrueCloudLab/frostfs-contract v0.0.0-20221213081248-6c805c1b4e42
|
||||||
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556
|
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556
|
||||||
github.com/TrueCloudLab/hrw v1.1.1-0.20230227111858-79b208bebf52
|
github.com/TrueCloudLab/hrw v1.1.0
|
||||||
github.com/TrueCloudLab/tzhash v1.7.0
|
github.com/TrueCloudLab/tzhash v1.7.0
|
||||||
github.com/cheggaaa/pb v1.0.29
|
github.com/cheggaaa/pb v1.0.29
|
||||||
github.com/chzyer/readline v1.5.1
|
github.com/chzyer/readline v1.5.1
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -52,8 +52,6 @@ github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556 h1:Cc1
|
||||||
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556/go.mod h1:4ZiG4jNLzrqeJbmZUrPI7wDZhQVPaf0zEIWa/eBsqBg=
|
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556/go.mod h1:4ZiG4jNLzrqeJbmZUrPI7wDZhQVPaf0zEIWa/eBsqBg=
|
||||||
github.com/TrueCloudLab/hrw v1.1.0 h1:2U69PpUX1UtMWgh/RAg6D8mQW+/WsxbLNE+19EUhLhY=
|
github.com/TrueCloudLab/hrw v1.1.0 h1:2U69PpUX1UtMWgh/RAg6D8mQW+/WsxbLNE+19EUhLhY=
|
||||||
github.com/TrueCloudLab/hrw v1.1.0/go.mod h1:Pzi8Hy3qx12cew+ajVxgbtDVM4sRG9/gJnJLcL/yRyY=
|
github.com/TrueCloudLab/hrw v1.1.0/go.mod h1:Pzi8Hy3qx12cew+ajVxgbtDVM4sRG9/gJnJLcL/yRyY=
|
||||||
github.com/TrueCloudLab/hrw v1.1.1-0.20230227111858-79b208bebf52 h1:fBeG0EkL7Pa2D0SIiZt3yQYGpP/IvrXg4xEPAZ4Jjys=
|
|
||||||
github.com/TrueCloudLab/hrw v1.1.1-0.20230227111858-79b208bebf52/go.mod h1:BG6NztCuNc0UFr6MWJ4MM1sUl9lxx6PBRwLmTxdre20=
|
|
||||||
github.com/TrueCloudLab/rfc6979 v0.3.0 h1:0SYMAfQWh/TjnofqYQHy+s3rmQ5gi0fvOaDbqd60/Ic=
|
github.com/TrueCloudLab/rfc6979 v0.3.0 h1:0SYMAfQWh/TjnofqYQHy+s3rmQ5gi0fvOaDbqd60/Ic=
|
||||||
github.com/TrueCloudLab/rfc6979 v0.3.0/go.mod h1:qylxFXFQ/sMvpZC/8JyWp+mfzk5Zj/KDT5FAbekhobc=
|
github.com/TrueCloudLab/rfc6979 v0.3.0/go.mod h1:qylxFXFQ/sMvpZC/8JyWp+mfzk5Zj/KDT5FAbekhobc=
|
||||||
github.com/TrueCloudLab/tzhash v1.7.0 h1:btGORepc7Dg+n4MxgJxv73c9eYhwSBI5HqsqUBRmJiw=
|
github.com/TrueCloudLab/tzhash v1.7.0 h1:btGORepc7Dg+n4MxgJxv73c9eYhwSBI5HqsqUBRmJiw=
|
||||||
|
|
|
@ -73,18 +73,18 @@ func stringifyAddress(addr oid.Address) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func addressFromString(s string) (oid.Address, error) {
|
func addressFromString(s string) (oid.Address, error) {
|
||||||
before, after, found := strings.Cut(s, ".")
|
i := strings.IndexByte(s, '.')
|
||||||
if !found {
|
if i == -1 {
|
||||||
return oid.Address{}, errors.New("invalid address")
|
return oid.Address{}, errors.New("invalid address")
|
||||||
}
|
}
|
||||||
|
|
||||||
var obj oid.ID
|
var obj oid.ID
|
||||||
if err := obj.DecodeString(before); err != nil {
|
if err := obj.DecodeString(s[:i]); err != nil {
|
||||||
return oid.Address{}, err
|
return oid.Address{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var cnr cid.ID
|
var cnr cid.ID
|
||||||
if err := cnr.DecodeString(after); err != nil {
|
if err := cnr.DecodeString(s[i+1:]); err != nil {
|
||||||
return oid.Address{}, err
|
return oid.Address{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@ type StorageEngine struct {
|
||||||
|
|
||||||
mtx *sync.RWMutex
|
mtx *sync.RWMutex
|
||||||
|
|
||||||
shards map[string]hashedShard
|
shards map[string]shardWrapper
|
||||||
|
|
||||||
shardPools map[string]util.WorkerPool
|
shardPools map[string]util.WorkerPool
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ func New(opts ...Option) *StorageEngine {
|
||||||
return &StorageEngine{
|
return &StorageEngine{
|
||||||
cfg: c,
|
cfg: c,
|
||||||
mtx: new(sync.RWMutex),
|
mtx: new(sync.RWMutex),
|
||||||
shards: make(map[string]hashedShard),
|
shards: make(map[string]shardWrapper),
|
||||||
shardPools: make(map[string]util.WorkerPool),
|
shardPools: make(map[string]util.WorkerPool),
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
setModeCh: make(chan setModeRequest),
|
setModeCh: make(chan setModeRequest),
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
oidtest "github.com/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "github.com/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
usertest "github.com/TrueCloudLab/frostfs-sdk-go/user/test"
|
usertest "github.com/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/version"
|
"github.com/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
"github.com/TrueCloudLab/hrw"
|
|
||||||
"github.com/TrueCloudLab/tzhash/tz"
|
"github.com/TrueCloudLab/tzhash/tz"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -87,12 +86,9 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
engine.shards[s.ID().String()] = hashedShard{
|
engine.shards[s.ID().String()] = shardWrapper{
|
||||||
shardWrapper: shardWrapper{
|
errorCount: atomic.NewUint32(0),
|
||||||
errorCount: atomic.NewUint32(0),
|
Shard: s,
|
||||||
Shard: s,
|
|
||||||
},
|
|
||||||
hash: hrw.Hash([]byte(s.ID().String())),
|
|
||||||
}
|
}
|
||||||
engine.shardPools[s.ID().String()] = pool
|
engine.shardPools[s.ID().String()] = pool
|
||||||
}
|
}
|
||||||
|
|
|
@ -151,7 +151,7 @@ mainLoop:
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
|
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
|
||||||
for j := range shards {
|
for j := range shards {
|
||||||
if _, ok := shardMap[shards[j].ID().String()]; ok {
|
if _, ok := shardMap[shards[j].ID().String()]; ok {
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -16,10 +16,7 @@ import (
|
||||||
|
|
||||||
var errShardNotFound = logicerr.New("shard not found")
|
var errShardNotFound = logicerr.New("shard not found")
|
||||||
|
|
||||||
type hashedShard struct {
|
type hashedShard shardWrapper
|
||||||
shardWrapper
|
|
||||||
hash uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type metricsWithID struct {
|
type metricsWithID struct {
|
||||||
id string
|
id string
|
||||||
|
@ -130,12 +127,9 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
||||||
return fmt.Errorf("shard with id %s was already added", strID)
|
return fmt.Errorf("shard with id %s was already added", strID)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.shards[strID] = hashedShard{
|
e.shards[strID] = shardWrapper{
|
||||||
shardWrapper: shardWrapper{
|
errorCount: atomic.NewUint32(0),
|
||||||
errorCount: atomic.NewUint32(0),
|
Shard: sh,
|
||||||
Shard: sh,
|
|
||||||
},
|
|
||||||
hash: hrw.Hash([]byte(strID)),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
e.shardPools[strID] = pool
|
e.shardPools[strID] = pool
|
||||||
|
@ -150,7 +144,7 @@ func (e *StorageEngine) removeShards(ids ...string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ss := make([]hashedShard, 0, len(ids))
|
ss := make([]shardWrapper, 0, len(ids))
|
||||||
|
|
||||||
e.mtx.Lock()
|
e.mtx.Lock()
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
|
@ -216,7 +210,7 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s
|
||||||
weights = append(weights, e.shardWeight(sh.Shard))
|
weights = append(weights, e.shardWeight(sh.Shard))
|
||||||
}
|
}
|
||||||
|
|
||||||
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
|
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
|
||||||
|
|
||||||
return shards
|
return shards
|
||||||
}
|
}
|
||||||
|
@ -282,5 +276,7 @@ func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hashedShard) Hash() uint64 {
|
func (s hashedShard) Hash() uint64 {
|
||||||
return s.hash
|
return hrw.Hash(
|
||||||
|
[]byte(s.Shard.ID().String()),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,10 +10,13 @@ import (
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
|
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/gas"
|
||||||
|
@ -69,17 +72,20 @@ type Client struct {
|
||||||
// on every normal call.
|
// on every normal call.
|
||||||
switchLock *sync.RWMutex
|
switchLock *sync.RWMutex
|
||||||
|
|
||||||
// channel for ws notifications
|
// channels for ws notifications; protected with switchLock
|
||||||
notifications chan rpcclient.Notification
|
notifications chan rpcclient.Notification
|
||||||
|
blockRcv chan *block.Block
|
||||||
// channel for internal stop
|
notificationRcv chan *state.ContainedNotificationEvent
|
||||||
closeChan chan struct{}
|
notaryReqRcv chan *result.NotaryRequestEvent
|
||||||
|
|
||||||
// cached subscription information
|
// cached subscription information
|
||||||
subscribedEvents map[util.Uint160]string
|
subscribedEvents map[util.Uint160]string
|
||||||
subscribedNotaryEvents map[util.Uint160]string
|
subscribedNotaryEvents map[util.Uint160]string
|
||||||
subscribedToNewBlocks bool
|
subscribedToNewBlocks bool
|
||||||
|
|
||||||
|
// channel for internal stop
|
||||||
|
closeChan chan struct{}
|
||||||
|
|
||||||
// indicates that Client is not able to
|
// indicates that Client is not able to
|
||||||
// establish connection to any of the
|
// establish connection to any of the
|
||||||
// provided RPC endpoints
|
// provided RPC endpoints
|
||||||
|
@ -385,43 +391,43 @@ func (c *Client) roleList(r noderoles.Role) (keys.PublicKeys, error) {
|
||||||
//
|
//
|
||||||
// Wraps any error to frostfsError.
|
// Wraps any error to frostfsError.
|
||||||
func toStackParameter(value any) (sc.Parameter, error) {
|
func toStackParameter(value any) (sc.Parameter, error) {
|
||||||
var result = sc.Parameter{
|
var res = sc.Parameter{
|
||||||
Value: value,
|
Value: value,
|
||||||
}
|
}
|
||||||
|
|
||||||
switch v := value.(type) {
|
switch v := value.(type) {
|
||||||
case []byte:
|
case []byte:
|
||||||
result.Type = sc.ByteArrayType
|
res.Type = sc.ByteArrayType
|
||||||
case int:
|
case int:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = big.NewInt(int64(v))
|
res.Value = big.NewInt(int64(v))
|
||||||
case int64:
|
case int64:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = big.NewInt(v)
|
res.Value = big.NewInt(v)
|
||||||
case uint64:
|
case uint64:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = new(big.Int).SetUint64(v)
|
res.Value = new(big.Int).SetUint64(v)
|
||||||
case [][]byte:
|
case [][]byte:
|
||||||
arr := make([]sc.Parameter, 0, len(v))
|
arr := make([]sc.Parameter, 0, len(v))
|
||||||
for i := range v {
|
for i := range v {
|
||||||
elem, err := toStackParameter(v[i])
|
elem, err := toStackParameter(v[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
arr = append(arr, elem)
|
arr = append(arr, elem)
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Type = sc.ArrayType
|
res.Type = sc.ArrayType
|
||||||
result.Value = arr
|
res.Value = arr
|
||||||
case string:
|
case string:
|
||||||
result.Type = sc.StringType
|
res.Type = sc.StringType
|
||||||
case util.Uint160:
|
case util.Uint160:
|
||||||
result.Type = sc.ByteArrayType
|
res.Type = sc.ByteArrayType
|
||||||
result.Value = v.BytesBE()
|
res.Value = v.BytesBE()
|
||||||
case noderoles.Role:
|
case noderoles.Role:
|
||||||
result.Type = sc.IntegerType
|
res.Type = sc.IntegerType
|
||||||
result.Value = big.NewInt(int64(v))
|
res.Value = big.NewInt(int64(v))
|
||||||
case keys.PublicKeys:
|
case keys.PublicKeys:
|
||||||
arr := make([][]byte, 0, len(v))
|
arr := make([][]byte, 0, len(v))
|
||||||
for i := range v {
|
for i := range v {
|
||||||
|
@ -430,13 +436,13 @@ func toStackParameter(value any) (sc.Parameter, error) {
|
||||||
|
|
||||||
return toStackParameter(arr)
|
return toStackParameter(arr)
|
||||||
case bool:
|
case bool:
|
||||||
result.Type = sc.BoolType
|
res.Type = sc.BoolType
|
||||||
result.Value = v
|
res.Value = v
|
||||||
default:
|
default:
|
||||||
return result, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
|
return res, wrapFrostFSError(fmt.Errorf("chain/client: unsupported parameter %v", value))
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MagicNumber returns the magic number of the network
|
// MagicNumber returns the magic number of the network
|
||||||
|
@ -480,7 +486,7 @@ func (c *Client) MsPerBlock() (res int64, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsValidScript returns true if invocation script executes with HALT state.
|
// IsValidScript returns true if invocation script executes with HALT state.
|
||||||
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res bool, err error) {
|
func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error) {
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
defer c.switchLock.RUnlock()
|
defer c.switchLock.RUnlock()
|
||||||
|
|
||||||
|
@ -488,12 +494,12 @@ func (c *Client) IsValidScript(script []byte, signers []transaction.Signer) (res
|
||||||
return false, ErrConnectionLost
|
return false, ErrConnectionLost
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := c.client.InvokeScript(script, signers)
|
res, err := c.client.InvokeScript(script, signers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("invokeScript: %w", err)
|
return false, fmt.Errorf("invokeScript: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return result.State == vmstate.Halt.String(), nil
|
return res.State == vmstate.Halt.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotificationChannel returns channel than receives subscribed
|
// NotificationChannel returns channel than receives subscribed
|
||||||
|
@ -525,3 +531,14 @@ func (c *Client) setActor(act *actor.Actor) {
|
||||||
c.gasToken = nep17.New(act, gas.Hash)
|
c.gasToken = nep17.New(act, gas.Hash)
|
||||||
c.rolemgmt = rolemgmt.New(act)
|
c.rolemgmt = rolemgmt.New(act)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateSubs updates subscription information, must be
|
||||||
|
// protected with switchLock.
|
||||||
|
func (c *Client) updateSubs(si subsInfo) {
|
||||||
|
c.blockRcv = si.blockRcv
|
||||||
|
c.notificationRcv = si.notificationRcv
|
||||||
|
c.notaryReqRcv = si.notaryReqRcv
|
||||||
|
|
||||||
|
c.subscribedEvents = si.subscribedEvents
|
||||||
|
c.subscribedNotaryEvents = si.subscribedNotaryEvents
|
||||||
|
}
|
||||||
|
|
|
@ -9,8 +9,11 @@ import (
|
||||||
|
|
||||||
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
@ -109,6 +112,9 @@ func New(key *keys.PrivateKey, opts ...Option) (*Client, error) {
|
||||||
cfg: *cfg,
|
cfg: *cfg,
|
||||||
switchLock: &sync.RWMutex{},
|
switchLock: &sync.RWMutex{},
|
||||||
notifications: make(chan rpcclient.Notification),
|
notifications: make(chan rpcclient.Notification),
|
||||||
|
blockRcv: make(chan *block.Block),
|
||||||
|
notificationRcv: make(chan *state.ContainedNotificationEvent),
|
||||||
|
notaryReqRcv: make(chan *result.NotaryRequestEvent),
|
||||||
subscribedEvents: make(map[util.Uint160]string),
|
subscribedEvents: make(map[util.Uint160]string),
|
||||||
subscribedNotaryEvents: make(map[util.Uint160]string),
|
subscribedNotaryEvents: make(map[util.Uint160]string),
|
||||||
closeChan: make(chan struct{}),
|
closeChan: make(chan struct{}),
|
||||||
|
|
|
@ -4,6 +4,11 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,7 +56,8 @@ func (c *Client) switchRPC() bool {
|
||||||
c.logger.Info("connection to the new RPC node has been established",
|
c.logger.Info("connection to the new RPC node has been established",
|
||||||
zap.String("endpoint", newEndpoint))
|
zap.String("endpoint", newEndpoint))
|
||||||
|
|
||||||
if !c.restoreSubscriptions(cli, newEndpoint) {
|
subs, ok := c.restoreSubscriptions(cli, newEndpoint, false)
|
||||||
|
if !ok {
|
||||||
// new WS client does not allow
|
// new WS client does not allow
|
||||||
// restoring subscription, client
|
// restoring subscription, client
|
||||||
// could not work correctly =>
|
// could not work correctly =>
|
||||||
|
@ -63,6 +69,7 @@ func (c *Client) switchRPC() bool {
|
||||||
|
|
||||||
c.client = cli
|
c.client = cli
|
||||||
c.setActor(act)
|
c.setActor(act)
|
||||||
|
c.updateSubs(subs)
|
||||||
|
|
||||||
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
if c.cfg.switchInterval != 0 && !c.switchIsActive.Load() &&
|
||||||
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
c.endpoints.list[c.endpoints.curr].Priority != c.endpoints.list[0].Priority {
|
||||||
|
@ -77,9 +84,14 @@ func (c *Client) switchRPC() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) notificationLoop() {
|
func (c *Client) notificationLoop() {
|
||||||
|
var e any
|
||||||
|
var ok bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
nChan := c.client.Notifications
|
bChan := c.blockRcv
|
||||||
|
nChan := c.notificationRcv
|
||||||
|
nrChan := c.notaryReqRcv
|
||||||
c.switchLock.RUnlock()
|
c.switchLock.RUnlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -93,57 +105,74 @@ func (c *Client) notificationLoop() {
|
||||||
c.close()
|
c.close()
|
||||||
|
|
||||||
return
|
return
|
||||||
case n, ok := <-nChan:
|
case e, ok = <-bChan:
|
||||||
// notification channel is used as a connection
|
case e, ok = <-nChan:
|
||||||
// state: if it is closed, the connection is
|
case e, ok = <-nrChan:
|
||||||
// considered to be lost
|
}
|
||||||
if !ok {
|
|
||||||
if closeErr := c.client.GetError(); closeErr != nil {
|
|
||||||
c.logger.Warn("switching to the next RPC node",
|
|
||||||
zap.String("reason", closeErr.Error()),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
// neo-go client was closed by calling `Close`
|
|
||||||
// method that happens only when the client has
|
|
||||||
// switched to the more prioritized RPC
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !c.switchRPC() {
|
if ok {
|
||||||
c.logger.Error("could not establish connection to any RPC node")
|
c.routeEvent(e)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// could not connect to all endpoints =>
|
if !c.reconnect() {
|
||||||
// switch client to inactive mode
|
return
|
||||||
c.inactiveMode()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(@carpawell): call here some callback retrieved in constructor
|
|
||||||
// of the client to allow checking chain state since during switch
|
|
||||||
// process some notification could be lost
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case c.notifications <- n:
|
|
||||||
continue
|
|
||||||
case <-c.cfg.ctx.Done():
|
|
||||||
_ = c.UnsubscribeAll()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
return
|
|
||||||
case <-c.closeChan:
|
|
||||||
_ = c.UnsubscribeAll()
|
|
||||||
c.close()
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) routeEvent(e any) {
|
||||||
|
typedNotification := rpcclient.Notification{Value: e}
|
||||||
|
|
||||||
|
switch e.(type) {
|
||||||
|
case *block.Block:
|
||||||
|
typedNotification.Type = neorpc.BlockEventID
|
||||||
|
case *state.ContainedNotificationEvent:
|
||||||
|
typedNotification.Type = neorpc.NotificationEventID
|
||||||
|
case *result.NotaryRequestEvent:
|
||||||
|
typedNotification.Type = neorpc.NotaryRequestEventID
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c.notifications <- typedNotification:
|
||||||
|
case <-c.cfg.ctx.Done():
|
||||||
|
_ = c.UnsubscribeAll()
|
||||||
|
c.close()
|
||||||
|
case <-c.closeChan:
|
||||||
|
_ = c.UnsubscribeAll()
|
||||||
|
c.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) reconnect() bool {
|
||||||
|
if closeErr := c.client.GetError(); closeErr != nil {
|
||||||
|
c.logger.Warn("switching to the next RPC node",
|
||||||
|
zap.String("reason", closeErr.Error()),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// neo-go client was closed by calling `Close`
|
||||||
|
// method, that happens only when a client has
|
||||||
|
// switched to the more prioritized RPC
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !c.switchRPC() {
|
||||||
|
c.logger.Error("could not establish connection to any RPC node")
|
||||||
|
|
||||||
|
// could not connect to all endpoints =>
|
||||||
|
// switch client to inactive mode
|
||||||
|
c.inactiveMode()
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(@carpawell): call here some callback retrieved in constructor
|
||||||
|
// of the client to allow checking chain state since during switch
|
||||||
|
// process some notification could be lost
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) switchToMostPrioritized() {
|
func (c *Client) switchToMostPrioritized() {
|
||||||
t := time.NewTicker(c.cfg.switchInterval)
|
t := time.NewTicker(c.cfg.switchInterval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
@ -156,11 +185,12 @@ mainLoop:
|
||||||
return
|
return
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
c.switchLock.RLock()
|
c.switchLock.RLock()
|
||||||
|
|
||||||
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
|
endpointsCopy := make([]Endpoint, len(c.endpoints.list))
|
||||||
copy(endpointsCopy, c.endpoints.list)
|
copy(endpointsCopy, c.endpoints.list)
|
||||||
|
|
||||||
currPriority := c.endpoints.list[c.endpoints.curr].Priority
|
currPriority := c.endpoints.list[c.endpoints.curr].Priority
|
||||||
highestPriority := c.endpoints.list[0].Priority
|
highestPriority := c.endpoints.list[0].Priority
|
||||||
|
|
||||||
c.switchLock.RUnlock()
|
c.switchLock.RUnlock()
|
||||||
|
|
||||||
if currPriority == highestPriority {
|
if currPriority == highestPriority {
|
||||||
|
@ -186,7 +216,7 @@ mainLoop:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.restoreSubscriptions(cli, tryE) {
|
if subs, ok := c.restoreSubscriptions(cli, tryE, true); ok {
|
||||||
c.switchLock.Lock()
|
c.switchLock.Lock()
|
||||||
|
|
||||||
// higher priority node could have been
|
// higher priority node could have been
|
||||||
|
@ -201,6 +231,7 @@ mainLoop:
|
||||||
c.cache.invalidate()
|
c.cache.invalidate()
|
||||||
c.client = cli
|
c.client = cli
|
||||||
c.setActor(act)
|
c.setActor(act)
|
||||||
|
c.updateSubs(subs)
|
||||||
c.endpoints.curr = i
|
c.endpoints.curr = i
|
||||||
|
|
||||||
c.switchLock.Unlock()
|
c.switchLock.Unlock()
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -36,7 +40,7 @@ func (c *Client) SubscribeForExecutionNotifications(contract util.Uint160) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := c.client.SubscribeForExecutionNotifications(&contract, nil)
|
id, err := c.client.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, c.notificationRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -64,7 +68,7 @@ func (c *Client) SubscribeForNewBlocks() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.client.SubscribeForNewBlocks(nil)
|
_, err := c.client.ReceiveBlocks(nil, c.blockRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -99,7 +103,7 @@ func (c *Client) SubscribeForNotaryRequests(txSigner util.Uint160) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := c.client.SubscribeForNotaryRequests(nil, &txSigner)
|
id, err := c.client.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &txSigner}, c.notaryReqRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -203,9 +207,25 @@ func (c *Client) UnsubscribeAll() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreSubscriptions restores subscriptions according to
|
type subsInfo struct {
|
||||||
// cached information about them.
|
blockRcv chan *block.Block
|
||||||
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string) bool {
|
notificationRcv chan *state.ContainedNotificationEvent
|
||||||
|
notaryReqRcv chan *result.NotaryRequestEvent
|
||||||
|
|
||||||
|
subscribedToBlocks bool
|
||||||
|
subscribedEvents map[util.Uint160]string
|
||||||
|
subscribedNotaryEvents map[util.Uint160]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// restoreSubscriptions restores subscriptions according to cached
|
||||||
|
// information about them.
|
||||||
|
//
|
||||||
|
// If it is NOT a background operation switchLock MUST be held.
|
||||||
|
// Returns a pair: the second is a restoration status and the first
|
||||||
|
// one contains subscription information applied to the passed cli
|
||||||
|
// and receivers for the updated subscriptions.
|
||||||
|
// Does not change Client instance.
|
||||||
|
func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string, background bool) (si subsInfo, ok bool) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
id string
|
id string
|
||||||
|
@ -214,72 +234,109 @@ func (c *Client) restoreSubscriptions(cli *rpcclient.WSClient, endpoint string)
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
|
|
||||||
|
blockRcv := make(chan *block.Block)
|
||||||
|
notificationRcv := make(chan *state.ContainedNotificationEvent)
|
||||||
|
notaryReqRcv := make(chan *result.NotaryRequestEvent)
|
||||||
|
|
||||||
// neo-go WS client says to _always_ read notifications
|
// neo-go WS client says to _always_ read notifications
|
||||||
// from its channel. Subscribing to any notification
|
// from its channel. Subscribing to any notification
|
||||||
// while not reading them in another goroutine may
|
// while not reading them in another goroutine may
|
||||||
// lead to a dead-lock, thus that async side notification
|
// lead to a dead-lock, thus that async side notification
|
||||||
// listening while restoring subscriptions
|
// listening while restoring subscriptions
|
||||||
go func() {
|
go func() {
|
||||||
|
var e any
|
||||||
|
var ok bool
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
return
|
return
|
||||||
case n, ok := <-cli.Notifications:
|
case e, ok = <-blockRcv:
|
||||||
if !ok {
|
case e, ok = <-notificationRcv:
|
||||||
return
|
case e, ok = <-notaryReqRcv:
|
||||||
}
|
|
||||||
|
|
||||||
c.notifications <- n
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if background {
|
||||||
|
// background client (test) switch, no need to send
|
||||||
|
// any notification, just preventing dead-lock
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.routeEvent(e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if background {
|
||||||
|
c.switchLock.RLock()
|
||||||
|
defer c.switchLock.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
si.subscribedToBlocks = c.subscribedToNewBlocks
|
||||||
|
si.subscribedEvents = copySubsMap(c.subscribedEvents)
|
||||||
|
si.subscribedNotaryEvents = copySubsMap(c.subscribedNotaryEvents)
|
||||||
|
si.blockRcv = blockRcv
|
||||||
|
si.notificationRcv = notificationRcv
|
||||||
|
si.notaryReqRcv = notaryReqRcv
|
||||||
|
|
||||||
// new block events restoration
|
// new block events restoration
|
||||||
if c.subscribedToNewBlocks {
|
if si.subscribedToBlocks {
|
||||||
_, err = cli.SubscribeForNewBlocks(nil)
|
_, err = cli.ReceiveBlocks(nil, blockRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore block subscription after RPC switch",
|
c.logger.Error("could not restore block subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// notification events restoration
|
// notification events restoration
|
||||||
for contract := range c.subscribedEvents {
|
for contract := range si.subscribedEvents {
|
||||||
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
contract := contract // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForExecutionNotifications(&contract, nil)
|
id, err = cli.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &contract}, notificationRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notification subscription after RPC switch",
|
c.logger.Error("could not restore notification subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.subscribedEvents[contract] = id
|
si.subscribedEvents[contract] = id
|
||||||
}
|
}
|
||||||
|
|
||||||
// notary notification events restoration
|
// notary notification events restoration
|
||||||
if c.notary != nil {
|
if c.notary != nil {
|
||||||
for signer := range c.subscribedNotaryEvents {
|
for signer := range si.subscribedNotaryEvents {
|
||||||
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
signer := signer // See https://github.com/nspcc-dev/neo-go/issues/2890
|
||||||
id, err = cli.SubscribeForNotaryRequests(nil, &signer)
|
id, err = cli.ReceiveNotaryRequests(&neorpc.TxFilter{Signer: &signer}, notaryReqRcv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
c.logger.Error("could not restore notary notification subscription after RPC switch",
|
||||||
zap.String("endpoint", endpoint),
|
zap.String("endpoint", endpoint),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return false
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
c.subscribedNotaryEvents[signer] = id
|
si.subscribedNotaryEvents[signer] = id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return si, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func copySubsMap(m map[util.Uint160]string) map[util.Uint160]string {
|
||||||
|
newM := make(map[util.Uint160]string, len(m))
|
||||||
|
for k, v := range m {
|
||||||
|
newM[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return newM
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (mb *managerBuilder) BuildManagers(epoch uint64, p apireputation.PeerID) ([
|
||||||
|
|
||||||
copy(nodes, nmNodes)
|
copy(nodes, nmNodes)
|
||||||
|
|
||||||
hrw.SortHasherSliceByValue(nodes, epoch)
|
hrw.SortSliceByValue(nodes, epoch)
|
||||||
|
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
if apireputation.ComparePeerKey(p, nodes[i].PublicKey()) {
|
if apireputation.ComparePeerKey(p, nodes[i].PublicKey()) {
|
||||||
|
|
|
@ -19,29 +19,29 @@ func ReadNodeAttributes(dst *netmap.NodeInfo, attrs []string) error {
|
||||||
for i := range attrs {
|
for i := range attrs {
|
||||||
line := replaceEscaping(attrs[i], false) // replaced escaped symbols with non-printable symbols
|
line := replaceEscaping(attrs[i], false) // replaced escaped symbols with non-printable symbols
|
||||||
|
|
||||||
k, v, found := strings.Cut(line, keyValueSeparator)
|
words := strings.Split(line, keyValueSeparator)
|
||||||
if !found {
|
if len(words) != 2 {
|
||||||
return errors.New("missing attribute key and/or value")
|
return errors.New("missing attribute key and/or value")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, ok := cache[k]
|
_, ok := cache[words[0]]
|
||||||
if ok {
|
if ok {
|
||||||
return fmt.Errorf("duplicated keys %s", k)
|
return fmt.Errorf("duplicated keys %s", words[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
cache[k] = struct{}{}
|
cache[words[0]] = struct{}{}
|
||||||
|
|
||||||
// replace non-printable symbols with escaped symbols without escape character
|
// replace non-printable symbols with escaped symbols without escape character
|
||||||
k = replaceEscaping(k, true)
|
words[0] = replaceEscaping(words[0], true)
|
||||||
v = replaceEscaping(v, true)
|
words[1] = replaceEscaping(words[1], true)
|
||||||
|
|
||||||
if k == "" {
|
if words[0] == "" {
|
||||||
return errors.New("empty key")
|
return errors.New("empty key")
|
||||||
} else if v == "" {
|
} else if words[1] == "" {
|
||||||
return errors.New("empty value")
|
return errors.New("empty value")
|
||||||
}
|
}
|
||||||
|
|
||||||
dst.SetAttribute(k, v)
|
dst.SetAttribute(words[0], words[1])
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue