forked from TrueCloudLab/frostfs-node
Compare commits
23 commits
master
...
support/v0
Author | SHA1 | Date | |
---|---|---|---|
|
1fc9351f4f | ||
|
1a2b8ab59d | ||
|
5180e467f8 | ||
|
a193db3a3d | ||
|
49ae91d720 | ||
|
b087d7ead3 | ||
|
e4f357561e | ||
|
437687f78d | ||
|
1cfa1763e9 | ||
|
3de3c102fc | ||
|
679df13924 | ||
|
73d367e287 | ||
|
5f54fd5dc8 | ||
|
fe3be92c89 | ||
|
2977552a19 | ||
|
48b5d2cb91 | ||
|
baad9d06a1 | ||
|
b5bcf90fa1 | ||
|
ce169491ed | ||
|
58f2354057 | ||
|
25b827e0fd | ||
|
00180a7ecf | ||
|
1f825a467a |
35 changed files with 1040 additions and 85 deletions
28
CHANGELOG.md
28
CHANGELOG.md
|
@ -3,6 +3,30 @@ Changelog for NeoFS Node
|
||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.27.7] - 2022-03-30
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- Shard ID is now consistent between restarts (#1204)
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- More N3 RPC caches in object service (#1278)
|
||||||
|
|
||||||
|
## [0.27.6] - 2022-03-28
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- Allow empty passwords in neofs-cli config (#1136)
|
||||||
|
- Set correct audit range hash type in neofs-ir (#1180)
|
||||||
|
- Read objects directly from blobstor in case of shard inconsistency (#1186)
|
||||||
|
- Fix `-w` flag in subnet commands of neofs-adm (#1223)
|
||||||
|
- Do not use explicit mutex lock in chain caches (#1236)
|
||||||
|
- Force gRPC server stop if it can't shut down gracefully in storage node (#1270)
|
||||||
|
- Return non-zero exit code in `acl extended create` command failures and fix
|
||||||
|
help message (#1259)
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- Interactive storage node configurator in neofs-adm (#1090)
|
||||||
|
- Logs in metabase operations (#1188)
|
||||||
|
|
||||||
## [0.27.5] - 2022-01-31
|
## [0.27.5] - 2022-01-31
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
@ -916,7 +940,9 @@ NeoFS-API v2.0 support and updated brand-new storage node application.
|
||||||
|
|
||||||
First public review release.
|
First public review release.
|
||||||
|
|
||||||
[Unreleased]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.5...master
|
[Unreleased]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.7...master
|
||||||
|
[0.27.7]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.6...v0.27.7
|
||||||
|
[0.27.6]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.5...v0.27.6
|
||||||
[0.27.5]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.4...v0.27.5
|
[0.27.5]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.4...v0.27.5
|
||||||
[0.27.4]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.3...v0.27.4
|
[0.27.4]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.3...v0.27.4
|
||||||
[0.27.3]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.2...v0.27.3
|
[0.27.3]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.2...v0.27.3
|
||||||
|
|
|
@ -770,6 +770,7 @@ var cmdSubnetNode = &cobra.Command{
|
||||||
Short: "Manage nodes of the NeoFS subnet.",
|
Short: "Manage nodes of the NeoFS subnet.",
|
||||||
PreRun: func(cmd *cobra.Command, _ []string) {
|
PreRun: func(cmd *cobra.Command, _ []string) {
|
||||||
viperBindFlags(cmd,
|
viperBindFlags(cmd,
|
||||||
|
flagSubnetWallet,
|
||||||
flagSubnetNode,
|
flagSubnetNode,
|
||||||
flagSubnetNodeSubnet,
|
flagSubnetNodeSubnet,
|
||||||
)
|
)
|
||||||
|
@ -882,8 +883,10 @@ func init() {
|
||||||
|
|
||||||
// subnet node flags
|
// subnet node flags
|
||||||
nodeFlags := cmdSubnetNode.PersistentFlags()
|
nodeFlags := cmdSubnetNode.PersistentFlags()
|
||||||
|
nodeFlags.StringP(flagSubnetWallet, "w", "", "Path to file with wallet")
|
||||||
|
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetWallet)
|
||||||
nodeFlags.String(flagSubnetNode, "", "Hex-encoded public key of the node")
|
nodeFlags.String(flagSubnetNode, "", "Hex-encoded public key of the node")
|
||||||
_ = cmdSubnetAdmin.MarkFlagRequired(flagSubnetNode)
|
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetNode)
|
||||||
nodeFlags.String(flagSubnetNodeSubnet, "", "ID of the subnet to manage nodes")
|
nodeFlags.String(flagSubnetNodeSubnet, "", "ID of the subnet to manage nodes")
|
||||||
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetNodeSubnet)
|
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetNodeSubnet)
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/config"
|
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/config"
|
||||||
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/morph"
|
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/morph"
|
||||||
|
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/storagecfg"
|
||||||
"github.com/nspcc-dev/neofs-node/misc"
|
"github.com/nspcc-dev/neofs-node/misc"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/autocomplete"
|
"github.com/nspcc-dev/neofs-node/pkg/util/autocomplete"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
@ -34,6 +35,7 @@ func init() {
|
||||||
|
|
||||||
rootCmd.AddCommand(config.RootCmd)
|
rootCmd.AddCommand(config.RootCmd)
|
||||||
rootCmd.AddCommand(morph.RootCmd)
|
rootCmd.AddCommand(morph.RootCmd)
|
||||||
|
rootCmd.AddCommand(storagecfg.RootCmd)
|
||||||
|
|
||||||
rootCmd.AddCommand(autocomplete.Command("neofs-adm"))
|
rootCmd.AddCommand(autocomplete.Command("neofs-adm"))
|
||||||
}
|
}
|
||||||
|
|
143
cmd/neofs-adm/internal/modules/storagecfg/config.go
Normal file
143
cmd/neofs-adm/internal/modules/storagecfg/config.go
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
package storagecfg
|
||||||
|
|
||||||
|
const configTemplate = `logger:
|
||||||
|
level: info # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
|
||||||
|
|
||||||
|
node:
|
||||||
|
wallet:
|
||||||
|
path: {{ .Wallet.Path }} # path to a NEO wallet; ignored if key is presented
|
||||||
|
address: {{ .Wallet.Account }} # address of a NEO account in the wallet; ignored if key is presented
|
||||||
|
password: {{ .Wallet.Password }} # password for a NEO account in the wallet; ignored if key is presented
|
||||||
|
addresses: # list of addresses announced by Storage node in the Network map
|
||||||
|
- {{ .AnnouncedAddress }}
|
||||||
|
attribute_0: UN-LOCODE:{{ .Attribute.Locode }}
|
||||||
|
relay: {{ .Relay }} # start Storage node in relay mode without bootstrapping into the Network map
|
||||||
|
subnet:
|
||||||
|
exit_zero: false # toggle entrance to zero subnet (overrides corresponding attribute and occurrence in entries)
|
||||||
|
entries: [] # list of IDs of subnets to enter in a text format of NeoFS API protocol (overrides corresponding attributes)
|
||||||
|
|
||||||
|
grpc:
|
||||||
|
num: 1 # total number of listener endpoints
|
||||||
|
0:
|
||||||
|
endpoint: {{ .Endpoint }} # endpoint for gRPC server
|
||||||
|
tls:{{if .TLSCert}}
|
||||||
|
enabled: true # enable TLS for a gRPC connection (min version is TLS 1.2)
|
||||||
|
certificate: {{ .TLSCert }} # path to TLS certificate
|
||||||
|
key: {{ .TLSKey }} # path to TLS key
|
||||||
|
{{- else }}
|
||||||
|
enabled: false # disable TLS for a gRPC connection
|
||||||
|
{{- end}}
|
||||||
|
|
||||||
|
control:
|
||||||
|
authorized_keys: # list of hex-encoded public keys that have rights to use the Control Service
|
||||||
|
{{- range .AuthorizedKeys }}
|
||||||
|
- {{.}}{{end}}
|
||||||
|
grpc:
|
||||||
|
endpoint: {{.ControlEndpoint}} # endpoint that is listened by the Control Service
|
||||||
|
|
||||||
|
morph:
|
||||||
|
dial_timeout: 20s # timeout for side chain NEO RPC client connection
|
||||||
|
disable_cache: false # use TTL cache for side chain GET operations
|
||||||
|
rpc_endpoint: # side chain N3 RPC endpoints
|
||||||
|
{{- range .MorphRPC }}
|
||||||
|
- https://{{.}}{{end}}
|
||||||
|
notification_endpoint: # side chain N3 RPC notification endpoints
|
||||||
|
{{- range .MorphRPC }}
|
||||||
|
- wss://{{.}}/ws{{end}}
|
||||||
|
{{if not .Relay }}
|
||||||
|
storage:
|
||||||
|
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||||
|
shard_num: 1 # total number of shards
|
||||||
|
|
||||||
|
default: # section with the default shard parameters
|
||||||
|
metabase:
|
||||||
|
perm: 0644 # permissions for metabase files(directories: +x for current user and group)
|
||||||
|
|
||||||
|
blobstor:
|
||||||
|
perm: 0644 # permissions for blobstor files(directories: +x for current user and group)
|
||||||
|
depth: 2 # max depth of object tree storage in FS
|
||||||
|
small_object_size: 102400 # 100KiB, size threshold for "small" objects which are stored in key-value DB, not in FS, bytes
|
||||||
|
compress: true # turn on/off Zstandard compression (level 3) of stored objects
|
||||||
|
compression_exclude_content_types:
|
||||||
|
- audio/*
|
||||||
|
- video/*
|
||||||
|
|
||||||
|
blobovnicza:
|
||||||
|
size: 1073741824 # approximate size limit of single blobovnicza instance, total size will be: size*width^(depth+1), bytes
|
||||||
|
depth: 1 # max depth of object tree storage in key-value DB
|
||||||
|
width: 4 # max width of object tree storage in key-value DB
|
||||||
|
opened_cache_capacity: 50 # maximum number of opened database files
|
||||||
|
|
||||||
|
gc:
|
||||||
|
remover_batch_size: 200 # number of objects to be removed by the garbage collector
|
||||||
|
remover_sleep_interval: 5m # frequency of the garbage collector invocation
|
||||||
|
|
||||||
|
shard:
|
||||||
|
0:
|
||||||
|
mode: "read-write" # mode of the shard, must be one of the: "read-write" (default), "read-only"
|
||||||
|
|
||||||
|
metabase:
|
||||||
|
path: {{ .MetabasePath }} # path to the metabase
|
||||||
|
|
||||||
|
blobstor:
|
||||||
|
path: {{ .BlobstorPath }} # path to the blobstor
|
||||||
|
{{end}}`
|
||||||
|
|
||||||
|
const (
|
||||||
|
neofsMainnetAddress = "2cafa46838e8b564468ebd868dcafdd99dce6221"
|
||||||
|
balanceMainnetAddress = "dc1ec98d9d0c5f9dfade16144defe08cffc5ca55"
|
||||||
|
neofsTestnetAddress = "b65d8243ac63983206d17e5221af0653a7266fa1"
|
||||||
|
balanceTestnetAddress = "e0420c216003747626670d1424569c17c79015bf"
|
||||||
|
)
|
||||||
|
|
||||||
|
var n3config = map[string]struct {
|
||||||
|
MorphRPC []string
|
||||||
|
RPC []string
|
||||||
|
NeoFSContract string
|
||||||
|
BalanceContract string
|
||||||
|
}{
|
||||||
|
"testnet": {
|
||||||
|
MorphRPC: []string{
|
||||||
|
"rpc01.morph.testnet.fs.neo.org:51331",
|
||||||
|
"rpc02.morph.testnet.fs.neo.org:51331",
|
||||||
|
"rpc03.morph.testnet.fs.neo.org:51331",
|
||||||
|
"rpc04.morph.testnet.fs.neo.org:51331",
|
||||||
|
"rpc05.morph.testnet.fs.neo.org:51331",
|
||||||
|
"rpc06.morph.testnet.fs.neo.org:51331",
|
||||||
|
"rpc07.morph.testnet.fs.neo.org:51331",
|
||||||
|
},
|
||||||
|
RPC: []string{
|
||||||
|
"rpc01.testnet.n3.nspcc.ru:21331",
|
||||||
|
"rpc02.testnet.n3.nspcc.ru:21331",
|
||||||
|
"rpc03.testnet.n3.nspcc.ru:21331",
|
||||||
|
"rpc04.testnet.n3.nspcc.ru:21331",
|
||||||
|
"rpc05.testnet.n3.nspcc.ru:21331",
|
||||||
|
"rpc06.testnet.n3.nspcc.ru:21331",
|
||||||
|
"rpc07.testnet.n3.nspcc.ru:21331",
|
||||||
|
},
|
||||||
|
NeoFSContract: neofsTestnetAddress,
|
||||||
|
BalanceContract: balanceTestnetAddress,
|
||||||
|
},
|
||||||
|
"mainnet": {
|
||||||
|
MorphRPC: []string{
|
||||||
|
"rpc1.morph.fs.neo.org:40341",
|
||||||
|
"rpc2.morph.fs.neo.org:40341",
|
||||||
|
"rpc3.morph.fs.neo.org:40341",
|
||||||
|
"rpc4.morph.fs.neo.org:40341",
|
||||||
|
"rpc5.morph.fs.neo.org:40341",
|
||||||
|
"rpc6.morph.fs.neo.org:40341",
|
||||||
|
"rpc7.morph.fs.neo.org:40341",
|
||||||
|
},
|
||||||
|
RPC: []string{
|
||||||
|
"rpc1.n3.nspcc.ru:10331",
|
||||||
|
"rpc2.n3.nspcc.ru:10331",
|
||||||
|
"rpc3.n3.nspcc.ru:10331",
|
||||||
|
"rpc4.n3.nspcc.ru:10331",
|
||||||
|
"rpc5.n3.nspcc.ru:10331",
|
||||||
|
"rpc6.n3.nspcc.ru:10331",
|
||||||
|
"rpc7.n3.nspcc.ru:10331",
|
||||||
|
},
|
||||||
|
NeoFSContract: neofsMainnetAddress,
|
||||||
|
BalanceContract: balanceMainnetAddress,
|
||||||
|
},
|
||||||
|
}
|
413
cmd/neofs-adm/internal/modules/storagecfg/root.go
Normal file
413
cmd/neofs-adm/internal/modules/storagecfg/root.go
Normal file
|
@ -0,0 +1,413 @@
|
||||||
|
package storagecfg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"math/big"
|
||||||
|
"math/rand"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"text/template"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/chzyer/readline"
|
||||||
|
"github.com/nspcc-dev/neo-go/cli/flags"
|
||||||
|
"github.com/nspcc-dev/neo-go/cli/input"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/vm"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
walletFlag = "wallet"
|
||||||
|
accountFlag = "account"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultControlEndpoint = "127.0.0.1:8090"
|
||||||
|
defaultDataEndpoint = "127.0.0.1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RootCmd is a root command of config section.
|
||||||
|
var RootCmd = &cobra.Command{
|
||||||
|
Use: "storage-config [-w wallet] [-a acccount] [<path-to-config>]",
|
||||||
|
Short: "Section for storage node configuration commands.",
|
||||||
|
Run: storageConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
fs := RootCmd.Flags()
|
||||||
|
|
||||||
|
fs.StringP(walletFlag, "w", "", "path to wallet")
|
||||||
|
fs.StringP(accountFlag, "a", "", "wallet account")
|
||||||
|
}
|
||||||
|
|
||||||
|
type config struct {
|
||||||
|
AnnouncedAddress string
|
||||||
|
AuthorizedKeys []string
|
||||||
|
ControlEndpoint string
|
||||||
|
Endpoint string
|
||||||
|
TLSCert string
|
||||||
|
TLSKey string
|
||||||
|
MorphRPC []string
|
||||||
|
Attribute struct {
|
||||||
|
Locode string
|
||||||
|
}
|
||||||
|
Wallet struct {
|
||||||
|
Path string
|
||||||
|
Account string
|
||||||
|
Password string
|
||||||
|
}
|
||||||
|
Relay bool
|
||||||
|
BlobstorPath string
|
||||||
|
MetabasePath string
|
||||||
|
}
|
||||||
|
|
||||||
|
func storageConfig(cmd *cobra.Command, args []string) {
|
||||||
|
var outPath string
|
||||||
|
if len(args) != 0 {
|
||||||
|
outPath = args[0]
|
||||||
|
} else {
|
||||||
|
outPath = getPath("File to write config at [./config.yml]: ")
|
||||||
|
if outPath == "" {
|
||||||
|
outPath = "./config.yml"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
historyPath := filepath.Join(os.TempDir(), "neofs-adm.history")
|
||||||
|
readline.SetHistoryPath(historyPath)
|
||||||
|
|
||||||
|
var c config
|
||||||
|
|
||||||
|
c.Wallet.Path, _ = cmd.Flags().GetString(walletFlag)
|
||||||
|
if c.Wallet.Path == "" {
|
||||||
|
c.Wallet.Path = getPath("Path to the storage node wallet: ")
|
||||||
|
}
|
||||||
|
|
||||||
|
w, err := wallet.NewWalletFromFile(c.Wallet.Path)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
c.Wallet.Account, _ = cmd.Flags().GetString(accountFlag)
|
||||||
|
if c.Wallet.Account == "" {
|
||||||
|
addr := address.Uint160ToString(w.GetChangeAddress())
|
||||||
|
c.Wallet.Account = getWalletAccount(w, fmt.Sprintf("Wallet account [%s]: ", addr))
|
||||||
|
if c.Wallet.Account == "" {
|
||||||
|
c.Wallet.Account = addr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
accH, err := flags.ParseAddress(c.Wallet.Account)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
acc := w.GetAccount(accH)
|
||||||
|
if acc == nil {
|
||||||
|
fatalOnErr(errors.New("can't find account in wallet"))
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Wallet.Password, err = input.ReadPassword(fmt.Sprintf("Account password for %s: ", c.Wallet.Account))
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
err = acc.Decrypt(c.Wallet.Password, keys.NEP2ScryptParams())
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
c.AuthorizedKeys = append(c.AuthorizedKeys, hex.EncodeToString(acc.PrivateKey().PublicKey().Bytes()))
|
||||||
|
|
||||||
|
var network string
|
||||||
|
for {
|
||||||
|
network = getString("Choose network [mainnet]/testnet: ")
|
||||||
|
switch network {
|
||||||
|
case "":
|
||||||
|
network = "mainnet"
|
||||||
|
case "testnet", "mainnet":
|
||||||
|
default:
|
||||||
|
cmd.Println(`Network must be either "mainnet" or "testnet"`)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
c.MorphRPC = n3config[network].MorphRPC
|
||||||
|
|
||||||
|
depositGas(cmd, acc, network)
|
||||||
|
|
||||||
|
c.Attribute.Locode = getString("UN-LOCODE attribute in [XX YYY] format: ")
|
||||||
|
var addr, port string
|
||||||
|
for {
|
||||||
|
c.AnnouncedAddress = getString("Publicly announced address: ")
|
||||||
|
addr, port, err = net.SplitHostPort(c.AnnouncedAddress)
|
||||||
|
if err != nil {
|
||||||
|
cmd.Println("Address must have form A.B.C.D:PORT")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ip, err := net.ResolveIPAddr("ip", addr)
|
||||||
|
if err != nil {
|
||||||
|
cmd.Printf("Can't resolve IP address %s: %v\n", addr, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ip.IP.IsGlobalUnicast() {
|
||||||
|
cmd.Println("IP must be global unicast.")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cmd.Printf("Resolved IP address: %s\n", ip.String())
|
||||||
|
|
||||||
|
_, err = strconv.ParseUint(port, 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
cmd.Println("Port must be an integer.")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultAddr := net.JoinHostPort(defaultDataEndpoint, port)
|
||||||
|
c.Endpoint = getString(fmt.Sprintf("Listening address [%s]: ", defaultAddr))
|
||||||
|
if c.Endpoint == "" {
|
||||||
|
c.Endpoint = defaultAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
c.ControlEndpoint = getString(fmt.Sprintf("Listening address (control endpoint) [%s]: ", defaultControlEndpoint))
|
||||||
|
if c.ControlEndpoint == "" {
|
||||||
|
c.ControlEndpoint = defaultControlEndpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
c.TLSCert = getPath("TLS Certificate (optional): ")
|
||||||
|
if c.TLSCert != "" {
|
||||||
|
c.TLSKey = getPath("TLS Key: ")
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Relay = getConfirmation(false, "Use node as a relay? yes/[no]: ")
|
||||||
|
if !c.Relay {
|
||||||
|
p := getPath("Path to the storage directory (all available storage will be used): ")
|
||||||
|
c.BlobstorPath = filepath.Join(p, "blob")
|
||||||
|
c.MetabasePath = filepath.Join(p, "meta")
|
||||||
|
}
|
||||||
|
|
||||||
|
out := applyTemplate(c)
|
||||||
|
fatalOnErr(ioutil.WriteFile(outPath, out, 0644))
|
||||||
|
|
||||||
|
cmd.Println("Node is ready for work! Run `neofs-node -config " + outPath + "`")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getWalletAccount(w *wallet.Wallet, prompt string) string {
|
||||||
|
addrs := make([]readline.PrefixCompleterInterface, len(w.Accounts))
|
||||||
|
for i := range w.Accounts {
|
||||||
|
addrs[i] = readline.PcItem(w.Accounts[i].Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
readline.SetAutoComplete(readline.NewPrefixCompleter(addrs...))
|
||||||
|
defer readline.SetAutoComplete(nil)
|
||||||
|
|
||||||
|
s, err := readline.Line(prompt)
|
||||||
|
fatalOnErr(err)
|
||||||
|
return strings.TrimSpace(s) // autocompleter can return a string with a trailing space
|
||||||
|
}
|
||||||
|
|
||||||
|
func getString(prompt string) string {
|
||||||
|
s, err := readline.Line(prompt)
|
||||||
|
fatalOnErr(err)
|
||||||
|
if s != "" {
|
||||||
|
_ = readline.AddHistory(s)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
type filenameCompleter struct{}
|
||||||
|
|
||||||
|
func (filenameCompleter) Do(line []rune, pos int) (newLine [][]rune, length int) {
|
||||||
|
prefix := string(line[:pos])
|
||||||
|
dir := filepath.Dir(prefix)
|
||||||
|
de, err := os.ReadDir(dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range de {
|
||||||
|
name := filepath.Join(dir, de[i].Name())
|
||||||
|
if strings.HasPrefix(name, prefix) {
|
||||||
|
tail := []rune(strings.TrimPrefix(name, prefix))
|
||||||
|
if de[i].IsDir() {
|
||||||
|
tail = append(tail, filepath.Separator)
|
||||||
|
}
|
||||||
|
newLine = append(newLine, tail)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if pos != 0 {
|
||||||
|
return newLine, pos - len([]rune(dir))
|
||||||
|
}
|
||||||
|
return newLine, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPath(prompt string) string {
|
||||||
|
readline.SetAutoComplete(filenameCompleter{})
|
||||||
|
defer readline.SetAutoComplete(nil)
|
||||||
|
|
||||||
|
p, err := readline.Line(prompt)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
if p == "" {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = readline.AddHistory(p)
|
||||||
|
|
||||||
|
abs, err := filepath.Abs(p)
|
||||||
|
if err != nil {
|
||||||
|
fatalOnErr(fmt.Errorf("can't create an absolute path: %w", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return abs
|
||||||
|
}
|
||||||
|
|
||||||
|
func getConfirmation(def bool, prompt string) bool {
|
||||||
|
for {
|
||||||
|
s, err := readline.Line(prompt)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
switch strings.ToLower(s) {
|
||||||
|
case "y", "yes":
|
||||||
|
return true
|
||||||
|
case "n", "no":
|
||||||
|
return false
|
||||||
|
default:
|
||||||
|
if len(s) == 0 {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyTemplate(c config) []byte {
|
||||||
|
tmpl, err := template.New("config").Parse(configTemplate)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
b := bytes.NewBuffer(nil)
|
||||||
|
fatalOnErr(tmpl.Execute(b, c))
|
||||||
|
|
||||||
|
return b.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func fatalOnErr(err error) {
|
||||||
|
if err != nil {
|
||||||
|
_, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func depositGas(cmd *cobra.Command, acc *wallet.Account, network string) {
|
||||||
|
sideClient := initClient(n3config[network].MorphRPC)
|
||||||
|
balanceHash, _ := util.Uint160DecodeStringLE(n3config[network].BalanceContract)
|
||||||
|
|
||||||
|
res, err := sideClient.InvokeFunction(balanceHash, "balanceOf", []smartcontract.Parameter{{
|
||||||
|
Type: smartcontract.Hash160Type,
|
||||||
|
Value: acc.Contract.ScriptHash(),
|
||||||
|
}}, nil)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
if res.State != vm.HaltState.String() {
|
||||||
|
fatalOnErr(fmt.Errorf("invalid response from balance contract: %s", res.FaultException))
|
||||||
|
}
|
||||||
|
|
||||||
|
var balance *big.Int
|
||||||
|
if len(res.Stack) != 0 {
|
||||||
|
balance, _ = res.Stack[0].TryInteger()
|
||||||
|
}
|
||||||
|
|
||||||
|
if balance == nil {
|
||||||
|
fatalOnErr(errors.New("invalid response from balance contract"))
|
||||||
|
}
|
||||||
|
|
||||||
|
ok := getConfirmation(false, fmt.Sprintf("Current NeoFS balance is %s, make a deposit? y/[n]: ",
|
||||||
|
fixedn.ToString(balance, 12)))
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
amountStr := getString("Enter amount in GAS: ")
|
||||||
|
amount, err := fixedn.FromString(amountStr, 8)
|
||||||
|
if err != nil {
|
||||||
|
fatalOnErr(fmt.Errorf("invalid amount: %w", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
mainClient := initClient(n3config[network].RPC)
|
||||||
|
neofsHash, _ := util.Uint160DecodeStringLE(n3config[network].NeoFSContract)
|
||||||
|
|
||||||
|
gasHash, err := mainClient.GetNativeContractHash(nativenames.Gas)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
tx, err := mainClient.CreateNEP17TransferTx(acc, neofsHash, gasHash, amount.Int64(), 0, nil, nil)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
txHash, err := mainClient.SignAndPushTx(tx, acc, nil)
|
||||||
|
fatalOnErr(err)
|
||||||
|
|
||||||
|
cmd.Print("Waiting for transactions to persist.")
|
||||||
|
tick := time.NewTicker(time.Second / 2)
|
||||||
|
defer tick.Stop()
|
||||||
|
|
||||||
|
timer := time.NewTimer(time.Second * 20)
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
at := trigger.Application
|
||||||
|
|
||||||
|
loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-tick.C:
|
||||||
|
_, err := mainClient.GetApplicationLog(txHash, &at)
|
||||||
|
if err == nil {
|
||||||
|
cmd.Print("\n")
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
cmd.Print(".")
|
||||||
|
case <-timer.C:
|
||||||
|
cmd.Printf("\nTimeout while waiting for transaction to persist.\n")
|
||||||
|
if getConfirmation(false, "Continue configuration? yes/[no]: ") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func initClient(rpc []string) *client.Client {
|
||||||
|
var c *client.Client
|
||||||
|
var err error
|
||||||
|
|
||||||
|
shuffled := make([]string, len(rpc))
|
||||||
|
copy(shuffled, rpc)
|
||||||
|
rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
|
||||||
|
|
||||||
|
for _, endpoint := range shuffled {
|
||||||
|
c, err = client.New(context.Background(), "https://"+endpoint, client.Options{
|
||||||
|
DialTimeout: time.Second * 2,
|
||||||
|
RequestTimeout: time.Second * 5,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err = c.Init(); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
fatalOnErr(fmt.Errorf("can't create N3 client: %w", err))
|
||||||
|
panic("unreachable")
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/flynn-archive/go-shlex"
|
"github.com/flynn-archive/go-shlex"
|
||||||
|
@ -29,8 +30,11 @@ Operation is an object service verb: 'get', 'head', 'put', 'search', 'delete', '
|
||||||
|
|
||||||
Filter consists of <typ>:<key><match><value>
|
Filter consists of <typ>:<key><match><value>
|
||||||
Typ is 'obj' for object applied filter or 'req' for request applied filter.
|
Typ is 'obj' for object applied filter or 'req' for request applied filter.
|
||||||
Key is a valid unicode string corresponding to object or request header key.
|
Key is a valid unicode string corresponding to object or request header key.
|
||||||
Match is '==' for matching and '!=' for non-matching filter.
|
Well-known system object headers start with '$Object:' prefix.
|
||||||
|
User defined headers start without prefix.
|
||||||
|
Read more about filter keys at github.com/nspcc-dev/neofs-api/blob/master/proto-docs/acl.md#message-eaclrecordfilter
|
||||||
|
Match is '=' for matching and '!=' for non-matching filter.
|
||||||
Value is a valid unicode string corresponding to object or request header value.
|
Value is a valid unicode string corresponding to object or request header value.
|
||||||
|
|
||||||
Target is
|
Target is
|
||||||
|
@ -64,20 +68,20 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
||||||
|
|
||||||
containerID := cid.New()
|
containerID := cid.New()
|
||||||
if err := containerID.Parse(cidArg); err != nil {
|
if err := containerID.Parse(cidArg); err != nil {
|
||||||
cmd.Printf("invalid container ID: %v", err)
|
cmd.PrintErrf("invalid container ID: %v\n", err)
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
rulesFile, err := getRulesFromFile(fileArg)
|
rulesFile, err := getRulesFromFile(fileArg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.Printf("can't read rules from file : %v", err)
|
cmd.PrintErrf("can't read rules from file: %v\n", err)
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
rules = append(rules, rulesFile...)
|
rules = append(rules, rulesFile...)
|
||||||
if len(rules) == 0 {
|
if len(rules) == 0 {
|
||||||
cmd.Println("no extended ACL rules has been provided")
|
cmd.PrintErrln("no extended ACL rules has been provided")
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
tb := eacl.NewTable()
|
tb := eacl.NewTable()
|
||||||
|
@ -85,14 +89,14 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
||||||
for _, ruleStr := range rules {
|
for _, ruleStr := range rules {
|
||||||
r, err := shlex.Split(ruleStr)
|
r, err := shlex.Split(ruleStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.Printf("can't parse rule '%s': %v)", ruleStr, err)
|
cmd.PrintErrf("can't parse rule '%s': %v\n", ruleStr, err)
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = parseTable(tb, r)
|
err = parseTable(tb, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.Printf("can't create extended ACL record from rule '%s': %v", ruleStr, err)
|
cmd.PrintErrf("can't create extended ACL record from rule '%s': %v\n", ruleStr, err)
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,15 +104,15 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
||||||
|
|
||||||
data, err := tb.MarshalJSON()
|
data, err := tb.MarshalJSON()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.Println(err)
|
cmd.PrintErrln(err)
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
err = json.Indent(buf, data, "", " ")
|
err = json.Indent(buf, data, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.Println(err)
|
cmd.PrintErrln(err)
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(outArg) == 0 {
|
if len(outArg) == 0 {
|
||||||
|
@ -118,7 +122,8 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
||||||
|
|
||||||
err = ioutil.WriteFile(outArg, buf.Bytes(), 0644)
|
err = ioutil.WriteFile(outArg, buf.Bytes(), 0644)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cmd.Println(err)
|
cmd.PrintErrln(err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -215,9 +215,11 @@ func getKeyNoGenerate() (*ecdsa.PrivateKey, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPassword() (string, error) {
|
func getPassword() (string, error) {
|
||||||
if pass := viper.GetString(password); pass != "" {
|
// this check allows empty passwords
|
||||||
return pass, nil
|
if viper.IsSet(password) {
|
||||||
|
return viper.GetString(password), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return input.ReadPassword("Enter password > ")
|
return input.ReadPassword("Enter password > ")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
|
@ -25,8 +24,6 @@ type valueWithTime struct {
|
||||||
|
|
||||||
// entity that provides TTL cache interface.
|
// entity that provides TTL cache interface.
|
||||||
type ttlNetCache struct {
|
type ttlNetCache struct {
|
||||||
mtx sync.Mutex
|
|
||||||
|
|
||||||
ttl time.Duration
|
ttl time.Duration
|
||||||
|
|
||||||
sz int
|
sz int
|
||||||
|
@ -55,9 +52,6 @@ func newNetworkTTLCache(sz int, ttl time.Duration, netRdr netValueReader) *ttlNe
|
||||||
//
|
//
|
||||||
// returned value should not be modified.
|
// returned value should not be modified.
|
||||||
func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
|
func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
|
|
||||||
val, ok := c.cache.Peek(key)
|
val, ok := c.cache.Peek(key)
|
||||||
if ok {
|
if ok {
|
||||||
valWithTime := val.(*valueWithTime)
|
valWithTime := val.(*valueWithTime)
|
||||||
|
@ -83,29 +77,21 @@ func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache) remove(key interface{}) {
|
func (c *ttlNetCache) remove(key interface{}) {
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
|
|
||||||
c.cache.Remove(key)
|
c.cache.Remove(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache) keys() []interface{} {
|
func (c *ttlNetCache) keys() []interface{} {
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
|
|
||||||
return c.cache.Keys()
|
return c.cache.Keys()
|
||||||
}
|
}
|
||||||
|
|
||||||
// entity that provides LRU cache interface.
|
// entity that provides LRU cache interface.
|
||||||
type lruNetCache struct {
|
type lruNetCache struct {
|
||||||
mtx sync.Mutex
|
|
||||||
|
|
||||||
cache *lru.Cache
|
cache *lru.Cache
|
||||||
|
|
||||||
netRdr netValueReader
|
netRdr netValueReader
|
||||||
}
|
}
|
||||||
|
|
||||||
// complicates netValueReader with LRU caching mechanism.
|
// newNetworkLRUCache returns wrapper over netValueReader with LRU cache.
|
||||||
func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
|
func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
|
||||||
cache, err := lru.New(sz)
|
cache, err := lru.New(sz)
|
||||||
fatalOnErr(err)
|
fatalOnErr(err)
|
||||||
|
@ -122,9 +108,6 @@ func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
|
||||||
//
|
//
|
||||||
// returned value should not be modified.
|
// returned value should not be modified.
|
||||||
func (c *lruNetCache) get(key interface{}) (interface{}, error) {
|
func (c *lruNetCache) get(key interface{}) (interface{}, error) {
|
||||||
c.mtx.Lock()
|
|
||||||
defer c.mtx.Unlock()
|
|
||||||
|
|
||||||
val, ok := c.cache.Get(key)
|
val, ok := c.cache.Get(key)
|
||||||
if ok {
|
if ok {
|
||||||
return val, nil
|
return val, nil
|
||||||
|
@ -336,3 +319,39 @@ func (s *ttlContainerLister) InvalidateContainerListByCID(id *cid.ID) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type cachedIRFetcher ttlNetCache
|
||||||
|
|
||||||
|
func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) *cachedIRFetcher {
|
||||||
|
const (
|
||||||
|
irFetcherCacheSize = 1 // we intend to store only one value
|
||||||
|
|
||||||
|
// Without the cache in the testnet we can see several hundred simultaneous
|
||||||
|
// requests (neofs-node #1278), so limiting the request rate solves the issue.
|
||||||
|
//
|
||||||
|
// Exact request rate doesn't really matter because Inner Ring list update
|
||||||
|
// happens extremely rare, but there is no side chain events for that as
|
||||||
|
// for now (neofs-contract v0.15.0 notary disabled env) to monitor it.
|
||||||
|
irFetcherCacheTTL = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
|
||||||
|
func(key interface{}) (interface{}, error) {
|
||||||
|
return f.InnerRingKeys()
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return (*cachedIRFetcher)(irFetcherCache)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InnerRingKeys returns cached list of Inner Ring keys. If keys are missing in
|
||||||
|
// the cache or expired, then it returns keys from side chain and updates
|
||||||
|
// the cache.
|
||||||
|
func (f *cachedIRFetcher) InnerRingKeys() ([][]byte, error) {
|
||||||
|
val, err := (*ttlNetCache)(f).get("")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return val.([][]byte), nil
|
||||||
|
}
|
||||||
|
|
|
@ -116,6 +116,8 @@ type cfg struct {
|
||||||
clientCache *cache.ClientCache
|
clientCache *cache.ClientCache
|
||||||
|
|
||||||
persistate *state.PersistentStorage
|
persistate *state.PersistentStorage
|
||||||
|
|
||||||
|
netMapSource netmapCore.Source
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgGRPC struct {
|
type cfgGRPC struct {
|
||||||
|
@ -175,8 +177,6 @@ type cfgNodeInfo struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfgObject struct {
|
type cfgObject struct {
|
||||||
netMapSource netmapCore.Source
|
|
||||||
|
|
||||||
cnrSource container.Source
|
cnrSource container.Source
|
||||||
|
|
||||||
eaclSource eacl.Source
|
eaclSource eacl.Source
|
||||||
|
|
|
@ -105,7 +105,7 @@ func initContainerService(c *cfg) {
|
||||||
|
|
||||||
loadPlacementBuilder := &loadPlacementBuilder{
|
loadPlacementBuilder := &loadPlacementBuilder{
|
||||||
log: c.log,
|
log: c.log,
|
||||||
nmSrc: c.cfgNetmap.wrapper,
|
nmSrc: c.netMapSource,
|
||||||
cnrSrc: cnrSrc,
|
cnrSrc: cnrSrc,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ func initControlService(c *cfg) {
|
||||||
controlSvc.WithKey(&c.key.PrivateKey),
|
controlSvc.WithKey(&c.key.PrivateKey),
|
||||||
controlSvc.WithAuthorizedKeys(rawPubs),
|
controlSvc.WithAuthorizedKeys(rawPubs),
|
||||||
controlSvc.WithHealthChecker(c),
|
controlSvc.WithHealthChecker(c),
|
||||||
controlSvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
controlSvc.WithNetMapSource(c.netMapSource),
|
||||||
controlSvc.WithNodeState(c),
|
controlSvc.WithNodeState(c),
|
||||||
controlSvc.WithDeletedObjectHandler(func(addrList []*object.Address) error {
|
controlSvc.WithDeletedObjectHandler(func(addrList []*object.Address) error {
|
||||||
prm := new(engine.DeletePrm).WithAddresses(addrList...)
|
prm := new(engine.DeletePrm).WithAddresses(addrList...)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
|
grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
@ -95,7 +96,19 @@ func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {
|
||||||
|
|
||||||
l.Info("stopping gRPC server...")
|
l.Info("stopping gRPC server...")
|
||||||
|
|
||||||
s.GracefulStop()
|
// GracefulStop() may freeze forever, see #1270
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
s.GracefulStop()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(1 * time.Minute):
|
||||||
|
l.Info("gRPC cannot shutdown gracefully, forcing stop")
|
||||||
|
s.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
l.Info("gRPC server stopped successfully")
|
l.Info("gRPC server stopped successfully")
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,7 +99,7 @@ func initMorphComponents(c *cfg) {
|
||||||
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
|
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cfgObject.netMapSource = netmapSource
|
c.netMapSource = netmapSource
|
||||||
c.cfgNetmap.wrapper = wrap
|
c.cfgNetmap.wrapper = wrap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -185,7 +185,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
clientConstructor := &reputationClientConstructor{
|
clientConstructor := &reputationClientConstructor{
|
||||||
log: c.log,
|
log: c.log,
|
||||||
nmSrc: c.cfgObject.netMapSource,
|
nmSrc: c.netMapSource,
|
||||||
netState: c.cfgNetmap.state,
|
netState: c.cfgNetmap.state,
|
||||||
trustStorage: c.cfgReputation.localTrustStorage,
|
trustStorage: c.cfgReputation.localTrustStorage,
|
||||||
basicConstructor: c.clientCache,
|
basicConstructor: c.clientCache,
|
||||||
|
@ -228,7 +228,7 @@ func initObjectService(c *cfg) {
|
||||||
policer.WithLocalStorage(ls),
|
policer.WithLocalStorage(ls),
|
||||||
policer.WithContainerSource(c.cfgObject.cnrSource),
|
policer.WithContainerSource(c.cfgObject.cnrSource),
|
||||||
policer.WithPlacementBuilder(
|
policer.WithPlacementBuilder(
|
||||||
placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapSource),
|
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
||||||
),
|
),
|
||||||
policer.WithRemoteHeader(
|
policer.WithRemoteHeader(
|
||||||
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
|
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
|
||||||
|
@ -251,7 +251,7 @@ func initObjectService(c *cfg) {
|
||||||
policer.WithNodeLoader(c),
|
policer.WithNodeLoader(c),
|
||||||
)
|
)
|
||||||
|
|
||||||
traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapSource, c.cfgObject.cnrSource, c)
|
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
||||||
|
|
||||||
c.workers = append(c.workers, pol)
|
c.workers = append(c.workers, pol)
|
||||||
|
|
||||||
|
@ -261,7 +261,7 @@ func initObjectService(c *cfg) {
|
||||||
putsvc.WithMaxSizeSource(c),
|
putsvc.WithMaxSizeSource(c),
|
||||||
putsvc.WithLocalStorage(ls),
|
putsvc.WithLocalStorage(ls),
|
||||||
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
||||||
putsvc.WithNetworkMapSource(c.cfgObject.netMapSource),
|
putsvc.WithNetworkMapSource(c.netMapSource),
|
||||||
putsvc.WithNetmapKeys(c),
|
putsvc.WithNetmapKeys(c),
|
||||||
putsvc.WithFormatValidatorOpts(
|
putsvc.WithFormatValidatorOpts(
|
||||||
objectCore.WithDeleteHandler(objInhumer),
|
objectCore.WithDeleteHandler(objInhumer),
|
||||||
|
@ -285,7 +285,7 @@ func initObjectService(c *cfg) {
|
||||||
placement.WithoutSuccessTracking(),
|
placement.WithoutSuccessTracking(),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
searchsvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
searchsvc.WithNetMapSource(c.netMapSource),
|
||||||
searchsvc.WithKeyStorage(keyStorage),
|
searchsvc.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -303,7 +303,7 @@ func initObjectService(c *cfg) {
|
||||||
placement.SuccessAfter(1),
|
placement.SuccessAfter(1),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
getsvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
getsvc.WithNetMapSource(c.netMapSource),
|
||||||
getsvc.WithKeyStorage(keyStorage),
|
getsvc.WithKeyStorage(keyStorage),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -348,8 +348,8 @@ func initObjectService(c *cfg) {
|
||||||
acl.WithSenderClassifier(
|
acl.WithSenderClassifier(
|
||||||
acl.NewSenderClassifier(
|
acl.NewSenderClassifier(
|
||||||
c.log,
|
c.log,
|
||||||
irFetcher,
|
newCachedIRFetcher(irFetcher),
|
||||||
c.cfgNetmap.wrapper,
|
c.netMapSource,
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
acl.WithContainerSource(
|
acl.WithContainerSource(
|
||||||
|
|
|
@ -37,8 +37,7 @@ func initReputationService(c *cfg) {
|
||||||
|
|
||||||
localKey := c.key.PublicKey().Bytes()
|
localKey := c.key.PublicKey().Bytes()
|
||||||
|
|
||||||
// consider sharing this between application components
|
nmSrc := c.netMapSource
|
||||||
nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper)
|
|
||||||
|
|
||||||
// storing calculated trusts as a daughter
|
// storing calculated trusts as a daughter
|
||||||
c.cfgReputation.localTrustStorage = truststorage.New(
|
c.cfgReputation.localTrustStorage = truststorage.New(
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
version: "2.4"
|
version: "2.4"
|
||||||
services:
|
services:
|
||||||
storage01:
|
storage01:
|
||||||
image: nspccdev/neofs-storage-testnet:0.27.5
|
image: nspccdev/neofs-storage-testnet:0.27.7
|
||||||
container_name: neofs-testnet
|
container_name: neofs-testnet
|
||||||
env_file: node_config.env
|
env_file: node_config.env
|
||||||
network_mode: host
|
network_mode: host
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -3,6 +3,7 @@ module github.com/nspcc-dev/neofs-node
|
||||||
go 1.16
|
go 1.16
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
|
||||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||||
github.com/google/go-github/v39 v39.2.0
|
github.com/google/go-github/v39 v39.2.0
|
||||||
github.com/google/uuid v1.2.0
|
github.com/google/uuid v1.2.0
|
||||||
|
|
3
go.sum
3
go.sum
|
@ -90,8 +90,11 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
|
||||||
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
|
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
|
||||||
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
|
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
|
||||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
|
github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE=
|
||||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||||
|
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8=
|
||||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||||
|
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
|
||||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||||
|
|
|
@ -238,6 +238,7 @@ func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeR
|
||||||
|
|
||||||
cliPrm.WithAddress(prm.objAddr)
|
cliPrm.WithAddress(prm.objAddr)
|
||||||
cliPrm.WithRangeList(prm.rng)
|
cliPrm.WithRangeList(prm.rng)
|
||||||
|
cliPrm.TZ()
|
||||||
|
|
||||||
cliRes, err := x.c.HashObjectPayloadRanges(prm.ctx, &cliPrm,
|
cliRes, err := x.c.HashObjectPayloadRanges(prm.ctx, &cliPrm,
|
||||||
client.WithKey(x.key),
|
client.WithKey(x.key),
|
||||||
|
|
|
@ -325,12 +325,16 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e
|
||||||
|
|
||||||
res, err = b.getRangeFromLevel(prm, p, !ok)
|
res, err = b.getRangeFromLevel(prm, p, !ok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, object.ErrNotFound) {
|
outOfBounds := errors.Is(err, object.ErrRangeOutOfBounds)
|
||||||
|
if !errors.Is(err, object.ErrNotFound) && !outOfBounds {
|
||||||
b.log.Debug("could not get object from level",
|
b.log.Debug("could not get object from level",
|
||||||
zap.String("level", p),
|
zap.String("level", p),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
if outOfBounds {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache[dirPath] = struct{}{}
|
activeCache[dirPath] = struct{}{}
|
||||||
|
@ -592,7 +596,7 @@ func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *GetRang
|
||||||
// we don't use GetRange call for now since blobovnicza
|
// we don't use GetRange call for now since blobovnicza
|
||||||
// stores data that is compressed on BlobStor side.
|
// stores data that is compressed on BlobStor side.
|
||||||
// If blobovnicza learns to do the compression itself,
|
// If blobovnicza learns to do the compression itself,
|
||||||
// wecan start using GetRange.
|
// we can start using GetRange.
|
||||||
res, err := blz.Get(gPrm)
|
res, err := blz.Get(gPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetRangeBigPrm groups the parameters of GetRangeBig operation.
|
// GetRangeBigPrm groups the parameters of GetRangeBig operation.
|
||||||
|
@ -27,6 +29,10 @@ func (b *BlobStor) GetRangeBig(prm *GetRangeBigPrm) (*GetRangeBigRes, error) {
|
||||||
// get compressed object data
|
// get compressed object data
|
||||||
data, err := b.fsTree.Get(prm.addr)
|
data, err := b.fsTree.Get(prm.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, fstree.ErrFileNotFound) {
|
||||||
|
return nil, object.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("could not read object from fs tree: %w", err)
|
return nil, fmt.Errorf("could not read object from fs tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,8 +3,10 @@ package engine
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -48,3 +50,38 @@ func TestExecBlocks(t *testing.T) {
|
||||||
// try to resume
|
// try to resume
|
||||||
require.Error(t, e.ResumeExecution())
|
require.Error(t, e.ResumeExecution())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPersistentShardID(t *testing.T) {
|
||||||
|
dir, err := os.MkdirTemp("", "*")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
|
checkShardState(t, e, id[0], shard.ModeReadWrite)
|
||||||
|
require.NoError(t, e.Close())
|
||||||
|
|
||||||
|
e, _, newID := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
require.Equal(t, id, newID)
|
||||||
|
require.NoError(t, e.Close())
|
||||||
|
|
||||||
|
p1 := e.shards[id[0].String()].DumpInfo().MetaBaseInfo.Path
|
||||||
|
p2 := e.shards[id[1].String()].DumpInfo().MetaBaseInfo.Path
|
||||||
|
tmp := filepath.Join(dir, "tmp")
|
||||||
|
require.NoError(t, os.Rename(p1, tmp))
|
||||||
|
require.NoError(t, os.Rename(p2, p1))
|
||||||
|
require.NoError(t, os.Rename(tmp, p2))
|
||||||
|
|
||||||
|
e, _, newID = newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
require.Equal(t, id[1], newID[0])
|
||||||
|
require.Equal(t, id[0], newID[1])
|
||||||
|
require.NoError(t, e.Close())
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, mode shard.Mode) {
|
||||||
|
e.mtx.RLock()
|
||||||
|
sh := e.shards[id.String()]
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
|
require.Equal(t, mode, sh.GetMode())
|
||||||
|
}
|
||||||
|
|
110
pkg/local_object_storage/engine/error_test.go
Normal file
110
pkg/local_object_storage/engine/error_test.go
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap/zaptest"
|
||||||
|
)
|
||||||
|
|
||||||
|
const errSmallSize = 256
|
||||||
|
|
||||||
|
func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
|
||||||
|
if dir == "" {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
dir, err = os.MkdirTemp("", "*")
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||||
|
}
|
||||||
|
|
||||||
|
e := New(
|
||||||
|
WithLogger(zaptest.NewLogger(t)),
|
||||||
|
WithShardPoolSize(1))
|
||||||
|
|
||||||
|
var ids [2]*shard.ID
|
||||||
|
var err error
|
||||||
|
|
||||||
|
for i := range ids {
|
||||||
|
ids[i], err = e.AddShard(
|
||||||
|
shard.WithLogger(zaptest.NewLogger(t)),
|
||||||
|
shard.WithBlobStorOptions(
|
||||||
|
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
|
||||||
|
blobstor.WithShallowDepth(1),
|
||||||
|
blobstor.WithBlobovniczaShallowWidth(1),
|
||||||
|
blobstor.WithBlobovniczaShallowDepth(1),
|
||||||
|
blobstor.WithSmallSizeLimit(errSmallSize),
|
||||||
|
blobstor.WithRootPerm(0700)),
|
||||||
|
shard.WithMetaBaseOptions(
|
||||||
|
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||||
|
meta.WithPermissions(0700)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.NoError(t, e.Open())
|
||||||
|
require.NoError(t, e.Init())
|
||||||
|
|
||||||
|
return e, dir, ids
|
||||||
|
}
|
||||||
|
|
||||||
|
// Issue #1186.
|
||||||
|
func TestBlobstorFailback(t *testing.T) {
|
||||||
|
dir, err := os.MkdirTemp("", "*")
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
|
||||||
|
|
||||||
|
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
|
objs := make([]*object.Object, 0, 2)
|
||||||
|
for _, size := range []int{15, errSmallSize + 1} {
|
||||||
|
obj := generateRawObjectWithCID(t, cidtest.ID())
|
||||||
|
obj.SetPayload(make([]byte, size))
|
||||||
|
|
||||||
|
prm := new(shard.PutPrm).WithObject(obj.Object())
|
||||||
|
e.mtx.RLock()
|
||||||
|
_, err = e.shards[id[0].String()].Put(prm)
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
require.NoError(t, err)
|
||||||
|
objs = append(objs, obj.Object())
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range objs {
|
||||||
|
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = e.GetRange(&RngPrm{addr: objs[i].Address()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, e.Close())
|
||||||
|
|
||||||
|
p1 := e.shards[id[0].String()].DumpInfo().BlobStorInfo.RootPath
|
||||||
|
p2 := e.shards[id[1].String()].DumpInfo().BlobStorInfo.RootPath
|
||||||
|
tmp := filepath.Join(dir, "tmp")
|
||||||
|
require.NoError(t, os.Rename(p1, tmp))
|
||||||
|
require.NoError(t, os.Rename(p2, p1))
|
||||||
|
require.NoError(t, os.Rename(tmp, p2))
|
||||||
|
|
||||||
|
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
|
for i := range objs {
|
||||||
|
getRes, err := e.Get(&GetPrm{addr: objs[i].Address()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objs[i], getRes.Object())
|
||||||
|
|
||||||
|
rngRes, err := e.GetRange(&RngPrm{addr: objs[i].Address(), off: 1, ln: 10})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||||
|
|
||||||
|
_, err = e.GetRange(&RngPrm{addr: objs[i].Address(), off: errSmallSize + 10, ln: 1})
|
||||||
|
require.True(t, errors.Is(err, object.ErrRangeOutOfBounds), "got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -64,6 +64,8 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
||||||
|
|
||||||
outSI *objectSDK.SplitInfo
|
outSI *objectSDK.SplitInfo
|
||||||
outError = object.ErrNotFound
|
outError = object.ErrNotFound
|
||||||
|
|
||||||
|
shardWithMeta hashedShard
|
||||||
)
|
)
|
||||||
|
|
||||||
shPrm := new(shard.GetPrm).
|
shPrm := new(shard.GetPrm).
|
||||||
|
@ -72,6 +74,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
||||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||||
res, err := sh.Get(shPrm)
|
res, err := sh.Get(shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if res.HasMeta() {
|
||||||
|
shardWithMeta = hashedShard{sh: sh}
|
||||||
|
}
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, object.ErrNotFound):
|
case errors.Is(err, object.ErrNotFound):
|
||||||
return false // ignore, go to next shard
|
return false // ignore, go to next shard
|
||||||
|
@ -116,7 +121,23 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
return nil, outError
|
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
|
||||||
|
return nil, outError
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the object is not found but is present in metabase,
|
||||||
|
// try to fetch it from blobstor directly. If it is found in any
|
||||||
|
// blobstor, increase the error counter for the shard which contains the meta.
|
||||||
|
shPrm = shPrm.WithIgnoreMeta(true)
|
||||||
|
|
||||||
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||||
|
res, err := sh.Get(shPrm)
|
||||||
|
obj = res.Object()
|
||||||
|
return err == nil
|
||||||
|
})
|
||||||
|
if obj == nil {
|
||||||
|
return nil, outError
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &GetRes{
|
return &GetRes{
|
||||||
|
|
|
@ -82,6 +82,8 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
||||||
|
|
||||||
outSI *objectSDK.SplitInfo
|
outSI *objectSDK.SplitInfo
|
||||||
outError = object.ErrNotFound
|
outError = object.ErrNotFound
|
||||||
|
|
||||||
|
shardWithMeta hashedShard
|
||||||
)
|
)
|
||||||
|
|
||||||
shPrm := new(shard.RngPrm).
|
shPrm := new(shard.RngPrm).
|
||||||
|
@ -91,6 +93,9 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
||||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||||
res, err := sh.GetRange(shPrm)
|
res, err := sh.GetRange(shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if res.HasMeta() {
|
||||||
|
shardWithMeta = hashedShard{sh: sh}
|
||||||
|
}
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, object.ErrNotFound):
|
case errors.Is(err, object.ErrNotFound):
|
||||||
return false // ignore, go to next shard
|
return false // ignore, go to next shard
|
||||||
|
@ -137,7 +142,27 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
return nil, outError
|
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
|
||||||
|
return nil, outError
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the object is not found but is present in metabase,
|
||||||
|
// try to fetch it from blobstor directly. If it is found in any
|
||||||
|
// blobstor, increase the error counter for the shard which contains the meta.
|
||||||
|
shPrm = shPrm.WithIgnoreMeta(true)
|
||||||
|
|
||||||
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||||
|
res, err := sh.GetRange(shPrm)
|
||||||
|
if errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||||
|
outError = object.ErrRangeOutOfBounds
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
obj = res.Object()
|
||||||
|
return err == nil
|
||||||
|
})
|
||||||
|
if obj == nil {
|
||||||
|
return nil, outError
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &RngRes{
|
return &RngRes{
|
||||||
|
|
|
@ -25,26 +25,34 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
||||||
e.mtx.Lock()
|
e.mtx.Lock()
|
||||||
defer e.mtx.Unlock()
|
defer e.mtx.Unlock()
|
||||||
|
|
||||||
id, err := generateShardID()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
|
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
strID := id.String()
|
id, err := generateShardID()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
e.shards[strID] = shard.New(append(opts,
|
sh := shard.New(append(opts,
|
||||||
shard.WithID(id),
|
shard.WithID(id),
|
||||||
shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
|
shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
|
||||||
)...)
|
)...)
|
||||||
|
|
||||||
|
if err := sh.UpdateID(); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not open shard: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
strID := sh.ID().String()
|
||||||
|
if _, ok := e.shards[strID]; ok {
|
||||||
|
return nil, fmt.Errorf("shard with id %s was already added", strID)
|
||||||
|
}
|
||||||
|
|
||||||
|
e.shards[strID] = sh
|
||||||
e.shardPools[strID] = pool
|
e.shardPools[strID] = pool
|
||||||
|
|
||||||
return id, nil
|
return sh.ID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateShardID() (*shard.ID, error) {
|
func generateShardID() (*shard.ID, error) {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -47,9 +48,17 @@ type referenceCounter map[string]*referenceNumber
|
||||||
|
|
||||||
// Delete removed object records from metabase indexes.
|
// Delete removed object records from metabase indexes.
|
||||||
func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
|
func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
|
||||||
return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error {
|
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
return db.deleteGroup(tx, prm.addrs)
|
return db.deleteGroup(tx, prm.addrs)
|
||||||
})
|
})
|
||||||
|
if err == nil {
|
||||||
|
for i := range prm.addrs {
|
||||||
|
storagelog.Write(db.log,
|
||||||
|
storagelog.AddressField(prm.addrs[i]),
|
||||||
|
storagelog.OpField("metabase DELETE"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new(DeleteRes), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error {
|
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -68,6 +69,11 @@ func (db *DB) Put(prm *PutPrm) (res *PutRes, err error) {
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
return db.put(tx, prm.obj, prm.id, nil)
|
return db.put(tx, prm.obj, prm.id, nil)
|
||||||
})
|
})
|
||||||
|
if err == nil {
|
||||||
|
storagelog.Write(db.log,
|
||||||
|
storagelog.AddressField(prm.obj.Address()),
|
||||||
|
storagelog.OpField("metabase PUT"))
|
||||||
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
36
pkg/local_object_storage/metabase/shard_id.go
Normal file
36
pkg/local_object_storage/metabase/shard_id.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
shardInfoBucket = []byte(invalidBase58String + "i")
|
||||||
|
shardIDKey = []byte("id")
|
||||||
|
)
|
||||||
|
|
||||||
|
// ReadShardID reads shard id from db.
|
||||||
|
// If id is missing, returns nil, nil.
|
||||||
|
func (db *DB) ReadShardID() ([]byte, error) {
|
||||||
|
var id []byte
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(shardInfoBucket)
|
||||||
|
if b != nil {
|
||||||
|
id = slice.Copy(b.Get(shardIDKey))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return id, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteShardID writes shard it to db.
|
||||||
|
func (db *DB) WriteShardID(id []byte) error {
|
||||||
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return b.Put(shardIDKey, id)
|
||||||
|
})
|
||||||
|
}
|
|
@ -26,7 +26,6 @@ func (s *Shard) Open() error {
|
||||||
return fmt.Errorf("could not open %T: %w", component, err)
|
return fmt.Errorf("could not open %T: %w", component, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,12 +18,14 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob
|
||||||
|
|
||||||
// GetPrm groups the parameters of Get operation.
|
// GetPrm groups the parameters of Get operation.
|
||||||
type GetPrm struct {
|
type GetPrm struct {
|
||||||
addr *objectSDK.Address
|
addr *objectSDK.Address
|
||||||
|
skipMeta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRes groups resulting values of Get operation.
|
// GetRes groups resulting values of Get operation.
|
||||||
type GetRes struct {
|
type GetRes struct {
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
|
hasMeta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithAddress is a Get option to set the address of the requested object.
|
// WithAddress is a Get option to set the address of the requested object.
|
||||||
|
@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
||||||
|
// without accessing metabase.
|
||||||
|
func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm {
|
||||||
|
p.skipMeta = ignore
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
// Object returns the requested object.
|
// Object returns the requested object.
|
||||||
func (r *GetRes) Object() *object.Object {
|
func (r *GetRes) Object() *object.Object {
|
||||||
return r.obj
|
return r.obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasMeta returns true if info about the object was found in the metabase.
|
||||||
|
func (r *GetRes) HasMeta() bool {
|
||||||
|
return r.hasMeta
|
||||||
|
}
|
||||||
|
|
||||||
// Get reads an object from shard.
|
// Get reads an object from shard.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
|
@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) {
|
||||||
return res.Object(), nil
|
return res.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := s.fetchObjectData(prm.addr, big, small)
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||||
|
|
||||||
return &GetRes{
|
return &GetRes{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
|
hasMeta: hasMeta,
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchObjectData looks through writeCache and blobStor to find object.
|
// fetchObjectData looks through writeCache and blobStor to find object.
|
||||||
func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) (*object.Object, error) {
|
func (s *Shard) fetchObjectData(addr *objectSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
res *object.Object
|
res *object.Object
|
||||||
|
@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
||||||
if s.hasWriteCache() {
|
if s.hasWriteCache() {
|
||||||
res, err = s.writeCache.Get(addr)
|
res, err = s.writeCache.Get(addr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return res, nil
|
return res, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if errors.Is(err, object.ErrNotFound) {
|
if errors.Is(err, object.ErrNotFound) {
|
||||||
|
@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if skipMeta {
|
||||||
|
res, err = small(s.blobStor, nil)
|
||||||
|
if err == nil || errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||||
|
return res, false, err
|
||||||
|
}
|
||||||
|
res, err = big(s.blobStor, nil)
|
||||||
|
return res, false, err
|
||||||
|
}
|
||||||
|
|
||||||
exists, err := meta.Exists(s.metaBase, addr)
|
exists, err := meta.Exists(s.metaBase, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, object.ErrNotFound
|
return nil, false, object.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
blobovniczaID, err := meta.IsSmall(s.metaBase, addr)
|
blobovniczaID, err := meta.IsSmall(s.metaBase, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if blobovniczaID != nil {
|
if blobovniczaID != nil {
|
||||||
|
@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
||||||
res, err = big(s.blobStor, nil)
|
res, err = big(s.blobStor, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, err
|
return res, true, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,3 +23,25 @@ func (id ID) String() string {
|
||||||
func (s *Shard) ID() *ID {
|
func (s *Shard) ID() *ID {
|
||||||
return s.info.ID
|
return s.info.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||||
|
func (s *Shard) UpdateID() (err error) {
|
||||||
|
if err = s.metaBase.Open(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
cErr := s.metaBase.Close()
|
||||||
|
if err == nil {
|
||||||
|
err = cErr
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
id, err := s.metaBase.ReadShardID()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(id) != 0 {
|
||||||
|
s.info.ID = NewIDFromBytes(id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return s.metaBase.WriteShardID(*s.info.ID)
|
||||||
|
}
|
||||||
|
|
|
@ -14,11 +14,14 @@ type RngPrm struct {
|
||||||
off uint64
|
off uint64
|
||||||
|
|
||||||
addr *objectSDK.Address
|
addr *objectSDK.Address
|
||||||
|
|
||||||
|
skipMeta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// RngRes groups resulting values of GetRange operation.
|
// RngRes groups resulting values of GetRange operation.
|
||||||
type RngRes struct {
|
type RngRes struct {
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
|
hasMeta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithAddress is a Rng option to set the address of the requested object.
|
// WithAddress is a Rng option to set the address of the requested object.
|
||||||
|
@ -41,6 +44,13 @@ func (p *RngPrm) WithRange(off uint64, ln uint64) *RngPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
||||||
|
// without accessing metabase.
|
||||||
|
func (p *RngPrm) WithIgnoreMeta(ignore bool) *RngPrm {
|
||||||
|
p.skipMeta = ignore
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
// Object returns the requested object part.
|
// Object returns the requested object part.
|
||||||
//
|
//
|
||||||
// Instance payload contains the requested range of the original object.
|
// Instance payload contains the requested range of the original object.
|
||||||
|
@ -48,6 +58,11 @@ func (r *RngRes) Object() *object.Object {
|
||||||
return r.obj
|
return r.obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasMeta returns true if info about the object was found in the metabase.
|
||||||
|
func (r *RngRes) HasMeta() bool {
|
||||||
|
return r.hasMeta
|
||||||
|
}
|
||||||
|
|
||||||
// GetRange reads part of an object from shard.
|
// GetRange reads part of an object from shard.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
|
@ -94,9 +109,10 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
|
||||||
return obj.Object(), nil
|
return obj.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := s.fetchObjectData(prm.addr, big, small)
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||||
|
|
||||||
return &RngRes{
|
return &RngRes{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
|
hasMeta: hasMeta,
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,7 @@ func New(opts ...Option) *Shard {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithID returns option to set shard identifier.
|
// WithID returns option to set the default shard identifier.
|
||||||
func WithID(id *ID) Option {
|
func WithID(id *ID) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.info.ID = id
|
c.info.ID = id
|
||||||
|
|
|
@ -52,7 +52,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task *Task) {
|
||||||
|
|
||||||
obj, err := engine.Get(p.localStorage, task.addr)
|
obj, err := engine.Get(p.localStorage, task.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.log.Error("could not get object from local storage")
|
p.log.Error("could not get object from local storage",
|
||||||
|
zap.Stringer("object", task.addr),
|
||||||
|
zap.Error(err))
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue