Compare commits

...

23 commits

Author SHA1 Message Date
Alex Vanin
1fc9351f4f Release v0.27.7
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2022-03-30 15:23:35 +03:00
Alex Vanin
1a2b8ab59d [#1278] neofs-node: Cache IRFetcher
Signed-off-by: Alex Vanin <alexey@nspcc.ru>

(cherry picked from commit e4a8ed589b)
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2022-03-30 14:35:31 +03:00
Alex Vanin
5180e467f8 [#1278] neofs-node: Use global cached netmap source in services
Signed-off-by: Alex Vanin <alexey@nspcc.ru>

(cherry picked from commit be6ae3c066)
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2022-03-30 14:34:34 +03:00
Evgenii Stratonikov
a193db3a3d [#1204] shard: Save ID in the metabase
`AddShard` must return shard id, so we temporarily open metabase
there.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
2022-03-30 11:08:24 +03:00
Evgenii Stratonikov
49ae91d720 Release v0.27.6
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2022-03-28 14:26:08 +03:00
Evgenii Stratonikov
b087d7ead3 [#1261] neofs-cli: Fix help message for acl extended create
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit ae8e38cace)
2022-03-28 13:55:17 +03:00
Alex Vanin
e4f357561e [#1259] neofs-cli: Use more cmd.PrintErr*()
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
(cherry picked from commit c8b585b991)
2022-03-28 13:55:13 +03:00
Alex Vanin
437687f78d [#1259] neofs-cli: Return non-zero exit code in acl extended create command failures
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
(cherry picked from commit 44138adacf)
2022-03-28 13:55:11 +03:00
Alex Vanin
1cfa1763e9 [#1200] cli: Mention filter key prefixes in eACL creation command
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
(cherry picked from commit 09db5e387d)
2022-03-28 13:54:55 +03:00
Alex Vanin
3de3c102fc [#1270] neofs-node: Add timeout for grpc GracefulStop()
GracefulStop() may be blocked until all server-side streams
are finished. There is no control over such streams yet, so
application may be frozen in shutdown stage.

Naive solution is to add timeout for GracefulStop(). At this
point healthy connection will be finished and unhealthy
connections will be terminated by Stop().

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
(cherry picked from commit 683439970a)
2022-03-28 13:54:20 +03:00
Evgenii Stratonikov
679df13924 [#1236] neofs-node: Neofs-node: Remove mutex from ttlNetCache
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit f0ec35478a)
2022-03-28 13:48:44 +03:00
Evgenii Stratonikov
73d367e287 [#1236] neofs-node: Remove mutex from lruNetCache
We already use thread-safe LRU and mutex shouldn't be taken while making
network requests.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit 22b1208a20)
2022-03-28 13:48:44 +03:00
Pavel Karpy
5f54fd5dc8 [#1223] amd: Fix subnet node command
Added `-w` flag.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
(cherry picked from commit 697c12a5e9)
2022-03-28 13:48:44 +03:00
Alex Vanin
fe3be92c89 [#1230] replicator: Improve error log
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
(cherry picked from commit e38d7dda6b)
2022-03-28 13:48:44 +03:00
Evgenii Stratonikov
2977552a19 [#1186] engine: Allow to skip metabase in GetRange
Similarly to `Get`. Also fix a bug where `ErrNotFound` is returned
instead of `ErrRangeOutOfBounds`.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>

(cherry picked from commit 1fe9cd4d36)
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2022-03-28 13:48:44 +03:00
Evgenii Stratonikov
48b5d2cb91 [#1186] blobstor: Fix comment
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit 2b5550ccf6)
2022-03-28 13:48:44 +03:00
Evgenii Stratonikov
baad9d06a1 [#1186] blobstor: Unify errors for Get and GetRangeBig
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit b33fb0f739)
2022-03-28 13:48:44 +03:00
Evgenii Stratonikov
b5bcf90fa1 [#1186] engine: Read object directly from blobstor in case of conflicts
Metabase is expected to contain actual information about objects stored
in shard. If the object is present in metabase but is missing from
blobstor, peform an additional attempt to fetch it directly without
consulting metabase. Such a situation is unexpected, so error counter
is increased for the shard which has the object in the metabase. We
don't increase error counter for the shard which has the object in
blobstor, because some garbage can be expected there. In this
implementation there is no overhead for objects which are really
missing, i.e. are not present in any metabase.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>

(cherry picked from commit 69e1e6ca20)
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2022-03-28 13:48:44 +03:00
Evgenii Stratonikov
ce169491ed [#1188] metabase: log PUT / DELETE operations
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit 6e6f3648d2)
2022-03-28 13:48:44 +03:00
ZhangTao1596
58f2354057 [#1180] ir/internal: Fix audit range hash type
Signed-off-by: ZhangTao1596 <zhangtao@ngd.neo.org>
(cherry picked from commit dd0e10d306)
2022-03-28 13:03:12 +03:00
Evgenii Stratonikov
25b827e0fd [#1090] neofs-adm: Add history of commands to a storage configurator
It also persists between sessions.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit b3b3b8b20f)
2022-03-28 13:01:45 +03:00
Evgenii Stratonikov
00180a7ecf [#1090] neofs-adm: add interactive configurator for storage node
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit 8263582dde)
2022-03-28 13:01:45 +03:00
Pavel Karpy
1f825a467a [#1136] cli: Allow usage of empty wallet passwords
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
(cherry picked from commit c7a8c762e0)
2022-03-28 13:01:33 +03:00
35 changed files with 1040 additions and 85 deletions

View file

@ -3,6 +3,30 @@ Changelog for NeoFS Node
## [Unreleased] ## [Unreleased]
## [0.27.7] - 2022-03-30
### Fixed
- Shard ID is now consistent between restarts (#1204)
### Added
- More N3 RPC caches in object service (#1278)
## [0.27.6] - 2022-03-28
### Fixed
- Allow empty passwords in neofs-cli config (#1136)
- Set correct audit range hash type in neofs-ir (#1180)
- Read objects directly from blobstor in case of shard inconsistency (#1186)
- Fix `-w` flag in subnet commands of neofs-adm (#1223)
- Do not use explicit mutex lock in chain caches (#1236)
- Force gRPC server stop if it can't shut down gracefully in storage node (#1270)
- Return non-zero exit code in `acl extended create` command failures and fix
help message (#1259)
### Added
- Interactive storage node configurator in neofs-adm (#1090)
- Logs in metabase operations (#1188)
## [0.27.5] - 2022-01-31 ## [0.27.5] - 2022-01-31
### Fixed ### Fixed
@ -916,7 +940,9 @@ NeoFS-API v2.0 support and updated brand-new storage node application.
First public review release. First public review release.
[Unreleased]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.5...master [Unreleased]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.7...master
[0.27.7]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.6...v0.27.7
[0.27.6]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.5...v0.27.6
[0.27.5]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.4...v0.27.5 [0.27.5]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.4...v0.27.5
[0.27.4]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.3...v0.27.4 [0.27.4]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.3...v0.27.4
[0.27.3]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.2...v0.27.3 [0.27.3]: https://github.com/nspcc-dev/neofs-node/compare/v0.27.2...v0.27.3

View file

@ -770,6 +770,7 @@ var cmdSubnetNode = &cobra.Command{
Short: "Manage nodes of the NeoFS subnet.", Short: "Manage nodes of the NeoFS subnet.",
PreRun: func(cmd *cobra.Command, _ []string) { PreRun: func(cmd *cobra.Command, _ []string) {
viperBindFlags(cmd, viperBindFlags(cmd,
flagSubnetWallet,
flagSubnetNode, flagSubnetNode,
flagSubnetNodeSubnet, flagSubnetNodeSubnet,
) )
@ -882,8 +883,10 @@ func init() {
// subnet node flags // subnet node flags
nodeFlags := cmdSubnetNode.PersistentFlags() nodeFlags := cmdSubnetNode.PersistentFlags()
nodeFlags.StringP(flagSubnetWallet, "w", "", "Path to file with wallet")
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetWallet)
nodeFlags.String(flagSubnetNode, "", "Hex-encoded public key of the node") nodeFlags.String(flagSubnetNode, "", "Hex-encoded public key of the node")
_ = cmdSubnetAdmin.MarkFlagRequired(flagSubnetNode) _ = cmdSubnetNode.MarkFlagRequired(flagSubnetNode)
nodeFlags.String(flagSubnetNodeSubnet, "", "ID of the subnet to manage nodes") nodeFlags.String(flagSubnetNodeSubnet, "", "ID of the subnet to manage nodes")
_ = cmdSubnetNode.MarkFlagRequired(flagSubnetNodeSubnet) _ = cmdSubnetNode.MarkFlagRequired(flagSubnetNodeSubnet)

View file

@ -5,6 +5,7 @@ import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/config" "github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/config"
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/morph" "github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/morph"
"github.com/nspcc-dev/neofs-node/cmd/neofs-adm/internal/modules/storagecfg"
"github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/util/autocomplete" "github.com/nspcc-dev/neofs-node/pkg/util/autocomplete"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -34,6 +35,7 @@ func init() {
rootCmd.AddCommand(config.RootCmd) rootCmd.AddCommand(config.RootCmd)
rootCmd.AddCommand(morph.RootCmd) rootCmd.AddCommand(morph.RootCmd)
rootCmd.AddCommand(storagecfg.RootCmd)
rootCmd.AddCommand(autocomplete.Command("neofs-adm")) rootCmd.AddCommand(autocomplete.Command("neofs-adm"))
} }

View file

@ -0,0 +1,143 @@
package storagecfg
const configTemplate = `logger:
level: info # logger level: one of "debug", "info" (default), "warn", "error", "dpanic", "panic", "fatal"
node:
wallet:
path: {{ .Wallet.Path }} # path to a NEO wallet; ignored if key is presented
address: {{ .Wallet.Account }} # address of a NEO account in the wallet; ignored if key is presented
password: {{ .Wallet.Password }} # password for a NEO account in the wallet; ignored if key is presented
addresses: # list of addresses announced by Storage node in the Network map
- {{ .AnnouncedAddress }}
attribute_0: UN-LOCODE:{{ .Attribute.Locode }}
relay: {{ .Relay }} # start Storage node in relay mode without bootstrapping into the Network map
subnet:
exit_zero: false # toggle entrance to zero subnet (overrides corresponding attribute and occurrence in entries)
entries: [] # list of IDs of subnets to enter in a text format of NeoFS API protocol (overrides corresponding attributes)
grpc:
num: 1 # total number of listener endpoints
0:
endpoint: {{ .Endpoint }} # endpoint for gRPC server
tls:{{if .TLSCert}}
enabled: true # enable TLS for a gRPC connection (min version is TLS 1.2)
certificate: {{ .TLSCert }} # path to TLS certificate
key: {{ .TLSKey }} # path to TLS key
{{- else }}
enabled: false # disable TLS for a gRPC connection
{{- end}}
control:
authorized_keys: # list of hex-encoded public keys that have rights to use the Control Service
{{- range .AuthorizedKeys }}
- {{.}}{{end}}
grpc:
endpoint: {{.ControlEndpoint}} # endpoint that is listened by the Control Service
morph:
dial_timeout: 20s # timeout for side chain NEO RPC client connection
disable_cache: false # use TTL cache for side chain GET operations
rpc_endpoint: # side chain N3 RPC endpoints
{{- range .MorphRPC }}
- https://{{.}}{{end}}
notification_endpoint: # side chain N3 RPC notification endpoints
{{- range .MorphRPC }}
- wss://{{.}}/ws{{end}}
{{if not .Relay }}
storage:
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_num: 1 # total number of shards
default: # section with the default shard parameters
metabase:
perm: 0644 # permissions for metabase files(directories: +x for current user and group)
blobstor:
perm: 0644 # permissions for blobstor files(directories: +x for current user and group)
depth: 2 # max depth of object tree storage in FS
small_object_size: 102400 # 100KiB, size threshold for "small" objects which are stored in key-value DB, not in FS, bytes
compress: true # turn on/off Zstandard compression (level 3) of stored objects
compression_exclude_content_types:
- audio/*
- video/*
blobovnicza:
size: 1073741824 # approximate size limit of single blobovnicza instance, total size will be: size*width^(depth+1), bytes
depth: 1 # max depth of object tree storage in key-value DB
width: 4 # max width of object tree storage in key-value DB
opened_cache_capacity: 50 # maximum number of opened database files
gc:
remover_batch_size: 200 # number of objects to be removed by the garbage collector
remover_sleep_interval: 5m # frequency of the garbage collector invocation
shard:
0:
mode: "read-write" # mode of the shard, must be one of the: "read-write" (default), "read-only"
metabase:
path: {{ .MetabasePath }} # path to the metabase
blobstor:
path: {{ .BlobstorPath }} # path to the blobstor
{{end}}`
const (
neofsMainnetAddress = "2cafa46838e8b564468ebd868dcafdd99dce6221"
balanceMainnetAddress = "dc1ec98d9d0c5f9dfade16144defe08cffc5ca55"
neofsTestnetAddress = "b65d8243ac63983206d17e5221af0653a7266fa1"
balanceTestnetAddress = "e0420c216003747626670d1424569c17c79015bf"
)
var n3config = map[string]struct {
MorphRPC []string
RPC []string
NeoFSContract string
BalanceContract string
}{
"testnet": {
MorphRPC: []string{
"rpc01.morph.testnet.fs.neo.org:51331",
"rpc02.morph.testnet.fs.neo.org:51331",
"rpc03.morph.testnet.fs.neo.org:51331",
"rpc04.morph.testnet.fs.neo.org:51331",
"rpc05.morph.testnet.fs.neo.org:51331",
"rpc06.morph.testnet.fs.neo.org:51331",
"rpc07.morph.testnet.fs.neo.org:51331",
},
RPC: []string{
"rpc01.testnet.n3.nspcc.ru:21331",
"rpc02.testnet.n3.nspcc.ru:21331",
"rpc03.testnet.n3.nspcc.ru:21331",
"rpc04.testnet.n3.nspcc.ru:21331",
"rpc05.testnet.n3.nspcc.ru:21331",
"rpc06.testnet.n3.nspcc.ru:21331",
"rpc07.testnet.n3.nspcc.ru:21331",
},
NeoFSContract: neofsTestnetAddress,
BalanceContract: balanceTestnetAddress,
},
"mainnet": {
MorphRPC: []string{
"rpc1.morph.fs.neo.org:40341",
"rpc2.morph.fs.neo.org:40341",
"rpc3.morph.fs.neo.org:40341",
"rpc4.morph.fs.neo.org:40341",
"rpc5.morph.fs.neo.org:40341",
"rpc6.morph.fs.neo.org:40341",
"rpc7.morph.fs.neo.org:40341",
},
RPC: []string{
"rpc1.n3.nspcc.ru:10331",
"rpc2.n3.nspcc.ru:10331",
"rpc3.n3.nspcc.ru:10331",
"rpc4.n3.nspcc.ru:10331",
"rpc5.n3.nspcc.ru:10331",
"rpc6.n3.nspcc.ru:10331",
"rpc7.n3.nspcc.ru:10331",
},
NeoFSContract: neofsMainnetAddress,
BalanceContract: balanceMainnetAddress,
},
}

View file

@ -0,0 +1,413 @@
package storagecfg
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"text/template"
"time"
"github.com/chzyer/readline"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/rpc/client"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
)
const (
walletFlag = "wallet"
accountFlag = "account"
)
const (
defaultControlEndpoint = "127.0.0.1:8090"
defaultDataEndpoint = "127.0.0.1"
)
// RootCmd is a root command of config section.
var RootCmd = &cobra.Command{
Use: "storage-config [-w wallet] [-a acccount] [<path-to-config>]",
Short: "Section for storage node configuration commands.",
Run: storageConfig,
}
func init() {
fs := RootCmd.Flags()
fs.StringP(walletFlag, "w", "", "path to wallet")
fs.StringP(accountFlag, "a", "", "wallet account")
}
type config struct {
AnnouncedAddress string
AuthorizedKeys []string
ControlEndpoint string
Endpoint string
TLSCert string
TLSKey string
MorphRPC []string
Attribute struct {
Locode string
}
Wallet struct {
Path string
Account string
Password string
}
Relay bool
BlobstorPath string
MetabasePath string
}
func storageConfig(cmd *cobra.Command, args []string) {
var outPath string
if len(args) != 0 {
outPath = args[0]
} else {
outPath = getPath("File to write config at [./config.yml]: ")
if outPath == "" {
outPath = "./config.yml"
}
}
historyPath := filepath.Join(os.TempDir(), "neofs-adm.history")
readline.SetHistoryPath(historyPath)
var c config
c.Wallet.Path, _ = cmd.Flags().GetString(walletFlag)
if c.Wallet.Path == "" {
c.Wallet.Path = getPath("Path to the storage node wallet: ")
}
w, err := wallet.NewWalletFromFile(c.Wallet.Path)
fatalOnErr(err)
c.Wallet.Account, _ = cmd.Flags().GetString(accountFlag)
if c.Wallet.Account == "" {
addr := address.Uint160ToString(w.GetChangeAddress())
c.Wallet.Account = getWalletAccount(w, fmt.Sprintf("Wallet account [%s]: ", addr))
if c.Wallet.Account == "" {
c.Wallet.Account = addr
}
}
accH, err := flags.ParseAddress(c.Wallet.Account)
fatalOnErr(err)
acc := w.GetAccount(accH)
if acc == nil {
fatalOnErr(errors.New("can't find account in wallet"))
}
c.Wallet.Password, err = input.ReadPassword(fmt.Sprintf("Account password for %s: ", c.Wallet.Account))
fatalOnErr(err)
err = acc.Decrypt(c.Wallet.Password, keys.NEP2ScryptParams())
fatalOnErr(err)
c.AuthorizedKeys = append(c.AuthorizedKeys, hex.EncodeToString(acc.PrivateKey().PublicKey().Bytes()))
var network string
for {
network = getString("Choose network [mainnet]/testnet: ")
switch network {
case "":
network = "mainnet"
case "testnet", "mainnet":
default:
cmd.Println(`Network must be either "mainnet" or "testnet"`)
continue
}
break
}
c.MorphRPC = n3config[network].MorphRPC
depositGas(cmd, acc, network)
c.Attribute.Locode = getString("UN-LOCODE attribute in [XX YYY] format: ")
var addr, port string
for {
c.AnnouncedAddress = getString("Publicly announced address: ")
addr, port, err = net.SplitHostPort(c.AnnouncedAddress)
if err != nil {
cmd.Println("Address must have form A.B.C.D:PORT")
continue
}
ip, err := net.ResolveIPAddr("ip", addr)
if err != nil {
cmd.Printf("Can't resolve IP address %s: %v\n", addr, err)
continue
}
if !ip.IP.IsGlobalUnicast() {
cmd.Println("IP must be global unicast.")
continue
}
cmd.Printf("Resolved IP address: %s\n", ip.String())
_, err = strconv.ParseUint(port, 10, 16)
if err != nil {
cmd.Println("Port must be an integer.")
continue
}
break
}
defaultAddr := net.JoinHostPort(defaultDataEndpoint, port)
c.Endpoint = getString(fmt.Sprintf("Listening address [%s]: ", defaultAddr))
if c.Endpoint == "" {
c.Endpoint = defaultAddr
}
c.ControlEndpoint = getString(fmt.Sprintf("Listening address (control endpoint) [%s]: ", defaultControlEndpoint))
if c.ControlEndpoint == "" {
c.ControlEndpoint = defaultControlEndpoint
}
c.TLSCert = getPath("TLS Certificate (optional): ")
if c.TLSCert != "" {
c.TLSKey = getPath("TLS Key: ")
}
c.Relay = getConfirmation(false, "Use node as a relay? yes/[no]: ")
if !c.Relay {
p := getPath("Path to the storage directory (all available storage will be used): ")
c.BlobstorPath = filepath.Join(p, "blob")
c.MetabasePath = filepath.Join(p, "meta")
}
out := applyTemplate(c)
fatalOnErr(ioutil.WriteFile(outPath, out, 0644))
cmd.Println("Node is ready for work! Run `neofs-node -config " + outPath + "`")
}
func getWalletAccount(w *wallet.Wallet, prompt string) string {
addrs := make([]readline.PrefixCompleterInterface, len(w.Accounts))
for i := range w.Accounts {
addrs[i] = readline.PcItem(w.Accounts[i].Address)
}
readline.SetAutoComplete(readline.NewPrefixCompleter(addrs...))
defer readline.SetAutoComplete(nil)
s, err := readline.Line(prompt)
fatalOnErr(err)
return strings.TrimSpace(s) // autocompleter can return a string with a trailing space
}
func getString(prompt string) string {
s, err := readline.Line(prompt)
fatalOnErr(err)
if s != "" {
_ = readline.AddHistory(s)
}
return s
}
type filenameCompleter struct{}
func (filenameCompleter) Do(line []rune, pos int) (newLine [][]rune, length int) {
prefix := string(line[:pos])
dir := filepath.Dir(prefix)
de, err := os.ReadDir(dir)
if err != nil {
return nil, 0
}
for i := range de {
name := filepath.Join(dir, de[i].Name())
if strings.HasPrefix(name, prefix) {
tail := []rune(strings.TrimPrefix(name, prefix))
if de[i].IsDir() {
tail = append(tail, filepath.Separator)
}
newLine = append(newLine, tail)
}
}
if pos != 0 {
return newLine, pos - len([]rune(dir))
}
return newLine, 0
}
func getPath(prompt string) string {
readline.SetAutoComplete(filenameCompleter{})
defer readline.SetAutoComplete(nil)
p, err := readline.Line(prompt)
fatalOnErr(err)
if p == "" {
return p
}
_ = readline.AddHistory(p)
abs, err := filepath.Abs(p)
if err != nil {
fatalOnErr(fmt.Errorf("can't create an absolute path: %w", err))
}
return abs
}
func getConfirmation(def bool, prompt string) bool {
for {
s, err := readline.Line(prompt)
fatalOnErr(err)
switch strings.ToLower(s) {
case "y", "yes":
return true
case "n", "no":
return false
default:
if len(s) == 0 {
return def
}
}
}
}
func applyTemplate(c config) []byte {
tmpl, err := template.New("config").Parse(configTemplate)
fatalOnErr(err)
b := bytes.NewBuffer(nil)
fatalOnErr(tmpl.Execute(b, c))
return b.Bytes()
}
func fatalOnErr(err error) {
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
func depositGas(cmd *cobra.Command, acc *wallet.Account, network string) {
sideClient := initClient(n3config[network].MorphRPC)
balanceHash, _ := util.Uint160DecodeStringLE(n3config[network].BalanceContract)
res, err := sideClient.InvokeFunction(balanceHash, "balanceOf", []smartcontract.Parameter{{
Type: smartcontract.Hash160Type,
Value: acc.Contract.ScriptHash(),
}}, nil)
fatalOnErr(err)
if res.State != vm.HaltState.String() {
fatalOnErr(fmt.Errorf("invalid response from balance contract: %s", res.FaultException))
}
var balance *big.Int
if len(res.Stack) != 0 {
balance, _ = res.Stack[0].TryInteger()
}
if balance == nil {
fatalOnErr(errors.New("invalid response from balance contract"))
}
ok := getConfirmation(false, fmt.Sprintf("Current NeoFS balance is %s, make a deposit? y/[n]: ",
fixedn.ToString(balance, 12)))
if !ok {
return
}
amountStr := getString("Enter amount in GAS: ")
amount, err := fixedn.FromString(amountStr, 8)
if err != nil {
fatalOnErr(fmt.Errorf("invalid amount: %w", err))
}
mainClient := initClient(n3config[network].RPC)
neofsHash, _ := util.Uint160DecodeStringLE(n3config[network].NeoFSContract)
gasHash, err := mainClient.GetNativeContractHash(nativenames.Gas)
fatalOnErr(err)
tx, err := mainClient.CreateNEP17TransferTx(acc, neofsHash, gasHash, amount.Int64(), 0, nil, nil)
fatalOnErr(err)
txHash, err := mainClient.SignAndPushTx(tx, acc, nil)
fatalOnErr(err)
cmd.Print("Waiting for transactions to persist.")
tick := time.NewTicker(time.Second / 2)
defer tick.Stop()
timer := time.NewTimer(time.Second * 20)
defer timer.Stop()
at := trigger.Application
loop:
for {
select {
case <-tick.C:
_, err := mainClient.GetApplicationLog(txHash, &at)
if err == nil {
cmd.Print("\n")
break loop
}
cmd.Print(".")
case <-timer.C:
cmd.Printf("\nTimeout while waiting for transaction to persist.\n")
if getConfirmation(false, "Continue configuration? yes/[no]: ") {
return
}
os.Exit(1)
}
}
}
func initClient(rpc []string) *client.Client {
var c *client.Client
var err error
shuffled := make([]string, len(rpc))
copy(shuffled, rpc)
rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
for _, endpoint := range shuffled {
c, err = client.New(context.Background(), "https://"+endpoint, client.Options{
DialTimeout: time.Second * 2,
RequestTimeout: time.Second * 5,
})
if err != nil {
continue
}
if err = c.Init(); err != nil {
continue
}
return c
}
fatalOnErr(fmt.Errorf("can't create N3 client: %w", err))
panic("unreachable")
}

View file

@ -7,6 +7,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"strings" "strings"
"github.com/flynn-archive/go-shlex" "github.com/flynn-archive/go-shlex"
@ -30,7 +31,10 @@ Operation is an object service verb: 'get', 'head', 'put', 'search', 'delete', '
Filter consists of <typ>:<key><match><value> Filter consists of <typ>:<key><match><value>
Typ is 'obj' for object applied filter or 'req' for request applied filter. Typ is 'obj' for object applied filter or 'req' for request applied filter.
Key is a valid unicode string corresponding to object or request header key. Key is a valid unicode string corresponding to object or request header key.
Match is '==' for matching and '!=' for non-matching filter. Well-known system object headers start with '$Object:' prefix.
User defined headers start without prefix.
Read more about filter keys at github.com/nspcc-dev/neofs-api/blob/master/proto-docs/acl.md#message-eaclrecordfilter
Match is '=' for matching and '!=' for non-matching filter.
Value is a valid unicode string corresponding to object or request header value. Value is a valid unicode string corresponding to object or request header value.
Target is Target is
@ -64,20 +68,20 @@ func createEACL(cmd *cobra.Command, _ []string) {
containerID := cid.New() containerID := cid.New()
if err := containerID.Parse(cidArg); err != nil { if err := containerID.Parse(cidArg); err != nil {
cmd.Printf("invalid container ID: %v", err) cmd.PrintErrf("invalid container ID: %v\n", err)
return os.Exit(1)
} }
rulesFile, err := getRulesFromFile(fileArg) rulesFile, err := getRulesFromFile(fileArg)
if err != nil { if err != nil {
cmd.Printf("can't read rules from file : %v", err) cmd.PrintErrf("can't read rules from file: %v\n", err)
return os.Exit(1)
} }
rules = append(rules, rulesFile...) rules = append(rules, rulesFile...)
if len(rules) == 0 { if len(rules) == 0 {
cmd.Println("no extended ACL rules has been provided") cmd.PrintErrln("no extended ACL rules has been provided")
return os.Exit(1)
} }
tb := eacl.NewTable() tb := eacl.NewTable()
@ -85,14 +89,14 @@ func createEACL(cmd *cobra.Command, _ []string) {
for _, ruleStr := range rules { for _, ruleStr := range rules {
r, err := shlex.Split(ruleStr) r, err := shlex.Split(ruleStr)
if err != nil { if err != nil {
cmd.Printf("can't parse rule '%s': %v)", ruleStr, err) cmd.PrintErrf("can't parse rule '%s': %v\n", ruleStr, err)
return os.Exit(1)
} }
err = parseTable(tb, r) err = parseTable(tb, r)
if err != nil { if err != nil {
cmd.Printf("can't create extended ACL record from rule '%s': %v", ruleStr, err) cmd.PrintErrf("can't create extended ACL record from rule '%s': %v\n", ruleStr, err)
return os.Exit(1)
} }
} }
@ -100,15 +104,15 @@ func createEACL(cmd *cobra.Command, _ []string) {
data, err := tb.MarshalJSON() data, err := tb.MarshalJSON()
if err != nil { if err != nil {
cmd.Println(err) cmd.PrintErrln(err)
return os.Exit(1)
} }
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
err = json.Indent(buf, data, "", " ") err = json.Indent(buf, data, "", " ")
if err != nil { if err != nil {
cmd.Println(err) cmd.PrintErrln(err)
return os.Exit(1)
} }
if len(outArg) == 0 { if len(outArg) == 0 {
@ -118,7 +122,8 @@ func createEACL(cmd *cobra.Command, _ []string) {
err = ioutil.WriteFile(outArg, buf.Bytes(), 0644) err = ioutil.WriteFile(outArg, buf.Bytes(), 0644)
if err != nil { if err != nil {
cmd.Println(err) cmd.PrintErrln(err)
os.Exit(1)
} }
} }

View file

@ -215,9 +215,11 @@ func getKeyNoGenerate() (*ecdsa.PrivateKey, error) {
} }
func getPassword() (string, error) { func getPassword() (string, error) {
if pass := viper.GetString(password); pass != "" { // this check allows empty passwords
return pass, nil if viper.IsSet(password) {
return viper.GetString(password), nil
} }
return input.ReadPassword("Enter password > ") return input.ReadPassword("Enter password > ")
} }

View file

@ -1,7 +1,6 @@
package main package main
import ( import (
"sync"
"time" "time"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
@ -25,8 +24,6 @@ type valueWithTime struct {
// entity that provides TTL cache interface. // entity that provides TTL cache interface.
type ttlNetCache struct { type ttlNetCache struct {
mtx sync.Mutex
ttl time.Duration ttl time.Duration
sz int sz int
@ -55,9 +52,6 @@ func newNetworkTTLCache(sz int, ttl time.Duration, netRdr netValueReader) *ttlNe
// //
// returned value should not be modified. // returned value should not be modified.
func (c *ttlNetCache) get(key interface{}) (interface{}, error) { func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
val, ok := c.cache.Peek(key) val, ok := c.cache.Peek(key)
if ok { if ok {
valWithTime := val.(*valueWithTime) valWithTime := val.(*valueWithTime)
@ -83,29 +77,21 @@ func (c *ttlNetCache) get(key interface{}) (interface{}, error) {
} }
func (c *ttlNetCache) remove(key interface{}) { func (c *ttlNetCache) remove(key interface{}) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.cache.Remove(key) c.cache.Remove(key)
} }
func (c *ttlNetCache) keys() []interface{} { func (c *ttlNetCache) keys() []interface{} {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.cache.Keys() return c.cache.Keys()
} }
// entity that provides LRU cache interface. // entity that provides LRU cache interface.
type lruNetCache struct { type lruNetCache struct {
mtx sync.Mutex
cache *lru.Cache cache *lru.Cache
netRdr netValueReader netRdr netValueReader
} }
// complicates netValueReader with LRU caching mechanism. // newNetworkLRUCache returns wrapper over netValueReader with LRU cache.
func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache { func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
cache, err := lru.New(sz) cache, err := lru.New(sz)
fatalOnErr(err) fatalOnErr(err)
@ -122,9 +108,6 @@ func newNetworkLRUCache(sz int, netRdr netValueReader) *lruNetCache {
// //
// returned value should not be modified. // returned value should not be modified.
func (c *lruNetCache) get(key interface{}) (interface{}, error) { func (c *lruNetCache) get(key interface{}) (interface{}, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
val, ok := c.cache.Get(key) val, ok := c.cache.Get(key)
if ok { if ok {
return val, nil return val, nil
@ -336,3 +319,39 @@ func (s *ttlContainerLister) InvalidateContainerListByCID(id *cid.ID) {
} }
} }
} }
type cachedIRFetcher ttlNetCache
func newCachedIRFetcher(f interface{ InnerRingKeys() ([][]byte, error) }) *cachedIRFetcher {
const (
irFetcherCacheSize = 1 // we intend to store only one value
// Without the cache in the testnet we can see several hundred simultaneous
// requests (neofs-node #1278), so limiting the request rate solves the issue.
//
// Exact request rate doesn't really matter because Inner Ring list update
// happens extremely rare, but there is no side chain events for that as
// for now (neofs-contract v0.15.0 notary disabled env) to monitor it.
irFetcherCacheTTL = 30 * time.Second
)
irFetcherCache := newNetworkTTLCache(irFetcherCacheSize, irFetcherCacheTTL,
func(key interface{}) (interface{}, error) {
return f.InnerRingKeys()
},
)
return (*cachedIRFetcher)(irFetcherCache)
}
// InnerRingKeys returns cached list of Inner Ring keys. If keys are missing in
// the cache or expired, then it returns keys from side chain and updates
// the cache.
func (f *cachedIRFetcher) InnerRingKeys() ([][]byte, error) {
val, err := (*ttlNetCache)(f).get("")
if err != nil {
return nil, err
}
return val.([][]byte), nil
}

View file

@ -116,6 +116,8 @@ type cfg struct {
clientCache *cache.ClientCache clientCache *cache.ClientCache
persistate *state.PersistentStorage persistate *state.PersistentStorage
netMapSource netmapCore.Source
} }
type cfgGRPC struct { type cfgGRPC struct {
@ -175,8 +177,6 @@ type cfgNodeInfo struct {
} }
type cfgObject struct { type cfgObject struct {
netMapSource netmapCore.Source
cnrSource container.Source cnrSource container.Source
eaclSource eacl.Source eaclSource eacl.Source

View file

@ -105,7 +105,7 @@ func initContainerService(c *cfg) {
loadPlacementBuilder := &loadPlacementBuilder{ loadPlacementBuilder := &loadPlacementBuilder{
log: c.log, log: c.log,
nmSrc: c.cfgNetmap.wrapper, nmSrc: c.netMapSource,
cnrSrc: cnrSrc, cnrSrc: cnrSrc,
} }

View file

@ -31,7 +31,7 @@ func initControlService(c *cfg) {
controlSvc.WithKey(&c.key.PrivateKey), controlSvc.WithKey(&c.key.PrivateKey),
controlSvc.WithAuthorizedKeys(rawPubs), controlSvc.WithAuthorizedKeys(rawPubs),
controlSvc.WithHealthChecker(c), controlSvc.WithHealthChecker(c),
controlSvc.WithNetMapSource(c.cfgNetmap.wrapper), controlSvc.WithNetMapSource(c.netMapSource),
controlSvc.WithNodeState(c), controlSvc.WithNodeState(c),
controlSvc.WithDeletedObjectHandler(func(addrList []*object.Address) error { controlSvc.WithDeletedObjectHandler(func(addrList []*object.Address) error {
prm := new(engine.DeletePrm).WithAddresses(addrList...) prm := new(engine.DeletePrm).WithAddresses(addrList...)

View file

@ -4,6 +4,7 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"time"
grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc" grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
"github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/nspcc-dev/neofs-node/pkg/util/logger"
@ -95,7 +96,19 @@ func stopGRPC(name string, s *grpc.Server, l *logger.Logger) {
l.Info("stopping gRPC server...") l.Info("stopping gRPC server...")
// GracefulStop() may freeze forever, see #1270
done := make(chan struct{})
go func() {
s.GracefulStop() s.GracefulStop()
close(done)
}()
select {
case <-done:
case <-time.After(1 * time.Minute):
l.Info("gRPC cannot shutdown gracefully, forcing stop")
s.Stop()
}
l.Info("gRPC server stopped successfully") l.Info("gRPC server stopped successfully")
} }

View file

@ -99,7 +99,7 @@ func initMorphComponents(c *cfg) {
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap) netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
} }
c.cfgObject.netMapSource = netmapSource c.netMapSource = netmapSource
c.cfgNetmap.wrapper = wrap c.cfgNetmap.wrapper = wrap
} }

View file

@ -185,7 +185,7 @@ func initObjectService(c *cfg) {
clientConstructor := &reputationClientConstructor{ clientConstructor := &reputationClientConstructor{
log: c.log, log: c.log,
nmSrc: c.cfgObject.netMapSource, nmSrc: c.netMapSource,
netState: c.cfgNetmap.state, netState: c.cfgNetmap.state,
trustStorage: c.cfgReputation.localTrustStorage, trustStorage: c.cfgReputation.localTrustStorage,
basicConstructor: c.clientCache, basicConstructor: c.clientCache,
@ -228,7 +228,7 @@ func initObjectService(c *cfg) {
policer.WithLocalStorage(ls), policer.WithLocalStorage(ls),
policer.WithContainerSource(c.cfgObject.cnrSource), policer.WithContainerSource(c.cfgObject.cnrSource),
policer.WithPlacementBuilder( policer.WithPlacementBuilder(
placement.NewNetworkMapSourceBuilder(c.cfgObject.netMapSource), placement.NewNetworkMapSourceBuilder(c.netMapSource),
), ),
policer.WithRemoteHeader( policer.WithRemoteHeader(
headsvc.NewRemoteHeader(keyStorage, clientConstructor), headsvc.NewRemoteHeader(keyStorage, clientConstructor),
@ -251,7 +251,7 @@ func initObjectService(c *cfg) {
policer.WithNodeLoader(c), policer.WithNodeLoader(c),
) )
traverseGen := util.NewTraverserGenerator(c.cfgObject.netMapSource, c.cfgObject.cnrSource, c) traverseGen := util.NewTraverserGenerator(c.netMapSource, c.cfgObject.cnrSource, c)
c.workers = append(c.workers, pol) c.workers = append(c.workers, pol)
@ -261,7 +261,7 @@ func initObjectService(c *cfg) {
putsvc.WithMaxSizeSource(c), putsvc.WithMaxSizeSource(c),
putsvc.WithLocalStorage(ls), putsvc.WithLocalStorage(ls),
putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithContainerSource(c.cfgObject.cnrSource),
putsvc.WithNetworkMapSource(c.cfgObject.netMapSource), putsvc.WithNetworkMapSource(c.netMapSource),
putsvc.WithNetmapKeys(c), putsvc.WithNetmapKeys(c),
putsvc.WithFormatValidatorOpts( putsvc.WithFormatValidatorOpts(
objectCore.WithDeleteHandler(objInhumer), objectCore.WithDeleteHandler(objInhumer),
@ -285,7 +285,7 @@ func initObjectService(c *cfg) {
placement.WithoutSuccessTracking(), placement.WithoutSuccessTracking(),
), ),
), ),
searchsvc.WithNetMapSource(c.cfgNetmap.wrapper), searchsvc.WithNetMapSource(c.netMapSource),
searchsvc.WithKeyStorage(keyStorage), searchsvc.WithKeyStorage(keyStorage),
) )
@ -303,7 +303,7 @@ func initObjectService(c *cfg) {
placement.SuccessAfter(1), placement.SuccessAfter(1),
), ),
), ),
getsvc.WithNetMapSource(c.cfgNetmap.wrapper), getsvc.WithNetMapSource(c.netMapSource),
getsvc.WithKeyStorage(keyStorage), getsvc.WithKeyStorage(keyStorage),
) )
@ -348,8 +348,8 @@ func initObjectService(c *cfg) {
acl.WithSenderClassifier( acl.WithSenderClassifier(
acl.NewSenderClassifier( acl.NewSenderClassifier(
c.log, c.log,
irFetcher, newCachedIRFetcher(irFetcher),
c.cfgNetmap.wrapper, c.netMapSource,
), ),
), ),
acl.WithContainerSource( acl.WithContainerSource(

View file

@ -37,8 +37,7 @@ func initReputationService(c *cfg) {
localKey := c.key.PublicKey().Bytes() localKey := c.key.PublicKey().Bytes()
// consider sharing this between application components nmSrc := c.netMapSource
nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper)
// storing calculated trusts as a daughter // storing calculated trusts as a daughter
c.cfgReputation.localTrustStorage = truststorage.New( c.cfgReputation.localTrustStorage = truststorage.New(

View file

@ -3,7 +3,7 @@
version: "2.4" version: "2.4"
services: services:
storage01: storage01:
image: nspccdev/neofs-storage-testnet:0.27.5 image: nspccdev/neofs-storage-testnet:0.27.7
container_name: neofs-testnet container_name: neofs-testnet
env_file: node_config.env env_file: node_config.env
network_mode: host network_mode: host

1
go.mod
View file

@ -3,6 +3,7 @@ module github.com/nspcc-dev/neofs-node
go 1.16 go 1.16
require ( require (
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
github.com/google/go-github/v39 v39.2.0 github.com/google/go-github/v39 v39.2.0
github.com/google/uuid v1.2.0 github.com/google/uuid v1.2.0

3
go.sum
View file

@ -90,8 +90,11 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM= github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5OhCuC+XN+r/bBCmeuuJtjz+bCNIf8=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=

View file

@ -238,6 +238,7 @@ func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeR
cliPrm.WithAddress(prm.objAddr) cliPrm.WithAddress(prm.objAddr)
cliPrm.WithRangeList(prm.rng) cliPrm.WithRangeList(prm.rng)
cliPrm.TZ()
cliRes, err := x.c.HashObjectPayloadRanges(prm.ctx, &cliPrm, cliRes, err := x.c.HashObjectPayloadRanges(prm.ctx, &cliPrm,
client.WithKey(x.key), client.WithKey(x.key),

View file

@ -325,12 +325,16 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e
res, err = b.getRangeFromLevel(prm, p, !ok) res, err = b.getRangeFromLevel(prm, p, !ok)
if err != nil { if err != nil {
if !errors.Is(err, object.ErrNotFound) { outOfBounds := errors.Is(err, object.ErrRangeOutOfBounds)
if !errors.Is(err, object.ErrNotFound) && !outOfBounds {
b.log.Debug("could not get object from level", b.log.Debug("could not get object from level",
zap.String("level", p), zap.String("level", p),
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
} }
if outOfBounds {
return true, err
}
} }
activeCache[dirPath] = struct{}{} activeCache[dirPath] = struct{}{}
@ -592,7 +596,7 @@ func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *GetRang
// we don't use GetRange call for now since blobovnicza // we don't use GetRange call for now since blobovnicza
// stores data that is compressed on BlobStor side. // stores data that is compressed on BlobStor side.
// If blobovnicza learns to do the compression itself, // If blobovnicza learns to do the compression itself,
// wecan start using GetRange. // we can start using GetRange.
res, err := blz.Get(gPrm) res, err := blz.Get(gPrm)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -1,9 +1,11 @@
package blobstor package blobstor
import ( import (
"errors"
"fmt" "fmt"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
) )
// GetRangeBigPrm groups the parameters of GetRangeBig operation. // GetRangeBigPrm groups the parameters of GetRangeBig operation.
@ -27,6 +29,10 @@ func (b *BlobStor) GetRangeBig(prm *GetRangeBigPrm) (*GetRangeBigRes, error) {
// get compressed object data // get compressed object data
data, err := b.fsTree.Get(prm.addr) data, err := b.fsTree.Get(prm.addr)
if err != nil { if err != nil {
if errors.Is(err, fstree.ErrFileNotFound) {
return nil, object.ErrNotFound
}
return nil, fmt.Errorf("could not read object from fs tree: %w", err) return nil, fmt.Errorf("could not read object from fs tree: %w", err)
} }

View file

@ -3,8 +3,10 @@ package engine
import ( import (
"errors" "errors"
"os" "os"
"path/filepath"
"testing" "testing"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -48,3 +50,38 @@ func TestExecBlocks(t *testing.T) {
// try to resume // try to resume
require.Error(t, e.ResumeExecution()) require.Error(t, e.ResumeExecution())
} }
func TestPersistentShardID(t *testing.T) {
dir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
checkShardState(t, e, id[0], shard.ModeReadWrite)
require.NoError(t, e.Close())
e, _, newID := newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, id, newID)
require.NoError(t, e.Close())
p1 := e.shards[id[0].String()].DumpInfo().MetaBaseInfo.Path
p2 := e.shards[id[1].String()].DumpInfo().MetaBaseInfo.Path
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2))
e, _, newID = newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, id[1], newID[0])
require.Equal(t, id[0], newID[1])
require.NoError(t, e.Close())
}
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, mode shard.Mode) {
e.mtx.RLock()
sh := e.shards[id.String()]
e.mtx.RUnlock()
require.Equal(t, mode, sh.GetMode())
}

View file

@ -0,0 +1,110 @@
package engine
import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
const errSmallSize = 256
func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
if dir == "" {
var err error
dir, err = os.MkdirTemp("", "*")
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
}
e := New(
WithLogger(zaptest.NewLogger(t)),
WithShardPoolSize(1))
var ids [2]*shard.ID
var err error
for i := range ids {
ids[i], err = e.AddShard(
shard.WithLogger(zaptest.NewLogger(t)),
shard.WithBlobStorOptions(
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
blobstor.WithShallowDepth(1),
blobstor.WithBlobovniczaShallowWidth(1),
blobstor.WithBlobovniczaShallowDepth(1),
blobstor.WithSmallSizeLimit(errSmallSize),
blobstor.WithRootPerm(0700)),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
meta.WithPermissions(0700)))
require.NoError(t, err)
}
require.NoError(t, e.Open())
require.NoError(t, e.Init())
return e, dir, ids
}
// Issue #1186.
func TestBlobstorFailback(t *testing.T) {
dir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
objs := make([]*object.Object, 0, 2)
for _, size := range []int{15, errSmallSize + 1} {
obj := generateRawObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, size))
prm := new(shard.PutPrm).WithObject(obj.Object())
e.mtx.RLock()
_, err = e.shards[id[0].String()].Put(prm)
e.mtx.RUnlock()
require.NoError(t, err)
objs = append(objs, obj.Object())
}
for i := range objs {
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
require.NoError(t, err)
_, err = e.GetRange(&RngPrm{addr: objs[i].Address()})
require.NoError(t, err)
}
require.NoError(t, e.Close())
p1 := e.shards[id[0].String()].DumpInfo().BlobStorInfo.RootPath
p2 := e.shards[id[1].String()].DumpInfo().BlobStorInfo.RootPath
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2))
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
for i := range objs {
getRes, err := e.Get(&GetPrm{addr: objs[i].Address()})
require.NoError(t, err)
require.Equal(t, objs[i], getRes.Object())
rngRes, err := e.GetRange(&RngPrm{addr: objs[i].Address(), off: 1, ln: 10})
require.NoError(t, err)
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
_, err = e.GetRange(&RngPrm{addr: objs[i].Address(), off: errSmallSize + 10, ln: 1})
require.True(t, errors.Is(err, object.ErrRangeOutOfBounds), "got: %v", err)
}
}

View file

@ -64,6 +64,8 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
outSI *objectSDK.SplitInfo outSI *objectSDK.SplitInfo
outError = object.ErrNotFound outError = object.ErrNotFound
shardWithMeta hashedShard
) )
shPrm := new(shard.GetPrm). shPrm := new(shard.GetPrm).
@ -72,6 +74,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
res, err := sh.Get(shPrm) res, err := sh.Get(shPrm)
if err != nil { if err != nil {
if res.HasMeta() {
shardWithMeta = hashedShard{sh: sh}
}
switch { switch {
case errors.Is(err, object.ErrNotFound): case errors.Is(err, object.ErrNotFound):
return false // ignore, go to next shard return false // ignore, go to next shard
@ -116,9 +121,25 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
} }
if obj == nil { if obj == nil {
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
return nil, outError return nil, outError
} }
// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
shPrm = shPrm.WithIgnoreMeta(true)
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
res, err := sh.Get(shPrm)
obj = res.Object()
return err == nil
})
if obj == nil {
return nil, outError
}
}
return &GetRes{ return &GetRes{
obj: obj, obj: obj,
}, nil }, nil

View file

@ -82,6 +82,8 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
outSI *objectSDK.SplitInfo outSI *objectSDK.SplitInfo
outError = object.ErrNotFound outError = object.ErrNotFound
shardWithMeta hashedShard
) )
shPrm := new(shard.RngPrm). shPrm := new(shard.RngPrm).
@ -91,6 +93,9 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
res, err := sh.GetRange(shPrm) res, err := sh.GetRange(shPrm)
if err != nil { if err != nil {
if res.HasMeta() {
shardWithMeta = hashedShard{sh: sh}
}
switch { switch {
case errors.Is(err, object.ErrNotFound): case errors.Is(err, object.ErrNotFound):
return false // ignore, go to next shard return false // ignore, go to next shard
@ -137,9 +142,29 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
} }
if obj == nil { if obj == nil {
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
return nil, outError return nil, outError
} }
// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
shPrm = shPrm.WithIgnoreMeta(true)
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
res, err := sh.GetRange(shPrm)
if errors.Is(err, object.ErrRangeOutOfBounds) {
outError = object.ErrRangeOutOfBounds
return true
}
obj = res.Object()
return err == nil
})
if obj == nil {
return nil, outError
}
}
return &RngRes{ return &RngRes{
obj: obj, obj: obj,
}, nil }, nil

View file

@ -25,26 +25,34 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) {
e.mtx.Lock() e.mtx.Lock()
defer e.mtx.Unlock() defer e.mtx.Unlock()
id, err := generateShardID()
if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err)
}
pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true)) pool, err := ants.NewPool(int(e.shardPoolSize), ants.WithNonblocking(true))
if err != nil { if err != nil {
return nil, err return nil, err
} }
strID := id.String() id, err := generateShardID()
if err != nil {
return nil, fmt.Errorf("could not generate shard ID: %w", err)
}
e.shards[strID] = shard.New(append(opts, sh := shard.New(append(opts,
shard.WithID(id), shard.WithID(id),
shard.WithExpiredObjectsCallback(e.processExpiredTombstones), shard.WithExpiredObjectsCallback(e.processExpiredTombstones),
)...) )...)
if err := sh.UpdateID(); err != nil {
return nil, fmt.Errorf("could not open shard: %w", err)
}
strID := sh.ID().String()
if _, ok := e.shards[strID]; ok {
return nil, fmt.Errorf("shard with id %s was already added", strID)
}
e.shards[strID] = sh
e.shardPools[strID] = pool e.shardPools[strID] = pool
return id, nil return sh.ID(), nil
} }
func generateShardID() (*shard.ID, error) { func generateShardID() (*shard.ID, error) {

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
@ -47,9 +48,17 @@ type referenceCounter map[string]*referenceNumber
// Delete removed object records from metabase indexes. // Delete removed object records from metabase indexes.
func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) { func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
return new(DeleteRes), db.boltDB.Update(func(tx *bbolt.Tx) error { err := db.boltDB.Update(func(tx *bbolt.Tx) error {
return db.deleteGroup(tx, prm.addrs) return db.deleteGroup(tx, prm.addrs)
}) })
if err == nil {
for i := range prm.addrs {
storagelog.Write(db.log,
storagelog.AddressField(prm.addrs[i]),
storagelog.OpField("metabase DELETE"))
}
}
return new(DeleteRes), err
} }
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error { func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*objectSDK.Address) error {

View file

@ -7,6 +7,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
@ -68,6 +69,11 @@ func (db *DB) Put(prm *PutPrm) (res *PutRes, err error) {
err = db.boltDB.Batch(func(tx *bbolt.Tx) error { err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
return db.put(tx, prm.obj, prm.id, nil) return db.put(tx, prm.obj, prm.id, nil)
}) })
if err == nil {
storagelog.Write(db.log,
storagelog.AddressField(prm.obj.Address()),
storagelog.OpField("metabase PUT"))
}
return return
} }

View file

@ -0,0 +1,36 @@
package meta
import (
"github.com/nspcc-dev/neo-go/pkg/util/slice"
"go.etcd.io/bbolt"
)
var (
shardInfoBucket = []byte(invalidBase58String + "i")
shardIDKey = []byte("id")
)
// ReadShardID reads shard id from db.
// If id is missing, returns nil, nil.
func (db *DB) ReadShardID() ([]byte, error) {
var id []byte
err := db.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(shardInfoBucket)
if b != nil {
id = slice.Copy(b.Get(shardIDKey))
}
return nil
})
return id, err
}
// WriteShardID writes shard it to db.
func (db *DB) WriteShardID(id []byte) error {
return db.boltDB.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
if err != nil {
return err
}
return b.Put(shardIDKey, id)
})
}

View file

@ -26,7 +26,6 @@ func (s *Shard) Open() error {
return fmt.Errorf("could not open %T: %w", component, err) return fmt.Errorf("could not open %T: %w", component, err)
} }
} }
return nil return nil
} }

View file

@ -19,11 +19,13 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob
// GetPrm groups the parameters of Get operation. // GetPrm groups the parameters of Get operation.
type GetPrm struct { type GetPrm struct {
addr *objectSDK.Address addr *objectSDK.Address
skipMeta bool
} }
// GetRes groups resulting values of Get operation. // GetRes groups resulting values of Get operation.
type GetRes struct { type GetRes struct {
obj *object.Object obj *object.Object
hasMeta bool
} }
// WithAddress is a Get option to set the address of the requested object. // WithAddress is a Get option to set the address of the requested object.
@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm {
return p return p
} }
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
// without accessing metabase.
func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm {
p.skipMeta = ignore
return p
}
// Object returns the requested object. // Object returns the requested object.
func (r *GetRes) Object() *object.Object { func (r *GetRes) Object() *object.Object {
return r.obj return r.obj
} }
// HasMeta returns true if info about the object was found in the metabase.
func (r *GetRes) HasMeta() bool {
return r.hasMeta
}
// Get reads an object from shard. // Get reads an object from shard.
// //
// Returns any error encountered that // Returns any error encountered that
@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) {
return res.Object(), nil return res.Object(), nil
} }
obj, err := s.fetchObjectData(prm.addr, big, small) obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
return &GetRes{ return &GetRes{
obj: obj, obj: obj,
hasMeta: hasMeta,
}, err }, err
} }
// fetchObjectData looks through writeCache and blobStor to find object. // fetchObjectData looks through writeCache and blobStor to find object.
func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) (*object.Object, error) { func (s *Shard) fetchObjectData(addr *objectSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) {
var ( var (
err error err error
res *object.Object res *object.Object
@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
if s.hasWriteCache() { if s.hasWriteCache() {
res, err = s.writeCache.Get(addr) res, err = s.writeCache.Get(addr)
if err == nil { if err == nil {
return res, nil return res, false, nil
} }
if errors.Is(err, object.ErrNotFound) { if errors.Is(err, object.ErrNotFound) {
@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
} }
} }
if skipMeta {
res, err = small(s.blobStor, nil)
if err == nil || errors.Is(err, object.ErrRangeOutOfBounds) {
return res, false, err
}
res, err = big(s.blobStor, nil)
return res, false, err
}
exists, err := meta.Exists(s.metaBase, addr) exists, err := meta.Exists(s.metaBase, addr)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
if !exists { if !exists {
return nil, object.ErrNotFound return nil, false, object.ErrNotFound
} }
blobovniczaID, err := meta.IsSmall(s.metaBase, addr) blobovniczaID, err := meta.IsSmall(s.metaBase, addr)
if err != nil { if err != nil {
return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
} }
if blobovniczaID != nil { if blobovniczaID != nil {
@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
res, err = big(s.blobStor, nil) res, err = big(s.blobStor, nil)
} }
return res, err return res, true, err
} }

View file

@ -23,3 +23,25 @@ func (id ID) String() string {
func (s *Shard) ID() *ID { func (s *Shard) ID() *ID {
return s.info.ID return s.info.ID
} }
// UpdateID reads shard ID saved in the metabase and updates it if it is missing.
func (s *Shard) UpdateID() (err error) {
if err = s.metaBase.Open(); err != nil {
return err
}
defer func() {
cErr := s.metaBase.Close()
if err == nil {
err = cErr
}
}()
id, err := s.metaBase.ReadShardID()
if err != nil {
return err
}
if len(id) != 0 {
s.info.ID = NewIDFromBytes(id)
return nil
}
return s.metaBase.WriteShardID(*s.info.ID)
}

View file

@ -14,11 +14,14 @@ type RngPrm struct {
off uint64 off uint64
addr *objectSDK.Address addr *objectSDK.Address
skipMeta bool
} }
// RngRes groups resulting values of GetRange operation. // RngRes groups resulting values of GetRange operation.
type RngRes struct { type RngRes struct {
obj *object.Object obj *object.Object
hasMeta bool
} }
// WithAddress is a Rng option to set the address of the requested object. // WithAddress is a Rng option to set the address of the requested object.
@ -41,6 +44,13 @@ func (p *RngPrm) WithRange(off uint64, ln uint64) *RngPrm {
return p return p
} }
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
// without accessing metabase.
func (p *RngPrm) WithIgnoreMeta(ignore bool) *RngPrm {
p.skipMeta = ignore
return p
}
// Object returns the requested object part. // Object returns the requested object part.
// //
// Instance payload contains the requested range of the original object. // Instance payload contains the requested range of the original object.
@ -48,6 +58,11 @@ func (r *RngRes) Object() *object.Object {
return r.obj return r.obj
} }
// HasMeta returns true if info about the object was found in the metabase.
func (r *RngRes) HasMeta() bool {
return r.hasMeta
}
// GetRange reads part of an object from shard. // GetRange reads part of an object from shard.
// //
// Returns any error encountered that // Returns any error encountered that
@ -94,9 +109,10 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
return obj.Object(), nil return obj.Object(), nil
} }
obj, err := s.fetchObjectData(prm.addr, big, small) obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
return &RngRes{ return &RngRes{
obj: obj, obj: obj,
hasMeta: hasMeta,
}, err }, err
} }

View file

@ -96,7 +96,7 @@ func New(opts ...Option) *Shard {
return s return s
} }
// WithID returns option to set shard identifier. // WithID returns option to set the default shard identifier.
func WithID(id *ID) Option { func WithID(id *ID) Option {
return func(c *cfg) { return func(c *cfg) {
c.info.ID = id c.info.ID = id

View file

@ -52,7 +52,9 @@ func (p *Replicator) HandleTask(ctx context.Context, task *Task) {
obj, err := engine.Get(p.localStorage, task.addr) obj, err := engine.Get(p.localStorage, task.addr)
if err != nil { if err != nil {
p.log.Error("could not get object from local storage") p.log.Error("could not get object from local storage",
zap.Stringer("object", task.addr),
zap.Error(err))
return return
} }