forked from TrueCloudLab/frostfs-node
Compare commits
23 commits
master
...
support/v0
Author | SHA1 | Date | |
---|---|---|---|
|
1fc9351f4f | ||
|
1a2b8ab59d | ||
|
5180e467f8 | ||
|
a193db3a3d | ||
|
49ae91d720 | ||
|
b087d7ead3 | ||
|
e4f357561e | ||
|
437687f78d | ||
|
1cfa1763e9 | ||
|
3de3c102fc | ||
|
679df13924 | ||
|
73d367e287 | ||
|
5f54fd5dc8 | ||
|
fe3be92c89 | ||
|
2977552a19 | ||
|
48b5d2cb91 | ||
|
baad9d06a1 | ||
|
b5bcf90fa1 | ||
|
ce169491ed | ||
|
58f2354057 | ||
|
25b827e0fd | ||
|
00180a7ecf | ||
|
1f825a467a |
35 changed files with 1037 additions and 85 deletions
28
CHANGELOG.md
28
CHANGELOG.md
|
@ -3,6 +3,30 @@ Changelog for NeoFS Node
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.27.7] - 2022-03-30
|
||||
|
||||
### Fixed
|
||||
- Shard ID is now consistent between restarts (#1204)
|
||||
|
||||
### Added
|
||||
- More N3 RPC caches in object service (#1278)
|
||||
|
||||
## [0.27.6] - 2022-03-28
|
||||
|
||||
### Fixed
|
||||
- Allow empty passwords in neofs-cli config (#1136)
|
||||
- Set correct audit range hash type in neofs-ir (#1180)
|
||||
- Read objects directly from blobstor in case of shard inconsistency (#1186)
|
||||
- Fix `-w` flag in subnet commands of neofs-adm (#1223)
|
||||
- Do not use explicit mutex lock in chain caches (#1236)
|
||||
- Force gRPC server stop if it can't shut down gracefully in storage node (#1270)
|
||||
- Return non-zero exit code in `acl extended create` command failures and fix
|
||||
help message (#1259)
|
||||
|
||||
### Added
|
||||
- Interactive storage node configurator in neofs-adm (#1090)
|
||||
- Logs in metabase operations (#1188)
|
||||
|
||||
## [0.27.5] - 2022-01-31
|
||||
|
||||
### Fixed
|
||||
|
@ -916,7 +940,9 @@ NeoFS-API v2.0 support and updated brand-new storage node application.
|
|||
|
||||
First public review release.
|
||||
|
||||
[Unreleased]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.5...master
|
||||
[Unreleased]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.7...master
|
||||
[0.27.7]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.6...v0.27.7
|
||||
[0.27.6]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.5...v0.27.6
|
||||
[0.27.5]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.4...v0.27.5
|
||||
[0.27.4]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.3...v0.27.4
|
||||
[0.27.3]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.2...v0.27.3
|
||||
|
|
|
@ -770,6 +770,7 @@ var cmdSubnetNode = &cobra.Command{
|
|||
Short: "Manage nodes of the NeoFS subnet.",
|
||||
PreRun: func(cmd *cobra.Command, _ []string) {
|
||||
viperBindFlags(cmd,
|
||||
flagSubnetWallet,
|
||||
flagSubnetNode,
|
||||
flagSubnetNodeSubnet,
|
||||
)
|
||||
|
@ -882,8 +883,10 @@ func init() {
|
|||
|
||||
// subnet node flags
|
||||
nodeFlags := cmdSubnetNode.PersistentFlags()
|
||||
nodeFlags.StringP(flagSubnetWallet, "w", "", "Path to file with wallet")
|
||||
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetWallet)
|
||||
nodeFlags.String(flagSubnetNode, "", "Hex-encoded public key of the node")
|
||||
_ = cmdSubnetAdmin.MarkFlagRequired(flagSubnetNode)
|
||||
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetNode)
|
||||
nodeFlags.String(flagSubnetNodeSubnet, "", "ID of the subnet to manage nodes")
|
||||
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetNodeSubnet)
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/config"
|
||||
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/morph"
|
||||
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/storagecfg"
|
||||
"github.com/nspcc-dev/neofs-node/misc"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/autocomplete"
|
||||
"github.com/spf13/cobra"
|
||||
|
@ -34,6 +35,7 @@ func init() {
|
|||
|
||||
rootCmd.AddCommand(config.RootCmd)
|
||||
rootCmd.AddCommand(morph.RootCmd)
|
||||
rootCmd.AddCommand(storagecfg.RootCmd)
|
||||
|
||||
rootCmd.AddCommand(autocomplete.Command("neofs-adm"))
|
||||
}
|
||||
|
|
143
cmd/neofs-adm/internal/modules/storagecfg/config.go
Normal file
143
cmd/neofs-adm/internal/modules/storagecfg/config.go
Normal file
|
@ -0,0 +1,143 @@
|
|||
package storagecfg
|
||||
|
||||
const configTemplate = `logger:
|
||||
level: info # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
|
||||
|
||||
node:
|
||||
wallet:
|
||||
path: {{ .Wallet.Path }} # path to a NEO wallet; ignored if key is presented
|
||||
address: {{ .Wallet.Account }} # address of a NEO account in the wallet; ignored if key is presented
|
||||
password: {{ .Wallet.Password }} # password for a NEO account in the wallet; ignored if key is presented
|
||||
addresses: # list of addresses announced by Storage node in the Network map
|
||||
- {{ .AnnouncedAddress }}
|
||||
attribute_0: UN-LOCODE:{{ .Attribute.Locode }}
|
||||
relay: {{ .Relay }} # start Storage node in relay mode without bootstrapping into the Network map
|
||||
subnet:
|
||||
exit_zero: false # toggle entrance to zero subnet (overrides corresponding attribute and occurrence in entries)
|
||||
entries: [] # list of IDs of subnets to enter in a text format of NeoFS API protocol (overrides corresponding attributes)
|
||||
|
||||
grpc:
|
||||
num: 1 # total number of listener endpoints
|
||||
0:
|
||||
endpoint: {{ .Endpoint }} # endpoint for gRPC server
|
||||
tls:{{if .TLSCert}}
|
||||
enabled: true # enable TLS for a gRPC connection (min version is TLS 1.2)
|
||||
certificate: {{ .TLSCert }} # path to TLS certificate
|
||||
key: {{ .TLSKey }} # path to TLS key
|
||||
{{- else }}
|
||||
enabled: false # disable TLS for a gRPC connection
|
||||
{{- end}}
|
||||
|
||||
control:
|
||||
authorized_keys: # list of hex-encoded public keys that have rights to use the Control Service
|
||||
{{- range .AuthorizedKeys }}
|
||||
- {{.}}{{end}}
|
||||
grpc:
|
||||
endpoint: {{.ControlEndpoint}} # endpoint that is listened by the Control Service
|
||||
|
||||
morph:
|
||||
dial_timeout: 20s # timeout for side chain NEO RPC client connection
|
||||
disable_cache: false # use TTL cache for side chain GET operations
|
||||
rpc_endpoint: # side chain N3 RPC endpoints
|
||||
{{- range .MorphRPC }}
|
||||
- https://{{.}}{{end}}
|
||||
notification_endpoint: # side chain N3 RPC notification endpoints
|
||||
{{- range .MorphRPC }}
|
||||
- wss://{{.}}/ws{{end}}
|
||||
{{if not .Relay }}
|
||||
storage:
|
||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||
shard_num: 1 # total number of shards
|
||||
|
||||
default: # section with the default shard parameters
|
||||
metabase:
|
||||
perm: 0644 # permissions for metabase files(directories: +x for current user and group)
|
||||
|
||||
blobstor:
|
||||
perm: 0644 # permissions for blobstor files(directories: +x for current user and group)
|
||||
depth: 2 # max depth of object tree storage in FS
|
||||
small_object_size: 102400 # 100KiB, size threshold for "small" objects which are stored in key-value DB, not in FS, bytes
|
||||
compress: true # turn on/off Zstandard compression (level 3) of stored objects
|
||||
compression_exclude_content_types:
|
||||
- audio/*
|
||||
- video/*
|
||||
|
||||
blobovnicza:
|
||||
size: 1073741824 # approximate size limit of single blobovnicza instance, total size will be: size*width^(depth+1), bytes
|
||||
depth: 1 # max depth of object tree storage in key-value DB
|
||||
width: 4 # max width of object tree storage in key-value DB
|
||||
opened_cache_capacity: 50 # maximum number of opened database files
|
||||
|
||||
gc:
|
||||
remover_batch_size: 200 # number of objects to be removed by the garbage collector
|
||||
remover_sleep_interval: 5m # frequency of the garbage collector invocation
|
||||
|
||||
shard:
|
||||
0:
|
||||
mode: "read-write" # mode of the shard, must be one of the: "read-write" (default), "read-only"
|
||||
|
||||
metabase:
|
||||
path: {{ .MetabasePath }} # path to the metabase
|
||||
|
||||
blobstor:
|
||||
path: {{ .BlobstorPath }} # path to the blobstor
|
||||
{{end}}`
|
||||
|
||||
const (
|
||||
neofsMainnetAddress = "2cafa46838e8b564468ebd868dcafdd99dce6221"
|
||||
balanceMainnetAddress = "dc1ec98d9d0c5f9dfade16144defe08cffc5ca55"
|
||||
neofsTestnetAddress = "b65d8243ac63983206d17e5221af0653a7266fa1"
|
||||
balanceTestnetAddress = "e0420c216003747626670d1424569c17c79015bf"
|
||||
)
|
||||
|
||||
var n3config = map[string]struct {
|
||||
MorphRPC []string
|
||||
RPC []string
|
||||
NeoFSContract string
|
||||
BalanceContract string
|
||||
}{
|
||||
"testnet": {
|
||||
MorphRPC: []string{
|
||||
"rpc01.morph.testnet.fs.neo.org:51331",
|
||||
"rpc02.morph.testnet.fs.neo.org:51331",
|
||||
"rpc03.morph.testnet.fs.neo.org:51331",
|
||||
"rpc04.morph.testnet.fs.neo.org:51331",
|
||||
"rpc05.morph.testnet.fs.neo.org:51331",
|
||||
"rpc06.morph.testnet.fs.neo.org:51331",
|
||||
"rpc07.morph.testnet.fs.neo.org:51331",
|
||||
},
|
||||
RPC: []string{
|
||||
"rpc01.testnet.n3.nspcc.ru:21331",
|
||||
"rpc02.testnet.n3.nspcc.ru:21331",
|
||||
"rpc03.testnet.n3.nspcc.ru:21331",
|
||||
"rpc04.testnet.n3.nspcc.ru:21331",
|
||||
"rpc05.testnet.n3.nspcc.ru:21331",
|
||||
"rpc06.testnet.n3.nspcc.ru:21331",
|
||||
"rpc07.testnet.n3.nspcc.ru:21331",
|
||||
},
|
||||
NeoFSContract: neofsTestnetAddress,
|
||||
BalanceContract: balanceTestnetAddress,
|
||||
},
|
||||
"mainnet": {
|
||||
MorphRPC: []string{
|
||||
"rpc1.morph.fs.neo.org:40341",
|
||||
"rpc2.morph.fs.neo.org:40341",
|
||||
"rpc3.morph.fs.neo.org:40341",
|
||||
"rpc4.morph.fs.neo.org:40341",
|
||||
"rpc5.morph.fs.neo.org:40341",
|
||||
"rpc6.morph.fs.neo.org:40341",
|
||||
"rpc7.morph.fs.neo.org:40341",
|
||||
},
|
||||
RPC: []string{
|
||||
"rpc1.n3.nspcc.ru:10331",
|
||||
"rpc2.n3.nspcc.ru:10331",
|
||||
"rpc3.n3.nspcc.ru:10331",
|
||||
"rpc4.n3.nspcc.ru:10331",
|
||||
"rpc5.n3.nspcc.ru:10331",
|
||||
"rpc6.n3.nspcc.ru:10331",
|
||||
"rpc7.n3.nspcc.ru:10331",
|
||||
},
|
||||
NeoFSContract: neofsMainnetAddress,
|
||||
BalanceContract: balanceMainnetAddress,
|
||||
},
|
||||
}
|
413
cmd/neofs-adm/internal/modules/storagecfg/root.go
Normal file
413
cmd/neofs-adm/internal/modules/storagecfg/root.go
Normal file
|
@ -0,0 +1,413 @@
|
|||
package storagecfg
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/chzyer/readline"
|
||||
"github.com/nspcc-dev/neo-go/cli/flags"
|
||||
"github.com/nspcc-dev/neo-go/cli/input"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
|
||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
|
||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const (
|
||||
walletFlag = "wallet"
|
||||
accountFlag = "account"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultControlEndpoint = "127.0.0.1:8090"
|
||||
defaultDataEndpoint = "127.0.0.1"
|
||||
)
|
||||
|
||||
// RootCmd is a root command of config section.
|
||||
var RootCmd = &cobra.Command{
|
||||
Use: "storage-config [-w wallet] [-a acccount] [<path-to-config>]",
|
||||
Short: "Section for storage node configuration commands.",
|
||||
Run: storageConfig,
|
||||
}
|
||||
|
||||
func init() {
|
||||
fs := RootCmd.Flags()
|
||||
|
||||
fs.StringP(walletFlag, "w", "", "path to wallet")
|
||||
fs.StringP(accountFlag, "a", "", "wallet account")
|
||||
}
|
||||
|
||||
type config struct {
|
||||
AnnouncedAddress string
|
||||
AuthorizedKeys []string
|
||||
ControlEndpoint string
|
||||
Endpoint string
|
||||
TLSCert string
|
||||
TLSKey string
|
||||
MorphRPC []string
|
||||
Attribute struct {
|
||||
Locode string
|
||||
}
|
||||
Wallet struct {
|
||||
Path string
|
||||
Account string
|
||||
Password string
|
||||
}
|
||||
Relay bool
|
||||
BlobstorPath string
|
||||
MetabasePath string
|
||||
}
|
||||
|
||||
func storageConfig(cmd *cobra.Command, args []string) {
|
||||
var outPath string
|
||||
if len(args) != 0 {
|
||||
outPath = args[0]
|
||||
} else {
|
||||
outPath = getPath("File to write config at [./config.yml]: ")
|
||||
if outPath == "" {
|
||||
outPath = "./config.yml"
|
||||
}
|
||||
}
|
||||
|
||||
historyPath := filepath.Join(os.TempDir(), "neofs-adm.history")
|
||||
readline.SetHistoryPath(historyPath)
|
||||
|
||||
var c config
|
||||
|
||||
c.Wallet.Path, _ = cmd.Flags().GetString(walletFlag)
|
||||
if c.Wallet.Path == "" {
|
||||
c.Wallet.Path = getPath("Path to the storage node wallet: ")
|
||||
}
|
||||
|
||||
w, err := wallet.NewWalletFromFile(c.Wallet.Path)
|
||||
fatalOnErr(err)
|
||||
|
||||
c.Wallet.Account, _ = cmd.Flags().GetString(accountFlag)
|
||||
if c.Wallet.Account == "" {
|
||||
addr := address.Uint160ToString(w.GetChangeAddress())
|
||||
c.Wallet.Account = getWalletAccount(w, fmt.Sprintf("Wallet account [%s]: ", addr))
|
||||
if c.Wallet.Account == "" {
|
||||
c.Wallet.Account = addr
|
||||
}
|
||||
}
|
||||
|
||||
accH, err := flags.ParseAddress(c.Wallet.Account)
|
||||
fatalOnErr(err)
|
||||
|
||||
acc := w.GetAccount(accH)
|
||||
if acc == nil {
|
||||
fatalOnErr(errors.New("can't find account in wallet"))
|
||||
}
|
||||
|
||||
c.Wallet.Password, err = input.ReadPassword(fmt.Sprintf("Account password for %s: ", c.Wallet.Account))
|
||||
fatalOnErr(err)
|
||||
|
||||
err = acc.Decrypt(c.Wallet.Password, keys.NEP2ScryptParams())
|
||||
fatalOnErr(err)
|
||||
|
||||
c.AuthorizedKeys = append(c.AuthorizedKeys, hex.EncodeToString(acc.PrivateKey().PublicKey().Bytes()))
|
||||
|
||||
var network string
|
||||
for {
|
||||
network = getString("Choose network [mainnet]/testnet: ")
|
||||
switch network {
|
||||
case "":
|
||||
network = "mainnet"
|
||||
case "testnet", "mainnet":
|
||||
default:
|
||||
cmd.Println(`Network must be either "mainnet" or "testnet"`)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
c.MorphRPC = n3config[network].MorphRPC
|
||||
|
||||
depositGas(cmd, acc, network)
|
||||
|
||||
c.Attribute.Locode = getString("UN-LOCODE attribute in [XX YYY] format: ")
|
||||
var addr, port string
|
||||
for {
|
||||
c.AnnouncedAddress = getString("Publicly announced address: ")
|
||||
addr, port, err = net.SplitHostPort(c.AnnouncedAddress)
|
||||
if err != nil {
|
||||
cmd.Println("Address must have form A.B.C.D:PORT")
|
||||
continue
|
||||
}
|
||||
|
||||
ip, err := net.ResolveIPAddr("ip", addr)
|
||||
if err != nil {
|
||||
cmd.Printf("Can't resolve IP address %s: %v\n", addr, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !ip.IP.IsGlobalUnicast() {
|
||||
cmd.Println("IP must be global unicast.")
|
||||
continue
|
||||
}
|
||||
cmd.Printf("Resolved IP address: %s\n", ip.String())
|
||||
|
||||
_, err = strconv.ParseUint(port, 10, 16)
|
||||
if err != nil {
|
||||
cmd.Println("Port must be an integer.")
|
||||
continue
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
defaultAddr := net.JoinHostPort(defaultDataEndpoint, port)
|
||||
c.Endpoint = getString(fmt.Sprintf("Listening address [%s]: ", defaultAddr))
|
||||
if c.Endpoint == "" {
|
||||
c.Endpoint = defaultAddr
|
||||
}
|
||||
|
||||
c.ControlEndpoint = getString(fmt.Sprintf("Listening address (control endpoint) [%s]: ", defaultControlEndpoint))
|
||||
if c.ControlEndpoint == "" {
|
||||
c.ControlEndpoint = defaultControlEndpoint
|
||||
}
|
||||
|
||||
c.TLSCert = getPath("TLS Certificate (optional): ")
|
||||
if c.TLSCert != "" {
|
||||
c.TLSKey = getPath("TLS Key: ")
|
||||
}
|
||||
|
||||
c.Relay = getConfirmation(false, "Use node as a relay? yes/[no]: ")
|
||||
if !c.Relay {
|
||||
p := getPath("Path to the storage directory (all available storage will be used): ")
|
||||
c.BlobstorPath = filepath.Join(p, "blob")
|
||||
c.MetabasePath = filepath.Join(p, "meta")
|
||||
}
|
||||
|
||||
out := applyTemplate(c)
|
||||
fatalOnErr(ioutil.WriteFile(outPath, out, 0644))
|
||||
|
||||
cmd.Println("Node is ready for work! Run `neofs-node -config " + outPath + "`")
|
||||
}
|
||||
|
||||
func getWalletAccount(w *wallet.Wallet, prompt string) string {
|
||||
addrs := make([]readline.PrefixCompleterInterface, len(w.Accounts))
|
||||
for i := range w.Accounts {
|
||||
addrs[i] = readline.PcItem(w.Accounts[i].Address)
|
||||
}
|
||||
|
||||
readline.SetAutoComplete(readline.NewPrefixCompleter(addrs...))
|
||||
defer readline.SetAutoComplete(nil)
|
||||
|
||||
s, err := readline.Line(prompt)
|
||||
fatalOnErr(err)
|
||||
return strings.TrimSpace(s) // autocompleter can return a string with a trailing space
|
||||
}
|
||||
|
||||
func getString(prompt string) string {
|
||||
s, err := readline.Line(prompt)
|
||||
fatalOnErr(err)
|
||||
if s != "" {
|
||||
_ = readline.AddHistory(s)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
type filenameCompleter struct{}
|
||||
|
||||
func (filenameCompleter) Do(line []rune, pos int) (newLine [][]rune, length int) {
|
||||
prefix := string(line[:pos])
|
||||
dir := filepath.Dir(prefix)
|
||||
de, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
for i := range de {
|
||||
name := filepath.Join(dir, de[i].Name())
|
||||
if strings.HasPrefix(name, prefix) {
|
||||
tail := []rune(strings.TrimPrefix(name, prefix))
|
||||
if de[i].IsDir() {
|
||||
tail = append(tail, filepath.Separator)
|
||||
}
|
||||
newLine = append(newLine, tail)
|
||||
}
|
||||
}
|
||||
if pos != 0 {
|
||||
return newLine, pos - len([]rune(dir))
|
||||
}
|
||||
return newLine, 0
|
||||
}
|
||||
|
||||
func getPath(prompt string) string {
|
||||
readline.SetAutoComplete(filenameCompleter{})
|
||||
defer readline.SetAutoComplete(nil)
|
||||
|
||||
p, err := readline.Line(prompt)
|
||||
fatalOnErr(err)
|
||||
|
||||
if p == "" {
|
||||
return p
|
||||
}
|
||||
|
||||
_ = readline.AddHistory(p)
|
||||
|
||||
abs, err := filepath.Abs(p)
|
||||
if err != nil {
|
||||
fatalOnErr(fmt.Errorf("can't create an absolute path: %w", err))
|
||||
}
|
||||
|
||||
return abs
|
||||
}
|
||||
|
||||
func getConfirmation(def bool, prompt string) bool {
|
||||
for {
|
||||
s, err := readline.Line(prompt)
|
||||
fatalOnErr(err)
|
||||
|
||||
switch strings.ToLower(s) {
|
||||
case "y", "yes":
|
||||
return true
|
||||
case "n", "no":
|
||||
return false
|
||||
default:
|
||||
if len(s) == 0 {
|
||||
return def
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func applyTemplate(c config) []byte {
|
||||
tmpl, err := template.New("config").Parse(configTemplate)
|
||||
fatalOnErr(err)
|
||||
|
||||
b := bytes.NewBuffer(nil)
|
||||
fatalOnErr(tmpl.Execute(b, c))
|
||||
|
||||
return b.Bytes()
|
||||
}
|
||||
|
||||
func fatalOnErr(err error) {
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func depositGas(cmd *cobra.Command, acc *wallet.Account, network string) {
|
||||
sideClient := initClient(n3config[network].MorphRPC)
|
||||
balanceHash, _ := util.Uint160DecodeStringLE(n3config[network].BalanceContract)
|
||||
|
||||
res, err := sideClient.InvokeFunction(balanceHash, "balanceOf", []smartcontract.Parameter{{
|
||||
Type: smartcontract.Hash160Type,
|
||||
Value: acc.Contract.ScriptHash(),
|
||||
}}, nil)
|
||||
fatalOnErr(err)
|
||||
|
||||
if res.State != vm.HaltState.String() {
|
||||
fatalOnErr(fmt.Errorf("invalid response from balance contract: %s", res.FaultException))
|
||||
}
|
||||
|
||||
var balance *big.Int
|
||||
if len(res.Stack) != 0 {
|
||||
balance, _ = res.Stack[0].TryInteger()
|
||||
}
|
||||
|
||||
if balance == nil {
|
||||
fatalOnErr(errors.New("invalid response from balance contract"))
|
||||
}
|
||||
|
||||
ok := getConfirmation(false, fmt.Sprintf("Current NeoFS balance is %s, make a deposit? y/[n]: ",
|
||||
fixedn.ToString(balance, 12)))
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
amountStr := getString("Enter amount in GAS: ")
|
||||
amount, err := fixedn.FromString(amountStr, 8)
|
||||
if err != nil {
|
||||
fatalOnErr(fmt.Errorf("invalid amount: %w", err))
|
||||
}
|
||||
|
||||
mainClient := initClient(n3config[network].RPC)
|
||||
neofsHash, _ := util.Uint160DecodeStringLE(n3config[network].NeoFSContract)
|
||||
|
||||
gasHash, err := mainClient.GetNativeContractHash(nativenames.Gas)
|
||||
fatalOnErr(err)
|
||||
|
||||
tx, err := mainClient.CreateNEP17TransferTx(acc, neofsHash, gasHash, amount.Int64(), 0, nil, nil)
|
||||
fatalOnErr(err)
|
||||
|
||||
txHash, err := mainClient.SignAndPushTx(tx, acc, nil)
|
||||
fatalOnErr(err)
|
||||
|
||||
cmd.Print("Waiting for transactions to persist.")
|
||||
tick := time.NewTicker(time.Second / 2)
|
||||
defer tick.Stop()
|
||||
|
||||
timer := time.NewTimer(time.Second * 20)
|
||||
defer timer.Stop()
|
||||
|
||||
at := trigger.Application
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
_, err := mainClient.GetApplicationLog(txHash, &at)
|
||||
if err == nil {
|
||||
cmd.Print("\n")
|
||||
break loop
|
||||
}
|
||||
cmd.Print(".")
|
||||
case <-timer.C:
|
||||
cmd.Printf("\nTimeout while waiting for transaction to persist.\n")
|
||||
if getConfirmation(false, "Continue configuration? yes/[no]: ") {
|
||||
return
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func initClient(rpc []string) *client.Client {
|
||||
var c *client.Client
|
||||
var err error
|
||||
|
||||
shuffled := make([]string, len(rpc))
|
||||
copy(shuffled, rpc)
|
||||
rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
|
||||
|
||||
for _, endpoint := range shuffled {
|
||||
c, err = client.New(context.Background(), "https://"+endpoint, client.Options{
|
||||
DialTimeout: time.Second * 2,
|
||||
RequestTimeout: time.Second * 5,
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if err = c.Init(); err != nil {
|
||||
continue
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
fatalOnErr(fmt.Errorf("can't create N3 client: %w", err))
|
||||
panic("unreachable")
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/flynn-archive/go-shlex"
|
||||
|
@ -29,8 +30,11 @@ Operation is an object service verb: 'get', 'head', 'put', 'search', 'delete', '
|
|||
|
||||
Filter consists of <typ>:<key><match><value>
|
||||
Typ is 'obj' for object applied filter or 'req' for request applied filter.
|
||||
Key is a valid unicode string corresponding to object or request header key.
|
||||
Match is '==' for matching and '!=' for non-matching filter.
|
||||
Key is a valid unicode string corresponding to object or request header key.
|
||||
Well-known system object headers start with '$Object:' prefix.
|
||||
User defined headers start without prefix.
|
||||
Read more about filter keys at github.com/nspcc-dev/neofs-api/blob/master/proto-docs/acl.md#message-eaclrecordfilter
|
||||
Match is '=' for matching and '!=' for non-matching filter.
|
||||
Value is a valid unicode string corresponding to object or request header value.
|
||||
|
||||
Target is
|
||||
|
@ -64,20 +68,20 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
|||
|
||||
containerID := cid.New()
|
||||
if err := containerID.Parse(cidArg); err != nil {
|
||||
cmd.Printf("invalid container ID: %v", err)
|
||||
return
|
||||
cmd.PrintErrf("invalid container ID: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
rulesFile, err := getRulesFromFile(fileArg)
|
||||
if err != nil {
|
||||
cmd.Printf("can't read rules from file : %v", err)
|
||||
return
|
||||
cmd.PrintErrf("can't read rules from file: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
rules = append(rules, rulesFile...)
|
||||
if len(rules) == 0 {
|
||||
cmd.Println("no extended ACL rules has been provided")
|
||||
return
|
||||
cmd.PrintErrln("no extended ACL rules has been provided")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
tb := eacl.NewTable()
|
||||
|
@ -85,14 +89,14 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
|||
for _, ruleStr := range rules {
|
||||
r, err := shlex.Split(ruleStr)
|
||||
if err != nil {
|
||||
cmd.Printf("can't parse rule '%s': %v)", ruleStr, err)
|
||||
return
|
||||
cmd.PrintErrf("can't parse rule '%s': %v\n", ruleStr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
err = parseTable(tb, r)
|
||||
if err != nil {
|
||||
cmd.Printf("can't create extended ACL record from rule '%s': %v", ruleStr, err)
|
||||
return
|
||||
cmd.PrintErrf("can't create extended ACL record from rule '%s': %v\n", ruleStr, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -100,15 +104,15 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
|||
|
||||
data, err := tb.MarshalJSON()
|
||||
if err != nil {
|
||||
cmd.Println(err)
|
||||
return
|
||||
cmd.PrintErrln(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
err = json.Indent(buf, data, "", " ")
|
||||
if err != nil {
|
||||
cmd.Println(err)
|
||||
return
|
||||
cmd.PrintErrln(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if len(outArg) == 0 {
|
||||
|
@ -118,7 +122,8 @@ func createEACL(cmd *cobra.Command, _ []string) {
|
|||
|
||||
err = ioutil.WriteFile(outArg, buf.Bytes(), 0644)
|
||||
if err != nil {
|
||||
cmd.Println(err)
|
||||
cmd.PrintErrln(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -215,9 +215,11 @@ func getKeyNoGenerate() (*ecdsa.PrivateKey, error) {
|
|||
}
|
||||
|
||||
func getPassword() (string, error) {
|
||||
if pass := viper.GetString(password); pass != "" {
|
||||
return pass, nil
|
||||
// this check allows empty passwords
|
||||
if viper.IsSet(password) {
|
||||
return viper.GetString(password), nil
|
||||
}
|
||||
|
||||
return input.ReadPassword("Enter password > ")
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
|
@ -25,8 +24,6 @@ type valueWithTime struct {
|
|||
|
||||
// entity that provides TTL cache interface.
|
||||
type ttlNetCache struct {
|
||||
mtx sync.Mutex
|
||||
|
||||
ttl time.Duration
|
||||
|
||||
sz int
|
||||
|
@ -55,9 +52,6 @@ func newNetworkTTLCache(sz int, ttl time.Duration, netRdr netValueReader) *ttlNe
|
|||
//
|
||||
// returned value should not be modified.
|
||||
func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
val, ok := c.cache.Peek(key)
|
||||
if ok {
|
||||
valWithTime := val.(*valueWithTime)
|
||||
|
@ -83,29 +77,21 @@ func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
|
|||
}
|
||||
|
||||
func (c *ttlNetCache) remove(key interface{}) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.cache.Remove(key)
|
||||
}
|
||||
|
||||
func (c *ttlNetCache) keys() []interface{} {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
return c.cache.Keys()
|
||||
}
|
||||
|
||||
// entity that provides LRU cache interface.
|
||||
type lruNetCache struct {
|
||||
mtx sync.Mutex
|
||||
|
||||
cache *lru.Cache
|
||||
|
||||
netRdr netValueReader
|
||||
}
|
||||
|
||||
// complicates netValueReader with LRU caching mechanism.
|
||||
// newNetworkLRUCache returns wrapper over netValueReader with LRU cache.
|
||||
func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
|
||||
cache, err := lru.New(sz)
|
||||
fatalOnErr(err)
|
||||
|
@ -122,9 +108,6 @@ func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
|
|||
//
|
||||
// returned value should not be modified.
|
||||
func (c *lruNetCache) get(key interface{}) (interface{}, error) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
val, ok := c.cache.Get(key)
|
||||
if ok {
|
||||
return val, nil
|
||||
|
@ -336,3 +319,39 @@ func (s *ttlContainerLister) InvalidateContainerListByCID(id *cid.ID) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
type cachedIRFetcher ttlNetCache
|
||||
|
||||
func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) *cachedIRFetcher {
|
||||
const (
|
||||
irFetcherCacheSize = 1 // we intend to store only one value
|
||||
|
||||
// Without the cache in the testnet we can see several hundred simultaneous
|
||||
// requests (neofs-node #1278), so limiting the request rate solves the issue.
|
||||
//
|
||||
// Exact request rate doesn't really matter because Inner Ring list update
|
||||
// happens extremely rare, but there is no side chain events for that as
|
||||
// for now (neofs-contract v0.15.0 notary disabled env) to monitor it.
|
||||
irFetcherCacheTTL = 30 * time.Second
|
||||
)
|
||||
|
||||
irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
|
||||
func(key interface{}) (interface{}, error) {
|
||||
return f.InnerRingKeys()
|
||||
},
|
||||
)
|
||||
|
||||
return (*cachedIRFetcher)(irFetcherCache)
|
||||
}
|
||||
|
||||
// InnerRingKeys returns cached list of Inner Ring keys. If keys are missing in
|
||||
// the cache or expired, then it returns keys from side chain and updates
|
||||
// the cache.
|
||||
func (f *cachedIRFetcher) InnerRingKeys() ([][]byte, error) {
|
||||
val, err := (*ttlNetCache)(f).get("")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return val.([][]byte), nil
|
||||
}
|
||||
|
|
|
@ -116,6 +116,8 @@ type cfg struct {
|
|||
clientCache *cache.ClientCache
|
||||
|
||||
persistate *state.PersistentStorage
|
||||
|
||||
netMapSource netmapCore.Source
|
||||
}
|
||||
|
||||
type cfgGRPC struct {
|
||||
|
@ -175,8 +177,6 @@ type cfgNodeInfo struct {
|
|||
}
|
||||
|
||||
type cfgObject struct {
|
||||
netMapSource netmapCore.Source
|
||||
|
||||
cnrSource container.Source
|
||||
|
||||
eaclSource eacl.Source
|
||||
|
|
|
@ -105,7 +105,7 @@ func initContainerService(c *cfg) {
|
|||
|
||||
loadPlacementBuilder := &loadPlacementBuilder{
|
||||
log: c.log,
|
||||
nmSrc: c.cfgNetmap.wrapper,
|
||||
nmSrc: c.netMapSource,
|
||||
cnrSrc: cnrSrc,
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ func initControlService(c *cfg) {
|
|||
controlSvc.WithKey(&c.key.PrivateKey),
|
||||
controlSvc.WithAuthorizedKeys(rawPubs),
|
||||
controlSvc.WithHealthChecker(c),
|
||||
controlSvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
||||
controlSvc.WithNetMapSource(c.netMapSource),
|
||||
controlSvc.WithNodeState(c),
|
||||
controlSvc.WithDeletedObjectHandler(func(addrList []*object.Address) error {
|
||||
prm := new(engine.DeletePrm).WithAddresses(addrList...)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
|
@ -95,7 +96,19 @@ func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {
|
|||
|
||||
l.Info("stopping gRPC server...")
|
||||
|
||||
s.GracefulStop()
|
||||
// GracefulStop() may freeze forever, see #1270
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
s.GracefulStop()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Minute):
|
||||
l.Info("gRPC cannot shutdown gracefully, forcing stop")
|
||||
s.Stop()
|
||||
}
|
||||
|
||||
l.Info("gRPC server stopped successfully")
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ func initMorphComponents(c *cfg) {
|
|||
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
|
||||
}
|
||||
|
||||
c.cfgObject.netMapSource = netmapSource
|
||||
c.netMapSource = netmapSource
|
||||
c.cfgNetmap.wrapper = wrap
|
||||
}
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ func initObjectService(c *cfg) {
|
|||
|
||||
clientConstructor := &reputationClientConstructor{
|
||||
log: c.log,
|
||||
nmSrc: c.cfgObject.netMapSource,
|
||||
nmSrc: c.netMapSource,
|
||||
netState: c.cfgNetmap.state,
|
||||
trustStorage: c.cfgReputation.localTrustStorage,
|
||||
basicConstructor: c.clientCache,
|
||||
|
@ -228,7 +228,7 @@ func initObjectService(c *cfg) {
|
|||
policer.WithLocalStorage(ls),
|
||||
policer.WithContainerSource(c.cfgObject.cnrSource),
|
||||
policer.WithPlacementBuilder(
|
||||
placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapSource),
|
||||
placement.NewNetworkMapSourceBuilder(c.netMapSource),
|
||||
),
|
||||
policer.WithRemoteHeader(
|
||||
headsvc.NewRemoteHeader(keyStorage, clientConstructor),
|
||||
|
@ -251,7 +251,7 @@ func initObjectService(c *cfg) {
|
|||
policer.WithNodeLoader(c),
|
||||
)
|
||||
|
||||
traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapSource, c.cfgObject.cnrSource, c)
|
||||
traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
|
||||
|
||||
c.workers = append(c.workers, pol)
|
||||
|
||||
|
@ -261,7 +261,7 @@ func initObjectService(c *cfg) {
|
|||
putsvc.WithMaxSizeSource(c),
|
||||
putsvc.WithLocalStorage(ls),
|
||||
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
||||
putsvc.WithNetworkMapSource(c.cfgObject.netMapSource),
|
||||
putsvc.WithNetworkMapSource(c.netMapSource),
|
||||
putsvc.WithNetmapKeys(c),
|
||||
putsvc.WithFormatValidatorOpts(
|
||||
objectCore.WithDeleteHandler(objInhumer),
|
||||
|
@ -285,7 +285,7 @@ func initObjectService(c *cfg) {
|
|||
placement.WithoutSuccessTracking(),
|
||||
),
|
||||
),
|
||||
searchsvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
||||
searchsvc.WithNetMapSource(c.netMapSource),
|
||||
searchsvc.WithKeyStorage(keyStorage),
|
||||
)
|
||||
|
||||
|
@ -303,7 +303,7 @@ func initObjectService(c *cfg) {
|
|||
placement.SuccessAfter(1),
|
||||
),
|
||||
),
|
||||
getsvc.WithNetMapSource(c.cfgNetmap.wrapper),
|
||||
getsvc.WithNetMapSource(c.netMapSource),
|
||||
getsvc.WithKeyStorage(keyStorage),
|
||||
)
|
||||
|
||||
|
@ -348,8 +348,8 @@ func initObjectService(c *cfg) {
|
|||
acl.WithSenderClassifier(
|
||||
acl.NewSenderClassifier(
|
||||
c.log,
|
||||
irFetcher,
|
||||
c.cfgNetmap.wrapper,
|
||||
newCachedIRFetcher(irFetcher),
|
||||
c.netMapSource,
|
||||
),
|
||||
),
|
||||
acl.WithContainerSource(
|
||||
|
|
|
@ -37,8 +37,7 @@ func initReputationService(c *cfg) {
|
|||
|
||||
localKey := c.key.PublicKey().Bytes()
|
||||
|
||||
// consider sharing this between application components
|
||||
nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper)
|
||||
nmSrc := c.netMapSource
|
||||
|
||||
// storing calculated trusts as a daughter
|
||||
c.cfgReputation.localTrustStorage = truststorage.New(
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
version: "2.4"
|
||||
services:
|
||||
storage01:
|
||||
image: nspccdev/neofs-storage-testnet:0.27.5
|
||||
image: nspccdev/neofs-storage-testnet:0.27.7
|
||||
container_name: neofs-testnet
|
||||
env_file: node_config.env
|
||||
network_mode: host
|
||||
|
|
1
go.mod
1
go.mod
|
@ -3,6 +3,7 @@ module github.com/nspcc-dev/neofs-node
|
|||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
|
||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||
github.com/google/go-github/v39 v39.2.0
|
||||
github.com/google/uuid v1.2.0
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -238,6 +238,7 @@ func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeR
|
|||
|
||||
cliPrm.WithAddress(prm.objAddr)
|
||||
cliPrm.WithRangeList(prm.rng)
|
||||
cliPrm.TZ()
|
||||
|
||||
cliRes, err := x.c.HashObjectPayloadRanges(prm.ctx, &cliPrm,
|
||||
client.WithKey(x.key),
|
||||
|
|
|
@ -325,12 +325,16 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e
|
|||
|
||||
res, err = b.getRangeFromLevel(prm, p, !ok)
|
||||
if err != nil {
|
||||
if !errors.Is(err, object.ErrNotFound) {
|
||||
outOfBounds := errors.Is(err, object.ErrRangeOutOfBounds)
|
||||
if !errors.Is(err, object.ErrNotFound) && !outOfBounds {
|
||||
b.log.Debug("could not get object from level",
|
||||
zap.String("level", p),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
if outOfBounds {
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
@ -592,7 +596,7 @@ func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *GetRang
|
|||
// we don't use GetRange call for now since blobovnicza
|
||||
// stores data that is compressed on BlobStor side.
|
||||
// If blobovnicza learns to do the compression itself,
|
||||
// wecan start using GetRange.
|
||||
// we can start using GetRange.
|
||||
res, err := blz.Get(gPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
)
|
||||
|
||||
// GetRangeBigPrm groups the parameters of GetRangeBig operation.
|
||||
|
@ -27,6 +29,10 @@ func (b *BlobStor) GetRangeBig(prm *GetRangeBigPrm) (*GetRangeBigRes, error) {
|
|||
// get compressed object data
|
||||
data, err := b.fsTree.Get(prm.addr)
|
||||
if err != nil {
|
||||
if errors.Is(err, fstree.ErrFileNotFound) {
|
||||
return nil, object.ErrNotFound
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("could not read object from fs tree: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -3,8 +3,10 @@ package engine
|
|||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -48,3 +50,38 @@ func TestExecBlocks(t *testing.T) {
|
|||
// try to resume
|
||||
require.Error(t, e.ResumeExecution())
|
||||
}
|
||||
|
||||
func TestPersistentShardID(t *testing.T) {
|
||||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
|
||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
checkShardState(t, e, id[0], shard.ModeReadWrite)
|
||||
require.NoError(t, e.Close())
|
||||
|
||||
e, _, newID := newEngineWithErrorThreshold(t, dir, 1)
|
||||
require.Equal(t, id, newID)
|
||||
require.NoError(t, e.Close())
|
||||
|
||||
p1 := e.shards[id[0].String()].DumpInfo().MetaBaseInfo.Path
|
||||
p2 := e.shards[id[1].String()].DumpInfo().MetaBaseInfo.Path
|
||||
tmp := filepath.Join(dir, "tmp")
|
||||
require.NoError(t, os.Rename(p1, tmp))
|
||||
require.NoError(t, os.Rename(p2, p1))
|
||||
require.NoError(t, os.Rename(tmp, p2))
|
||||
|
||||
e, _, newID = newEngineWithErrorThreshold(t, dir, 1)
|
||||
require.Equal(t, id[1], newID[0])
|
||||
require.Equal(t, id[0], newID[1])
|
||||
require.NoError(t, e.Close())
|
||||
|
||||
}
|
||||
|
||||
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, mode shard.Mode) {
|
||||
e.mtx.RLock()
|
||||
sh := e.shards[id.String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
}
|
||||
|
|
110
pkg/local_object_storage/engine/error_test.go
Normal file
110
pkg/local_object_storage/engine/error_test.go
Normal file
|
@ -0,0 +1,110 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
const errSmallSize = 256
|
||||
|
||||
func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
|
||||
if dir == "" {
|
||||
var err error
|
||||
|
||||
dir, err = os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
}
|
||||
|
||||
e := New(
|
||||
WithLogger(zaptest.NewLogger(t)),
|
||||
WithShardPoolSize(1))
|
||||
|
||||
var ids [2]*shard.ID
|
||||
var err error
|
||||
|
||||
for i := range ids {
|
||||
ids[i], err = e.AddShard(
|
||||
shard.WithLogger(zaptest.NewLogger(t)),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
|
||||
blobstor.WithShallowDepth(1),
|
||||
blobstor.WithBlobovniczaShallowWidth(1),
|
||||
blobstor.WithBlobovniczaShallowDepth(1),
|
||||
blobstor.WithSmallSizeLimit(errSmallSize),
|
||||
blobstor.WithRootPerm(0700)),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPermissions(0700)))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init())
|
||||
|
||||
return e, dir, ids
|
||||
}
|
||||
|
||||
// Issue #1186.
|
||||
func TestBlobstorFailback(t *testing.T) {
|
||||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
|
||||
|
||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
objs := make([]*object.Object, 0, 2)
|
||||
for _, size := range []int{15, errSmallSize + 1} {
|
||||
obj := generateRawObjectWithCID(t, cidtest.ID())
|
||||
obj.SetPayload(make([]byte, size))
|
||||
|
||||
prm := new(shard.PutPrm).WithObject(obj.Object())
|
||||
e.mtx.RLock()
|
||||
_, err = e.shards[id[0].String()].Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
objs = append(objs, obj.Object())
|
||||
}
|
||||
|
||||
for i := range objs {
|
||||
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
_, err = e.GetRange(&RngPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, e.Close())
|
||||
|
||||
p1 := e.shards[id[0].String()].DumpInfo().BlobStorInfo.RootPath
|
||||
p2 := e.shards[id[1].String()].DumpInfo().BlobStorInfo.RootPath
|
||||
tmp := filepath.Join(dir, "tmp")
|
||||
require.NoError(t, os.Rename(p1, tmp))
|
||||
require.NoError(t, os.Rename(p2, p1))
|
||||
require.NoError(t, os.Rename(tmp, p2))
|
||||
|
||||
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
for i := range objs {
|
||||
getRes, err := e.Get(&GetPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i], getRes.Object())
|
||||
|
||||
rngRes, err := e.GetRange(&RngPrm{addr: objs[i].Address(), off: 1, ln: 10})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||
|
||||
_, err = e.GetRange(&RngPrm{addr: objs[i].Address(), off: errSmallSize + 10, ln: 1})
|
||||
require.True(t, errors.Is(err, object.ErrRangeOutOfBounds), "got: %v", err)
|
||||
}
|
||||
}
|
|
@ -64,6 +64,8 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
|
||||
outSI *objectSDK.SplitInfo
|
||||
outError = object.ErrNotFound
|
||||
|
||||
shardWithMeta hashedShard
|
||||
)
|
||||
|
||||
shPrm := new(shard.GetPrm).
|
||||
|
@ -72,6 +74,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.Get(shPrm)
|
||||
if err != nil {
|
||||
if res.HasMeta() {
|
||||
shardWithMeta = hashedShard{sh: sh}
|
||||
}
|
||||
switch {
|
||||
case errors.Is(err, object.ErrNotFound):
|
||||
return false // ignore, go to next shard
|
||||
|
@ -116,7 +121,23 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
}
|
||||
|
||||
if obj == nil {
|
||||
return nil, outError
|
||||
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
|
||||
return nil, outError
|
||||
}
|
||||
|
||||
// If the object is not found but is present in metabase,
|
||||
// try to fetch it from blobstor directly. If it is found in any
|
||||
// blobstor, increase the error counter for the shard which contains the meta.
|
||||
shPrm = shPrm.WithIgnoreMeta(true)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.Get(shPrm)
|
||||
obj = res.Object()
|
||||
return err == nil
|
||||
})
|
||||
if obj == nil {
|
||||
return nil, outError
|
||||
}
|
||||
}
|
||||
|
||||
return &GetRes{
|
||||
|
|
|
@ -82,6 +82,8 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
|
||||
outSI *objectSDK.SplitInfo
|
||||
outError = object.ErrNotFound
|
||||
|
||||
shardWithMeta hashedShard
|
||||
)
|
||||
|
||||
shPrm := new(shard.RngPrm).
|
||||
|
@ -91,6 +93,9 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.GetRange(shPrm)
|
||||
if err != nil {
|
||||
if res.HasMeta() {
|
||||
shardWithMeta = hashedShard{sh: sh}
|
||||
}
|
||||
switch {
|
||||
case errors.Is(err, object.ErrNotFound):
|
||||
return false // ignore, go to next shard
|
||||
|
@ -137,7 +142,27 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
}
|
||||
|
||||
if obj == nil {
|
||||
return nil, outError
|
||||
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
|
||||
return nil, outError
|
||||
}
|
||||
|
||||
// If the object is not found but is present in metabase,
|
||||
// try to fetch it from blobstor directly. If it is found in any
|
||||
// blobstor, increase the error counter for the shard which contains the meta.
|
||||
shPrm = shPrm.WithIgnoreMeta(true)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.GetRange(shPrm)
|
||||
if errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||
outError = object.ErrRangeOutOfBounds
|
||||
return true
|
||||
}
|
||||
obj = res.Object()
|
||||
return err == nil
|
||||
})
|
||||
if obj == nil {
|
||||
return nil, outError
|
||||
}
|
||||
}
|
||||
|
||||
return &RngRes{
|
||||
|
|
|
@ -25,26 +25,34 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
|
|||
e.mtx.Lock()
|
||||
defer e.mtx.Unlock()
|
||||
|
||||
id, err := generateShardID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||
}
|
||||
|
||||
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
strID := id.String()
|
||||
id, err := generateShardID()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||
}
|
||||
|
||||
e.shards[strID] = shard.New(append(opts,
|
||||
sh := shard.New(append(opts,
|
||||
shard.WithID(id),
|
||||
shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(); err != nil {
|
||||
return nil, fmt.Errorf("could not open shard: %w", err)
|
||||
}
|
||||
|
||||
strID := sh.ID().String()
|
||||
if _, ok := e.shards[strID]; ok {
|
||||
return nil, fmt.Errorf("shard with id %s was already added", strID)
|
||||
}
|
||||
|
||||
e.shards[strID] = sh
|
||||
e.shardPools[strID] = pool
|
||||
|
||||
return id, nil
|
||||
return sh.ID(), nil
|
||||
}
|
||||
|
||||
func generateShardID() (*shard.ID, error) {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
@ -47,9 +48,17 @@ type referenceCounter map[string]*referenceNumber
|
|||
|
||||
// Delete removed object records from metabase indexes.
|
||||
func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
|
||||
return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return db.deleteGroup(tx, prm.addrs)
|
||||
})
|
||||
if err == nil {
|
||||
for i := range prm.addrs {
|
||||
storagelog.Write(db.log,
|
||||
storagelog.AddressField(prm.addrs[i]),
|
||||
storagelog.OpField("metabase DELETE"))
|
||||
}
|
||||
}
|
||||
return new(DeleteRes), err
|
||||
}
|
||||
|
||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"go.etcd.io/bbolt"
|
||||
|
@ -68,6 +69,11 @@ func (db *DB) Put(prm *PutPrm) (res *PutRes, err error) {
|
|||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
return db.put(tx, prm.obj, prm.id, nil)
|
||||
})
|
||||
if err == nil {
|
||||
storagelog.Write(db.log,
|
||||
storagelog.AddressField(prm.obj.Address()),
|
||||
storagelog.OpField("metabase PUT"))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
36
pkg/local_object_storage/metabase/shard_id.go
Normal file
36
pkg/local_object_storage/metabase/shard_id.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
shardInfoBucket = []byte(invalidBase58String + "i")
|
||||
shardIDKey = []byte("id")
|
||||
)
|
||||
|
||||
// ReadShardID reads shard id from db.
|
||||
// If id is missing, returns nil, nil.
|
||||
func (db *DB) ReadShardID() ([]byte, error) {
|
||||
var id []byte
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b != nil {
|
||||
id = slice.Copy(b.Get(shardIDKey))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return id, err
|
||||
}
|
||||
|
||||
// WriteShardID writes shard it to db.
|
||||
func (db *DB) WriteShardID(id []byte) error {
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put(shardIDKey, id)
|
||||
})
|
||||
}
|
|
@ -26,7 +26,6 @@ func (s *Shard) Open() error {
|
|||
return fmt.Errorf("could not open %T: %w", component, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -18,12 +18,14 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob
|
|||
|
||||
// GetPrm groups the parameters of Get operation.
|
||||
type GetPrm struct {
|
||||
addr *objectSDK.Address
|
||||
addr *objectSDK.Address
|
||||
skipMeta bool
|
||||
}
|
||||
|
||||
// GetRes groups resulting values of Get operation.
|
||||
type GetRes struct {
|
||||
obj *object.Object
|
||||
obj *object.Object
|
||||
hasMeta bool
|
||||
}
|
||||
|
||||
// WithAddress is a Get option to set the address of the requested object.
|
||||
|
@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm {
|
|||
return p
|
||||
}
|
||||
|
||||
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
||||
// without accessing metabase.
|
||||
func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm {
|
||||
p.skipMeta = ignore
|
||||
return p
|
||||
}
|
||||
|
||||
// Object returns the requested object.
|
||||
func (r *GetRes) Object() *object.Object {
|
||||
return r.obj
|
||||
}
|
||||
|
||||
// HasMeta returns true if info about the object was found in the metabase.
|
||||
func (r *GetRes) HasMeta() bool {
|
||||
return r.hasMeta
|
||||
}
|
||||
|
||||
// Get reads an object from shard.
|
||||
//
|
||||
// Returns any error encountered that
|
||||
|
@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) {
|
|||
return res.Object(), nil
|
||||
}
|
||||
|
||||
obj, err := s.fetchObjectData(prm.addr, big, small)
|
||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||
|
||||
return &GetRes{
|
||||
obj: obj,
|
||||
obj: obj,
|
||||
hasMeta: hasMeta,
|
||||
}, err
|
||||
}
|
||||
|
||||
// fetchObjectData looks through writeCache and blobStor to find object.
|
||||
func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) (*object.Object, error) {
|
||||
func (s *Shard) fetchObjectData(addr *objectSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) {
|
||||
var (
|
||||
err error
|
||||
res *object.Object
|
||||
|
@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
|||
if s.hasWriteCache() {
|
||||
res, err = s.writeCache.Get(addr)
|
||||
if err == nil {
|
||||
return res, nil
|
||||
return res, false, nil
|
||||
}
|
||||
|
||||
if errors.Is(err, object.ErrNotFound) {
|
||||
|
@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
|||
}
|
||||
}
|
||||
|
||||
if skipMeta {
|
||||
res, err = small(s.blobStor, nil)
|
||||
if err == nil || errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||
return res, false, err
|
||||
}
|
||||
res, err = big(s.blobStor, nil)
|
||||
return res, false, err
|
||||
}
|
||||
|
||||
exists, err := meta.Exists(s.metaBase, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return nil, object.ErrNotFound
|
||||
return nil, false, object.ErrNotFound
|
||||
}
|
||||
|
||||
blobovniczaID, err := meta.IsSmall(s.metaBase, addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||
}
|
||||
|
||||
if blobovniczaID != nil {
|
||||
|
@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
|||
res, err = big(s.blobStor, nil)
|
||||
}
|
||||
|
||||
return res, err
|
||||
return res, true, err
|
||||
}
|
||||
|
|
|
@ -23,3 +23,25 @@ func (id ID) String() string {
|
|||
func (s *Shard) ID() *ID {
|
||||
return s.info.ID
|
||||
}
|
||||
|
||||
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
|
||||
func (s *Shard) UpdateID() (err error) {
|
||||
if err = s.metaBase.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
cErr := s.metaBase.Close()
|
||||
if err == nil {
|
||||
err = cErr
|
||||
}
|
||||
}()
|
||||
id, err := s.metaBase.ReadShardID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(id) != 0 {
|
||||
s.info.ID = NewIDFromBytes(id)
|
||||
return nil
|
||||
}
|
||||
return s.metaBase.WriteShardID(*s.info.ID)
|
||||
}
|
||||
|
|
|
@ -14,11 +14,14 @@ type RngPrm struct {
|
|||
off uint64
|
||||
|
||||
addr *objectSDK.Address
|
||||
|
||||
skipMeta bool
|
||||
}
|
||||
|
||||
// RngRes groups resulting values of GetRange operation.
|
||||
type RngRes struct {
|
||||
obj *object.Object
|
||||
obj *object.Object
|
||||
hasMeta bool
|
||||
}
|
||||
|
||||
// WithAddress is a Rng option to set the address of the requested object.
|
||||
|
@ -41,6 +44,13 @@ func (p *RngPrm) WithRange(off uint64, ln uint64) *RngPrm {
|
|||
return p
|
||||
}
|
||||
|
||||
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
||||
// without accessing metabase.
|
||||
func (p *RngPrm) WithIgnoreMeta(ignore bool) *RngPrm {
|
||||
p.skipMeta = ignore
|
||||
return p
|
||||
}
|
||||
|
||||
// Object returns the requested object part.
|
||||
//
|
||||
// Instance payload contains the requested range of the original object.
|
||||
|
@ -48,6 +58,11 @@ func (r *RngRes) Object() *object.Object {
|
|||
return r.obj
|
||||
}
|
||||
|
||||
// HasMeta returns true if info about the object was found in the metabase.
|
||||
func (r *RngRes) HasMeta() bool {
|
||||
return r.hasMeta
|
||||
}
|
||||
|
||||
// GetRange reads part of an object from shard.
|
||||
//
|
||||
// Returns any error encountered that
|
||||
|
@ -94,9 +109,10 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
|
|||
return obj.Object(), nil
|
||||
}
|
||||
|
||||
obj, err := s.fetchObjectData(prm.addr, big, small)
|
||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||
|
||||
return &RngRes{
|
||||
obj: obj,
|
||||
obj: obj,
|
||||
hasMeta: hasMeta,
|
||||
}, err
|
||||
}
|
||||
|
|
|
@ -96,7 +96,7 @@ func New(opts ...Option) *Shard {
|
|||
return s
|
||||
}
|
||||
|
||||
// WithID returns option to set shard identifier.
|
||||
// WithID returns option to set the default shard identifier.
|
||||
func WithID(id *ID) Option {
|
||||
return func(c *cfg) {
|
||||
c.info.ID = id
|
||||
|
|
|
@ -52,7 +52,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task *Task) {
|
|||
|
||||
obj, err := engine.Get(p.localStorage, task.addr)
|
||||
if err != nil {
|
||||
p.log.Error("could not get object from local storage")
|
||||
p.log.Error("could not get object from local storage",
|
||||
zap.Stringer("object", task.addr),
|
||||
zap.Error(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue