Move diff from nspcc master and support branches #28
50 changed files with 845 additions and 469 deletions
12
CHANGELOG.md
12
CHANGELOG.md
|
@ -4,8 +4,19 @@ Changelog for FrostFS Node
|
|||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- Separate batching for replicated operations over the same container in pilorama (#1621)
|
||||
- Doc for extended headers (#2128)
|
||||
|
||||
### Changed
|
||||
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)
|
||||
|
||||
### Fixed
|
||||
- Big object removal with non-local parts (#1978)
|
||||
- Disable pilorama when moving to degraded mode (#2197)
|
||||
- Fetching blobovnicza objects that not found in write-cache (#2206)
|
||||
- Do not search for the small objects in FSTree (#2206)
|
||||
- Correct status error for expired session token (#2207)
|
||||
|
||||
### Removed
|
||||
### Updated
|
||||
- `neo-go` to `v0.100.1`
|
||||
|
@ -48,6 +59,7 @@ Changelog for FrostFS Node
|
|||
- Shard uses metabase for `HEAD` requests by default, not write-cache (#2167)
|
||||
- Clarify help for `--expire-at` parameter for commands `object lock/put` and `bearer create` (#2097)
|
||||
- Node spawns `GETRANGE` requests signed with the node's key if session key was not found for `RANGEHASH` (#2144)
|
||||
- Full list of container is no longer cached (#2176)
|
||||
|
||||
### Fixed
|
||||
- Open FSTree in sync mode by default (#1992)
|
||||
|
|
34
cmd/frostfs-cli/docs/storage-node-xheaders.md
Normal file
34
cmd/frostfs-cli/docs/storage-node-xheaders.md
Normal file
|
@ -0,0 +1,34 @@
|
|||
# Extended headers
|
||||
|
||||
## Overview
|
||||
|
||||
Extended headers are used for request/response. They may contain any user-defined headers
|
||||
to be interpreted on application level.
|
||||
Key name must be a unique valid UTF-8 string. Value can't be empty. Requests or
|
||||
Responses with duplicated header names or headers with empty values are
|
||||
considered invalid.
|
||||
|
||||
## Existing headers
|
||||
|
||||
There are some "well-known" headers starting with `__NEOFS__` prefix that
|
||||
affect system behaviour:
|
||||
|
||||
* `__NEOFS__NETMAP_EPOCH` - netmap epoch to use for object placement calculation. The `value` is string
|
||||
encoded `uint64` in decimal presentation. If set to '0' or omitted, the
|
||||
current epoch only will be used.
|
||||
* `__NEOFS__NETMAP_LOOKUP_DEPTH` - if object can't be found using current epoch's netmap, this header limits
|
||||
how many past epochs the node can look up through. Depth is applied to a current epoch or the value
|
||||
of `__NEOFS__NETMAP_EPOCH` attribute. The `value` is string encoded `uint64` in decimal presentation.
|
||||
If set to '0' or not set, only the current epoch is used.
|
||||
|
||||
## `neofs-cli` commands with `--xhdr`
|
||||
|
||||
List of commands with support of extended headers:
|
||||
* `container list-objects`
|
||||
* `object delete/get/hash/head/lock/put/range/search`
|
||||
* `storagegroup delete/get/list/put`
|
||||
|
||||
Example:
|
||||
```shell
|
||||
$ neofs-cli object put -r s01.neofs.devenv:8080 -w wallet.json --cid CID --file FILE --xhdr "__NEOFS__NETMAP_EPOCH=777"
|
||||
```
|
|
@ -21,25 +21,25 @@ var errInvalidEndpoint = errors.New("provided RPC endpoint is incorrect")
|
|||
// GetSDKClientByFlag returns default frostfs-sdk-go client using the specified flag for the address.
|
||||
// On error, outputs to stderr of cmd and exits with non-zero code.
|
||||
func GetSDKClientByFlag(cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) *client.Client {
|
||||
cli, err := getSDKClientByFlag(key, endpointFlag)
|
||||
cli, err := getSDKClientByFlag(cmd, key, endpointFlag)
|
||||
if err != nil {
|
||||
common.ExitOnErr(cmd, "can't create API client: %w", err)
|
||||
}
|
||||
return cli
|
||||
}
|
||||
|
||||
func getSDKClientByFlag(key *ecdsa.PrivateKey, endpointFlag string) (*client.Client, error) {
|
||||
func getSDKClientByFlag(cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) (*client.Client, error) {
|
||||
var addr network.Address
|
||||
|
||||
err := addr.FromString(viper.GetString(endpointFlag))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v: %w", errInvalidEndpoint, err)
|
||||
}
|
||||
return GetSDKClient(key, addr)
|
||||
return GetSDKClient(cmd, key, addr)
|
||||
}
|
||||
|
||||
// GetSDKClient returns default frostfs-sdk-go client.
|
||||
func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client, error) {
|
||||
func GetSDKClient(cmd *cobra.Command, key *ecdsa.PrivateKey, addr network.Address) (*client.Client, error) {
|
||||
var (
|
||||
c client.Client
|
||||
prmInit client.PrmInit
|
||||
|
@ -56,7 +56,7 @@ func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client,
|
|||
prmDial.SetTimeout(timeout)
|
||||
prmDial.SetStreamTimeout(timeout)
|
||||
|
||||
common.PrintVerbose("Set request timeout to %s.", timeout)
|
||||
common.PrintVerbose(cmd, "Set request timeout to %s.", timeout)
|
||||
}
|
||||
|
||||
c.Init(prmInit)
|
||||
|
@ -69,7 +69,7 @@ func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client,
|
|||
}
|
||||
|
||||
// GetCurrentEpoch returns current epoch.
|
||||
func GetCurrentEpoch(ctx context.Context, endpoint string) (uint64, error) {
|
||||
func GetCurrentEpoch(ctx context.Context, cmd *cobra.Command, endpoint string) (uint64, error) {
|
||||
var addr network.Address
|
||||
|
||||
if err := addr.FromString(endpoint); err != nil {
|
||||
|
@ -81,7 +81,7 @@ func GetCurrentEpoch(ctx context.Context, endpoint string) (uint64, error) {
|
|||
return 0, fmt.Errorf("can't generate key to sign query: %w", err)
|
||||
}
|
||||
|
||||
c, err := GetSDKClient(key, addr)
|
||||
c, err := GetSDKClient(cmd, key, addr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ func ReadEACL(cmd *cobra.Command, eaclPath string) *eacl.Table {
|
|||
ExitOnErr(cmd, "", errors.New("incorrect path to file with EACL"))
|
||||
}
|
||||
|
||||
PrintVerbose("Reading EACL from file: %s", eaclPath)
|
||||
PrintVerbose(cmd, "Reading EACL from file: %s", eaclPath)
|
||||
|
||||
data, err := os.ReadFile(eaclPath)
|
||||
ExitOnErr(cmd, "can't read file with EACL: %w", err)
|
||||
|
@ -28,13 +28,13 @@ func ReadEACL(cmd *cobra.Command, eaclPath string) *eacl.Table {
|
|||
|
||||
if err = table.UnmarshalJSON(data); err == nil {
|
||||
validateAndFixEACLVersion(table)
|
||||
PrintVerbose("Parsed JSON encoded EACL table")
|
||||
PrintVerbose(cmd, "Parsed JSON encoded EACL table")
|
||||
return table
|
||||
}
|
||||
|
||||
if err = table.Unmarshal(data); err == nil {
|
||||
validateAndFixEACLVersion(table)
|
||||
PrintVerbose("Parsed binary encoded EACL table")
|
||||
PrintVerbose(cmd, "Parsed binary encoded EACL table")
|
||||
return table
|
||||
}
|
||||
|
||||
|
|
|
@ -11,12 +11,12 @@ import (
|
|||
func PrettyPrintJSON(cmd *cobra.Command, m json.Marshaler, entity string) {
|
||||
data, err := m.MarshalJSON()
|
||||
if err != nil {
|
||||
PrintVerbose("Can't convert %s to json: %w", entity, err)
|
||||
PrintVerbose(cmd, "Can't convert %s to json: %w", entity, err)
|
||||
return
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
if err := json.Indent(buf, data, "", " "); err != nil {
|
||||
PrintVerbose("Can't pretty print json: %w", err)
|
||||
PrintVerbose(cmd, "Can't pretty print json: %w", err)
|
||||
return
|
||||
}
|
||||
cmd.Println(buf)
|
||||
|
|
|
@ -19,11 +19,11 @@ func ReadBearerToken(cmd *cobra.Command, flagname string) *bearer.Token {
|
|||
return nil
|
||||
}
|
||||
|
||||
PrintVerbose("Reading bearer token from file [%s]...", path)
|
||||
PrintVerbose(cmd, "Reading bearer token from file [%s]...", path)
|
||||
|
||||
var tok bearer.Token
|
||||
|
||||
err = ReadBinaryOrJSON(&tok, path)
|
||||
err = ReadBinaryOrJSON(cmd, &tok, path)
|
||||
ExitOnErr(cmd, "invalid bearer token: %v", err)
|
||||
|
||||
return &tok
|
||||
|
@ -38,8 +38,8 @@ type BinaryOrJSON interface {
|
|||
|
||||
// ReadBinaryOrJSON reads file data using provided path and decodes
|
||||
// BinaryOrJSON from the data.
|
||||
func ReadBinaryOrJSON(dst BinaryOrJSON, fPath string) error {
|
||||
PrintVerbose("Reading file [%s]...", fPath)
|
||||
func ReadBinaryOrJSON(cmd *cobra.Command, dst BinaryOrJSON, fPath string) error {
|
||||
PrintVerbose(cmd, "Reading file [%s]...", fPath)
|
||||
|
||||
// try to read session token from file
|
||||
data, err := os.ReadFile(fPath)
|
||||
|
@ -47,17 +47,17 @@ func ReadBinaryOrJSON(dst BinaryOrJSON, fPath string) error {
|
|||
return fmt.Errorf("read file <%s>: %w", fPath, err)
|
||||
}
|
||||
|
||||
PrintVerbose("Trying to decode binary...")
|
||||
PrintVerbose(cmd, "Trying to decode binary...")
|
||||
|
||||
err = dst.Unmarshal(data)
|
||||
if err != nil {
|
||||
PrintVerbose("Failed to decode binary: %v", err)
|
||||
PrintVerbose(cmd, "Failed to decode binary: %v", err)
|
||||
|
||||
PrintVerbose("Trying to decode JSON...")
|
||||
PrintVerbose(cmd, "Trying to decode JSON...")
|
||||
|
||||
err = dst.UnmarshalJSON(data)
|
||||
if err != nil {
|
||||
PrintVerbose("Failed to decode JSON: %v", err)
|
||||
PrintVerbose(cmd, "Failed to decode JSON: %v", err)
|
||||
return errors.New("invalid format")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package common
|
|||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -13,9 +12,9 @@ import (
|
|||
)
|
||||
|
||||
// PrintVerbose prints to the stdout if the commonflags.Verbose flag is on.
|
||||
func PrintVerbose(format string, a ...interface{}) {
|
||||
func PrintVerbose(cmd *cobra.Command, format string, a ...interface{}) {
|
||||
if viper.GetBool(commonflags.Verbose) {
|
||||
fmt.Printf(format+"\n", a...)
|
||||
cmd.Printf(format+"\n", a...)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,11 +11,18 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/cli/input"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/term"
|
||||
)
|
||||
|
||||
var testCmd = &cobra.Command{
|
||||
Use: "test",
|
||||
Short: "test",
|
||||
Run: func(cmd *cobra.Command, args []string) {},
|
||||
}
|
||||
|
||||
func Test_getOrGenerate(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
|
@ -96,7 +103,7 @@ func Test_getOrGenerate(t *testing.T) {
|
|||
|
||||
t.Run("generate", func(t *testing.T) {
|
||||
viper.Set(commonflags.GenerateKey, true)
|
||||
actual, err := getOrGenerate()
|
||||
actual, err := getOrGenerate(testCmd)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, actual)
|
||||
for _, p := range []*keys.PrivateKey{nep2Key, rawKey, wifKey, acc1.PrivateKey(), acc2.PrivateKey()} {
|
||||
|
@ -107,13 +114,13 @@ func Test_getOrGenerate(t *testing.T) {
|
|||
|
||||
func checkKeyError(t *testing.T, desc string, err error) {
|
||||
viper.Set(commonflags.WalletPath, desc)
|
||||
_, actualErr := getOrGenerate()
|
||||
_, actualErr := getOrGenerate(testCmd)
|
||||
require.ErrorIs(t, actualErr, err)
|
||||
}
|
||||
|
||||
func checkKey(t *testing.T, desc string, expected *keys.PrivateKey) {
|
||||
viper.Set(commonflags.WalletPath, desc)
|
||||
actual, err := getOrGenerate()
|
||||
actual, err := getOrGenerate(testCmd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, &expected.PrivateKey, actual)
|
||||
}
|
||||
|
|
|
@ -20,12 +20,12 @@ var errCantGenerateKey = errors.New("can't generate new private key")
|
|||
// Ideally we want to touch file-system on the last step.
|
||||
// This function assumes that all flags were bind to viper in a `PersistentPreRun`.
|
||||
func Get(cmd *cobra.Command) *ecdsa.PrivateKey {
|
||||
pk, err := get()
|
||||
pk, err := get(cmd)
|
||||
common.ExitOnErr(cmd, "can't fetch private key: %w", err)
|
||||
return pk
|
||||
}
|
||||
|
||||
func get() (*ecdsa.PrivateKey, error) {
|
||||
func get(cmd *cobra.Command) (*ecdsa.PrivateKey, error) {
|
||||
keyDesc := viper.GetString(commonflags.WalletPath)
|
||||
data, err := os.ReadFile(keyDesc)
|
||||
if err != nil {
|
||||
|
@ -36,7 +36,7 @@ func get() (*ecdsa.PrivateKey, error) {
|
|||
if err != nil {
|
||||
w, err := wallet.NewWalletFromFile(keyDesc)
|
||||
if err == nil {
|
||||
return FromWallet(w, viper.GetString(commonflags.Account))
|
||||
return FromWallet(cmd, w, viper.GetString(commonflags.Account))
|
||||
}
|
||||
return nil, fmt.Errorf("%w: %v", ErrInvalidKey, err)
|
||||
}
|
||||
|
@ -45,12 +45,12 @@ func get() (*ecdsa.PrivateKey, error) {
|
|||
|
||||
// GetOrGenerate is similar to get but generates a new key if commonflags.GenerateKey is set.
|
||||
func GetOrGenerate(cmd *cobra.Command) *ecdsa.PrivateKey {
|
||||
pk, err := getOrGenerate()
|
||||
pk, err := getOrGenerate(cmd)
|
||||
common.ExitOnErr(cmd, "can't fetch private key: %w", err)
|
||||
return pk
|
||||
}
|
||||
|
||||
func getOrGenerate() (*ecdsa.PrivateKey, error) {
|
||||
func getOrGenerate(cmd *cobra.Command) (*ecdsa.PrivateKey, error) {
|
||||
if viper.GetBool(commonflags.GenerateKey) {
|
||||
priv, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
|
@ -58,5 +58,5 @@ func getOrGenerate() (*ecdsa.PrivateKey, error) {
|
|||
}
|
||||
return &priv.PrivateKey, nil
|
||||
}
|
||||
return get()
|
||||
return get(cmd)
|
||||
}
|
||||
|
|
|
@ -3,13 +3,14 @@ package key
|
|||
import (
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||
"github.com/nspcc-dev/neo-go/cli/flags"
|
||||
"github.com/nspcc-dev/neo-go/cli/input"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
|
@ -22,37 +23,37 @@ var (
|
|||
)
|
||||
|
||||
// FromWallet returns private key of the wallet account.
|
||||
func FromWallet(w *wallet.Wallet, addrStr string) (*ecdsa.PrivateKey, error) {
|
||||
func FromWallet(cmd *cobra.Command, w *wallet.Wallet, addrStr string) (*ecdsa.PrivateKey, error) {
|
||||
var (
|
||||
addr util.Uint160
|
||||
err error
|
||||
)
|
||||
|
||||
if addrStr == "" {
|
||||
printVerbose("Using default wallet address")
|
||||
common.PrintVerbose(cmd, "Using default wallet address")
|
||||
addr = w.GetChangeAddress()
|
||||
} else {
|
||||
addr, err = flags.ParseAddress(addrStr)
|
||||
if err != nil {
|
||||
printVerbose("Can't parse address: %s", addrStr)
|
||||
common.PrintVerbose(cmd, "Can't parse address: %s", addrStr)
|
||||
return nil, ErrInvalidAddress
|
||||
}
|
||||
}
|
||||
|
||||
acc := w.GetAccount(addr)
|
||||
if acc == nil {
|
||||
printVerbose("Can't find wallet account for %s", addrStr)
|
||||
common.PrintVerbose(cmd, "Can't find wallet account for %s", addrStr)
|
||||
return nil, ErrInvalidAddress
|
||||
}
|
||||
|
||||
pass, err := getPassword()
|
||||
if err != nil {
|
||||
printVerbose("Can't read password: %v", err)
|
||||
common.PrintVerbose(cmd, "Can't read password: %v", err)
|
||||
return nil, ErrInvalidPassword
|
||||
}
|
||||
|
||||
if err := acc.Decrypt(pass, keys.NEP2ScryptParams()); err != nil {
|
||||
printVerbose("Can't decrypt account: %v", err)
|
||||
common.PrintVerbose(cmd, "Can't decrypt account: %v", err)
|
||||
return nil, ErrInvalidPassword
|
||||
}
|
||||
|
||||
|
@ -67,9 +68,3 @@ func getPassword() (string, error) {
|
|||
|
||||
return input.ReadPassword("Enter password > ")
|
||||
}
|
||||
|
||||
func printVerbose(format string, a ...interface{}) {
|
||||
if viper.GetBool("verbose") {
|
||||
fmt.Printf(format+"\n", a...)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func createToken(cmd *cobra.Command, _ []string) {
|
|||
defer cancel()
|
||||
|
||||
endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
|
||||
currEpoch, err := internalclient.GetCurrentEpoch(ctx, endpoint)
|
||||
currEpoch, err := internalclient.GetCurrentEpoch(ctx, cmd, endpoint)
|
||||
common.ExitOnErr(cmd, "can't fetch current epoch: %w", err)
|
||||
|
||||
if iatRelative {
|
||||
|
|
|
@ -36,7 +36,7 @@ var createContainerCmd = &cobra.Command{
|
|||
Long: `Create new container and register it in the NeoFS.
|
||||
It will be stored in sidechain when inner ring will accepts it.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
placementPolicy, err := parseContainerPolicy(containerPolicy)
|
||||
placementPolicy, err := parseContainerPolicy(cmd, containerPolicy)
|
||||
common.ExitOnErr(cmd, "", err)
|
||||
|
||||
key := key.Get(cmd)
|
||||
|
@ -165,10 +165,10 @@ func initContainerCreateCmd() {
|
|||
"Skip placement validity check")
|
||||
}
|
||||
|
||||
func parseContainerPolicy(policyString string) (*netmap.PlacementPolicy, error) {
|
||||
func parseContainerPolicy(cmd *cobra.Command, policyString string) (*netmap.PlacementPolicy, error) {
|
||||
_, err := os.Stat(policyString) // check if `policyString` is a path to file with placement policy
|
||||
if err == nil {
|
||||
common.PrintVerbose("Reading placement policy from file: %s", policyString)
|
||||
common.PrintVerbose(cmd, "Reading placement policy from file: %s", policyString)
|
||||
|
||||
data, err := os.ReadFile(policyString)
|
||||
if err != nil {
|
||||
|
@ -182,12 +182,12 @@ func parseContainerPolicy(policyString string) (*netmap.PlacementPolicy, error)
|
|||
|
||||
err = result.DecodeString(policyString)
|
||||
if err == nil {
|
||||
common.PrintVerbose("Parsed QL encoded policy")
|
||||
common.PrintVerbose(cmd, "Parsed QL encoded policy")
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
if err = result.UnmarshalJSON([]byte(policyString)); err == nil {
|
||||
common.PrintVerbose("Parsed JSON encoded policy")
|
||||
common.PrintVerbose(cmd, "Parsed JSON encoded policy")
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ Only owner of the container has a permission to remove container.`,
|
|||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||
|
||||
if force, _ := cmd.Flags().GetBool(commonflags.ForceFlag); !force {
|
||||
common.PrintVerbose("Reading the container to check ownership...")
|
||||
common.PrintVerbose(cmd, "Reading the container to check ownership...")
|
||||
|
||||
var getPrm internalclient.GetContainerPrm
|
||||
getPrm.SetClient(cli)
|
||||
|
@ -39,13 +39,13 @@ Only owner of the container has a permission to remove container.`,
|
|||
owner := resGet.Container().Owner()
|
||||
|
||||
if tok != nil {
|
||||
common.PrintVerbose("Checking session issuer...")
|
||||
common.PrintVerbose(cmd, "Checking session issuer...")
|
||||
|
||||
if !tok.Issuer().Equals(owner) {
|
||||
common.ExitOnErr(cmd, "", fmt.Errorf("session issuer differs with the container owner: expected %s, has %s", owner, tok.Issuer()))
|
||||
}
|
||||
} else {
|
||||
common.PrintVerbose("Checking provided account...")
|
||||
common.PrintVerbose(cmd, "Checking provided account...")
|
||||
|
||||
var acc user.ID
|
||||
user.IDFromKey(&acc, pk.PublicKey)
|
||||
|
@ -55,10 +55,10 @@ Only owner of the container has a permission to remove container.`,
|
|||
}
|
||||
}
|
||||
|
||||
common.PrintVerbose("Account matches the container owner.")
|
||||
common.PrintVerbose(cmd, "Account matches the container owner.")
|
||||
|
||||
if tok != nil {
|
||||
common.PrintVerbose("Skip searching for LOCK objects - session provided.")
|
||||
common.PrintVerbose(cmd, "Skip searching for LOCK objects - session provided.")
|
||||
} else {
|
||||
fs := objectSDK.NewSearchFilters()
|
||||
fs.AddTypeFilter(objectSDK.MatchStringEqual, objectSDK.TypeLock)
|
||||
|
@ -69,7 +69,7 @@ Only owner of the container has a permission to remove container.`,
|
|||
searchPrm.SetFilters(fs)
|
||||
searchPrm.SetTTL(2)
|
||||
|
||||
common.PrintVerbose("Searching for LOCK objects...")
|
||||
common.PrintVerbose(cmd, "Searching for LOCK objects...")
|
||||
|
||||
res, err := internalclient.SearchObjects(searchPrm)
|
||||
common.ExitOnErr(cmd, "can't search for LOCK objects: %w", err)
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/ecdsa"
|
||||
"encoding/json"
|
||||
"os"
|
||||
|
||||
internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||
|
@ -77,18 +75,7 @@ func (x *stringWriter) WriteString(s string) (n int, err error) {
|
|||
|
||||
func prettyPrintContainer(cmd *cobra.Command, cnr container.Container, jsonEncoding bool) {
|
||||
if jsonEncoding {
|
||||
data, err := cnr.MarshalJSON()
|
||||
if err != nil {
|
||||
common.PrintVerbose("Can't convert container to json: %w", err)
|
||||
return
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
if err := json.Indent(buf, data, "", " "); err != nil {
|
||||
common.PrintVerbose("Can't pretty print json: %w", err)
|
||||
}
|
||||
|
||||
cmd.Println(buf)
|
||||
|
||||
common.PrettyPrintJSON(cmd, cnr, "container")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -36,22 +36,22 @@ func parseContainerID(cmd *cobra.Command) cid.ID {
|
|||
// decodes session.Container from the file by path provided in
|
||||
// commonflags.SessionToken flag. Returns nil if the path is not specified.
|
||||
func getSession(cmd *cobra.Command) *session.Container {
|
||||
common.PrintVerbose("Reading container session...")
|
||||
common.PrintVerbose(cmd, "Reading container session...")
|
||||
|
||||
path, _ := cmd.Flags().GetString(commonflags.SessionToken)
|
||||
if path == "" {
|
||||
common.PrintVerbose("Session not provided.")
|
||||
common.PrintVerbose(cmd, "Session not provided.")
|
||||
return nil
|
||||
}
|
||||
|
||||
common.PrintVerbose("Reading container session from the file [%s]...", path)
|
||||
common.PrintVerbose(cmd, "Reading container session from the file [%s]...", path)
|
||||
|
||||
var res session.Container
|
||||
|
||||
err := common.ReadBinaryOrJSON(&res, path)
|
||||
err := common.ReadBinaryOrJSON(cmd, &res, path)
|
||||
common.ExitOnErr(cmd, "read container session: %v", err)
|
||||
|
||||
common.PrintVerbose("Session successfully read.")
|
||||
common.PrintVerbose(cmd, "Session successfully read.")
|
||||
|
||||
return &res
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
|
|||
|
||||
printIgnoreForce := func(st control.NetmapStatus) {
|
||||
if force {
|
||||
common.PrintVerbose("Ignore --%s flag for %s state.", commonflags.ForceFlag, st)
|
||||
common.PrintVerbose(cmd, "Ignore --%s flag for %s state.", commonflags.ForceFlag, st)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
|
|||
|
||||
if force {
|
||||
body.SetForceMaintenance()
|
||||
common.PrintVerbose("Local maintenance will be forced.")
|
||||
common.PrintVerbose(cmd, "Local maintenance will be forced.")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -60,13 +60,13 @@ var objectLockCmd = &cobra.Command{
|
|||
|
||||
endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
|
||||
|
||||
currEpoch, err := internalclient.GetCurrentEpoch(ctx, endpoint)
|
||||
currEpoch, err := internalclient.GetCurrentEpoch(ctx, cmd, endpoint)
|
||||
common.ExitOnErr(cmd, "Request current epoch: %w", err)
|
||||
|
||||
exp = currEpoch + lifetime
|
||||
}
|
||||
|
||||
common.PrintVerbose("Lock object will expire after %d epoch", exp)
|
||||
common.PrintVerbose(cmd, "Lock object will expire after %d epoch", exp)
|
||||
|
||||
var expirationAttr objectSDK.Attribute
|
||||
expirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||
|
|
|
@ -46,7 +46,7 @@ func InitBearer(cmd *cobra.Command) {
|
|||
// Prepare prepares object-related parameters for a command.
|
||||
func Prepare(cmd *cobra.Command, prms ...RPCParameters) {
|
||||
ttl := viper.GetUint32(commonflags.TTL)
|
||||
common.PrintVerbose("TTL: %d", ttl)
|
||||
common.PrintVerbose(cmd, "TTL: %d", ttl)
|
||||
|
||||
for i := range prms {
|
||||
btok := common.ReadBearerToken(cmd, bearerTokenFlag)
|
||||
|
@ -127,19 +127,19 @@ func readSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.PrivateKey, cnr
|
|||
// decodes session.Object from the file by path specified in the
|
||||
// commonflags.SessionToken flag. Returns nil if flag is not set.
|
||||
func getSession(cmd *cobra.Command) *session.Object {
|
||||
common.PrintVerbose("Trying to read session from the file...")
|
||||
common.PrintVerbose(cmd, "Trying to read session from the file...")
|
||||
|
||||
path, _ := cmd.Flags().GetString(commonflags.SessionToken)
|
||||
if path == "" {
|
||||
common.PrintVerbose("File with session token is not provided.")
|
||||
common.PrintVerbose(cmd, "File with session token is not provided.")
|
||||
return nil
|
||||
}
|
||||
|
||||
common.PrintVerbose("Reading session from the file [%s]...", path)
|
||||
common.PrintVerbose(cmd, "Reading session from the file [%s]...", path)
|
||||
|
||||
var tok session.Object
|
||||
|
||||
err := common.ReadBinaryOrJSON(&tok, path)
|
||||
err := common.ReadBinaryOrJSON(cmd, &tok, path)
|
||||
common.ExitOnErr(cmd, "read session: %v", err)
|
||||
|
||||
return &tok
|
||||
|
@ -185,7 +185,7 @@ func _readVerifiedSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.Private
|
|||
return
|
||||
}
|
||||
|
||||
common.PrintVerbose("Checking session correctness...")
|
||||
common.PrintVerbose(cmd, "Checking session correctness...")
|
||||
|
||||
switch false {
|
||||
case tok.AssertContainer(cnr):
|
||||
|
@ -200,7 +200,7 @@ func _readVerifiedSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.Private
|
|||
common.ExitOnErr(cmd, "", errors.New("invalid signature of the session data"))
|
||||
}
|
||||
|
||||
common.PrintVerbose("Session is correct.")
|
||||
common.PrintVerbose(cmd, "Session is correct.")
|
||||
|
||||
dst.SetSessionToken(tok)
|
||||
}
|
||||
|
@ -227,7 +227,7 @@ func ReadOrOpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.
|
|||
objs = []oid.ID{*obj}
|
||||
|
||||
if _, ok := dst.(*internal.DeleteObjectPrm); ok {
|
||||
common.PrintVerbose("Collecting relatives of the removal object...")
|
||||
common.PrintVerbose(cmd, "Collecting relatives of the removal object...")
|
||||
|
||||
objs = append(objs, collectObjectRelatives(cmd, cli, cnr, *obj)...)
|
||||
}
|
||||
|
@ -259,7 +259,7 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
|
|||
|
||||
if obj != nil {
|
||||
if _, ok := dst.(*internal.DeleteObjectPrm); ok {
|
||||
common.PrintVerbose("Collecting relatives of the removal object...")
|
||||
common.PrintVerbose(cmd, "Collecting relatives of the removal object...")
|
||||
|
||||
rels := collectObjectRelatives(cmd, cli, cnr, *obj)
|
||||
|
||||
|
@ -275,12 +275,12 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
|
|||
|
||||
const sessionLifetime = 10 // in NeoFS epochs
|
||||
|
||||
common.PrintVerbose("Opening remote session with the node...")
|
||||
common.PrintVerbose(cmd, "Opening remote session with the node...")
|
||||
|
||||
err := sessionCli.CreateSession(&tok, cli, sessionLifetime)
|
||||
common.ExitOnErr(cmd, "open remote session: %w", err)
|
||||
|
||||
common.PrintVerbose("Session successfully opened.")
|
||||
common.PrintVerbose(cmd, "Session successfully opened.")
|
||||
|
||||
finalizeSession(cmd, dst, &tok, key, cnr, objs...)
|
||||
|
||||
|
@ -297,33 +297,33 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
|
|||
// *internal.PutObjectPrm
|
||||
// *internal.DeleteObjectPrm
|
||||
func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, objs ...oid.ID) {
|
||||
common.PrintVerbose("Finalizing session token...")
|
||||
common.PrintVerbose(cmd, "Finalizing session token...")
|
||||
|
||||
switch dst.(type) {
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported op parameters %T", dst))
|
||||
case *internal.PutObjectPrm:
|
||||
common.PrintVerbose("Binding session to object PUT...")
|
||||
common.PrintVerbose(cmd, "Binding session to object PUT...")
|
||||
tok.ForVerb(session.VerbObjectPut)
|
||||
case *internal.DeleteObjectPrm:
|
||||
common.PrintVerbose("Binding session to object DELETE...")
|
||||
common.PrintVerbose(cmd, "Binding session to object DELETE...")
|
||||
tok.ForVerb(session.VerbObjectDelete)
|
||||
}
|
||||
|
||||
common.PrintVerbose("Binding session to container %s...", cnr)
|
||||
common.PrintVerbose(cmd, "Binding session to container %s...", cnr)
|
||||
|
||||
tok.BindContainer(cnr)
|
||||
if len(objs) > 0 {
|
||||
common.PrintVerbose("Limiting session by the objects %v...", objs)
|
||||
common.PrintVerbose(cmd, "Limiting session by the objects %v...", objs)
|
||||
tok.LimitByObjects(objs...)
|
||||
}
|
||||
|
||||
common.PrintVerbose("Signing session...")
|
||||
common.PrintVerbose(cmd, "Signing session...")
|
||||
|
||||
err := tok.Sign(*key)
|
||||
common.ExitOnErr(cmd, "sign session: %w", err)
|
||||
|
||||
common.PrintVerbose("Session token successfully formed and attached to the request.")
|
||||
common.PrintVerbose(cmd, "Session token successfully formed and attached to the request.")
|
||||
|
||||
dst.SetSessionToken(tok)
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ func initFlagSession(cmd *cobra.Command, verb string) {
|
|||
//
|
||||
// The object itself is not included in the result.
|
||||
func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID {
|
||||
common.PrintVerbose("Fetching raw object header...")
|
||||
common.PrintVerbose(cmd, "Fetching raw object header...")
|
||||
|
||||
// request raw header first
|
||||
var addrObj oid.Address
|
||||
|
@ -361,10 +361,10 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
default:
|
||||
common.ExitOnErr(cmd, "failed to get raw object header: %w", err)
|
||||
case err == nil:
|
||||
common.PrintVerbose("Raw header received - object is singular.")
|
||||
common.PrintVerbose(cmd, "Raw header received - object is singular.")
|
||||
return nil
|
||||
case errors.As(err, &errSplit):
|
||||
common.PrintVerbose("Split information received - object is virtual.")
|
||||
common.PrintVerbose(cmd, "Split information received - object is virtual.")
|
||||
}
|
||||
|
||||
splitInfo := errSplit.SplitInfo()
|
||||
|
@ -373,7 +373,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
// If any approach fails, we don't try the next since we assume that it will fail too.
|
||||
|
||||
if idLinking, ok := splitInfo.Link(); ok {
|
||||
common.PrintVerbose("Collecting split members using linking object %s...", idLinking)
|
||||
common.PrintVerbose(cmd, "Collecting split members using linking object %s...", idLinking)
|
||||
|
||||
addrObj.SetObject(idLinking)
|
||||
prmHead.SetAddress(addrObj)
|
||||
|
@ -381,18 +381,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
// client is already set
|
||||
|
||||
res, err := internal.HeadObject(prmHead)
|
||||
common.ExitOnErr(cmd, "failed to get linking object's header: %w", err)
|
||||
if err == nil {
|
||||
children := res.Header().Children()
|
||||
|
||||
children := res.Header().Children()
|
||||
common.PrintVerbose(cmd, "Received split members from the linking object: %v", children)
|
||||
|
||||
common.PrintVerbose("Received split members from the linking object: %v", children)
|
||||
// include linking object
|
||||
return append(children, idLinking)
|
||||
}
|
||||
|
||||
// include linking object
|
||||
return append(children, idLinking)
|
||||
// linking object is not required for
|
||||
// object collecting
|
||||
common.PrintVerbose(cmd, "failed to get linking object's header: %w", err)
|
||||
}
|
||||
|
||||
if idSplit := splitInfo.SplitID(); idSplit != nil {
|
||||
common.PrintVerbose("Collecting split members by split ID...")
|
||||
common.PrintVerbose(cmd, "Collecting split members by split ID...")
|
||||
|
||||
var query object.SearchFilters
|
||||
query.AddSplitIDFilter(object.MatchStringEqual, idSplit)
|
||||
|
@ -407,7 +411,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
|
||||
members := res.IDList()
|
||||
|
||||
common.PrintVerbose("Found objects by split ID: %v", res.IDList())
|
||||
common.PrintVerbose(cmd, "Found objects by split ID: %v", res.IDList())
|
||||
|
||||
return members
|
||||
}
|
||||
|
@ -417,7 +421,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
common.ExitOnErr(cmd, "", errors.New("missing any data in received object split information"))
|
||||
}
|
||||
|
||||
common.PrintVerbose("Traverse the object split chain in reverse...", idMember)
|
||||
common.PrintVerbose(cmd, "Traverse the object split chain in reverse...", idMember)
|
||||
|
||||
var res *internal.HeadObjectRes
|
||||
chain := []oid.ID{idMember}
|
||||
|
@ -427,7 +431,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
// split members are almost definitely singular, but don't get hung up on it
|
||||
|
||||
for {
|
||||
common.PrintVerbose("Reading previous element of the split chain member %s...", idMember)
|
||||
common.PrintVerbose(cmd, "Reading previous element of the split chain member %s...", idMember)
|
||||
|
||||
addrObj.SetObject(idMember)
|
||||
|
||||
|
@ -436,7 +440,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
|
||||
idMember, ok = res.Header().PreviousID()
|
||||
if !ok {
|
||||
common.PrintVerbose("Chain ended.")
|
||||
common.PrintVerbose(cmd, "Chain ended.")
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -448,7 +452,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
chainSet[idMember] = struct{}{}
|
||||
}
|
||||
|
||||
common.PrintVerbose("Looking for a linking object...")
|
||||
common.PrintVerbose(cmd, "Looking for a linking object...")
|
||||
|
||||
var query object.SearchFilters
|
||||
query.AddParentIDFilter(object.MatchStringEqual, obj)
|
||||
|
@ -465,7 +469,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
|
||||
for i := range list {
|
||||
if _, ok = chainSet[list[i]]; !ok {
|
||||
common.PrintVerbose("Found one more related object %s.", list[i])
|
||||
common.PrintVerbose(cmd, "Found one more related object %s.", list[i])
|
||||
chain = append(chain, list[i])
|
||||
}
|
||||
}
|
||||
|
|
|
@ -118,6 +118,6 @@ func initConfig() {
|
|||
|
||||
// If a config file is found, read it in.
|
||||
if err := viper.ReadInConfig(); err == nil {
|
||||
common.PrintVerbose("Using config file: %s", viper.ConfigFileUsed())
|
||||
common.PrintVerbose(rootCmd, "Using config file: %s", viper.ConfigFileUsed())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ func createSession(cmd *cobra.Command, _ []string) {
|
|||
addrStr, _ := cmd.Flags().GetString(commonflags.RPC)
|
||||
common.ExitOnErr(cmd, "can't parse endpoint: %w", netAddr.FromString(addrStr))
|
||||
|
||||
c, err := internalclient.GetSDKClient(privKey, netAddr)
|
||||
c, err := internalclient.GetSDKClient(cmd, privKey, netAddr)
|
||||
common.ExitOnErr(cmd, "can't create client: %w", err)
|
||||
|
||||
lifetime := uint64(defaultLifetime)
|
||||
|
|
|
@ -83,7 +83,7 @@ func addByPath(cmd *cobra.Command, _ []string) {
|
|||
|
||||
nn := resp.GetBody().GetNodes()
|
||||
if len(nn) == 0 {
|
||||
common.PrintVerbose("No new nodes were created")
|
||||
common.PrintVerbose(cmd, "No new nodes were created")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ func getByPath(cmd *cobra.Command, _ []string) {
|
|||
|
||||
nn := resp.GetBody().GetNodes()
|
||||
if len(nn) == 0 {
|
||||
common.PrintVerbose("The node is not found")
|
||||
common.PrintVerbose(cmd, "The node is not found")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"os"
|
||||
|
||||
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
||||
|
@ -45,7 +43,7 @@ func convertEACLTable(cmd *cobra.Command, _ []string) {
|
|||
}
|
||||
|
||||
if len(to) == 0 {
|
||||
prettyPrintJSON(cmd, data)
|
||||
common.PrettyPrintJSON(cmd, table, "eACL")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -54,12 +52,3 @@ func convertEACLTable(cmd *cobra.Command, _ []string) {
|
|||
|
||||
cmd.Printf("extended ACL table was successfully dumped to %s\n", to)
|
||||
}
|
||||
|
||||
func prettyPrintJSON(cmd *cobra.Command, data []byte) {
|
||||
buf := new(bytes.Buffer)
|
||||
if err := json.Indent(buf, data, "", " "); err != nil {
|
||||
common.PrintVerbose("Can't pretty print json: %w", err)
|
||||
}
|
||||
|
||||
cmd.Println(buf)
|
||||
}
|
||||
|
|
|
@ -51,8 +51,7 @@ func signBearerToken(cmd *cobra.Command, _ []string) {
|
|||
}
|
||||
|
||||
if len(to) == 0 {
|
||||
prettyPrintJSON(cmd, data)
|
||||
|
||||
common.PrettyPrintJSON(cmd, btok, "bearer token")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ func signSessionToken(cmd *cobra.Command, _ []string) {
|
|||
new(session.Object),
|
||||
new(session.Container),
|
||||
} {
|
||||
errLast = common.ReadBinaryOrJSON(el, fPath)
|
||||
errLast = common.ReadBinaryOrJSON(cmd, el, fPath)
|
||||
if errLast == nil {
|
||||
stok = el
|
||||
break
|
||||
|
@ -71,7 +71,7 @@ func signSessionToken(cmd *cobra.Command, _ []string) {
|
|||
|
||||
to := cmd.Flag(signToFlag).Value.String()
|
||||
if len(to) == 0 {
|
||||
prettyPrintJSON(cmd, data)
|
||||
common.PrettyPrintJSON(cmd, stok, "session token")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -215,7 +215,8 @@ func (s *lruNetmapSource) Epoch() (uint64, error) {
|
|||
// wrapper over TTL cache of values read from the network
|
||||
// that implements container lister.
|
||||
type ttlContainerLister struct {
|
||||
*ttlNetCache[string, *cacheItemContainerList]
|
||||
inner *ttlNetCache[string, *cacheItemContainerList]
|
||||
client *cntClient.Client
|
||||
}
|
||||
|
||||
// value type for ttlNetCache used by ttlContainerLister.
|
||||
|
@ -251,20 +252,18 @@ func newCachedContainerLister(c *cntClient.Client, ttl time.Duration) ttlContain
|
|||
}, nil
|
||||
})
|
||||
|
||||
return ttlContainerLister{lruCnrListerCache}
|
||||
return ttlContainerLister{inner: lruCnrListerCache, client: c}
|
||||
}
|
||||
|
||||
// List returns list of container IDs from the cache. If list is missing in the
|
||||
// cache or expired, then it returns container IDs from side chain and updates
|
||||
// the cache.
|
||||
func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
|
||||
var str string
|
||||
|
||||
if id != nil {
|
||||
str = id.EncodeToString()
|
||||
if id == nil {
|
||||
return s.client.List(nil)
|
||||
}
|
||||
|
||||
item, err := s.get(str)
|
||||
item, err := s.inner.get(id.EncodeToString())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -287,13 +286,17 @@ func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
|
|||
func (s *ttlContainerLister) update(owner user.ID, cnr cid.ID, add bool) {
|
||||
strOwner := owner.EncodeToString()
|
||||
|
||||
val, ok := s.cache.Get(strOwner)
|
||||
val, ok := s.inner.cache.Peek(strOwner)
|
||||
if !ok {
|
||||
// we could cache the single cnr but in this case we will disperse
|
||||
// with the Sidechain a lot
|
||||
return
|
||||
}
|
||||
|
||||
if s.inner.ttl <= time.Since(val.t) {
|
||||
return
|
||||
}
|
||||
|
||||
item := val.v
|
||||
|
||||
item.mtx.Lock()
|
||||
|
|
|
@ -127,7 +127,6 @@ func initContainerService(c *cfg) {
|
|||
cnrRdr.get = c.cfgObject.cnrSource
|
||||
|
||||
cnrWrt.cacheEnabled = true
|
||||
cnrWrt.lists = cachedContainerLister
|
||||
cnrWrt.eacls = cachedEACLStorage
|
||||
}
|
||||
|
||||
|
@ -658,7 +657,6 @@ type morphContainerWriter struct {
|
|||
|
||||
cacheEnabled bool
|
||||
eacls ttlEACLStorage
|
||||
lists ttlContainerLister
|
||||
}
|
||||
|
||||
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
|
||||
|
|
|
@ -41,8 +41,8 @@ type boltDBCfg struct {
|
|||
boltOptions *bbolt.Options
|
||||
}
|
||||
|
||||
func defaultCfg(с *cfg) {
|
||||
*с = cfg{
|
||||
func defaultCfg(c *cfg) {
|
||||
*c = cfg{
|
||||
boltDBCfg: boltDBCfg{
|
||||
perm: os.ModePerm, // 0777
|
||||
boltOptions: &bbolt.Options{
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -45,11 +46,14 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
|
|||
}
|
||||
|
||||
if _, err := active.blz.Put(putPrm); err != nil {
|
||||
// check if blobovnicza is full
|
||||
if errors.Is(err, blobovnicza.ErrFull) {
|
||||
b.log.Debug("blobovnicza overflowed",
|
||||
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))),
|
||||
)
|
||||
// Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error
|
||||
// or update active blobovnicza in other thread. In the latter case the database will be closed
|
||||
// and `updateActive` takes care of not updating the active blobovnicza twice.
|
||||
if isFull := errors.Is(err, blobovnicza.ErrFull); isFull || errors.Is(err, bbolt.ErrDatabaseNotOpen) {
|
||||
if isFull {
|
||||
b.log.Debug("blobovnicza overflowed",
|
||||
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))))
|
||||
}
|
||||
|
||||
if err := b.updateActive(p, &active.ind); err != nil {
|
||||
if !isLogical(err) {
|
||||
|
|
|
@ -239,15 +239,17 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) {
|
|||
prm.RawData = t.Compress(prm.RawData)
|
||||
}
|
||||
|
||||
err := t.writeFile(p, prm.RawData)
|
||||
tmpPath := p + "#"
|
||||
err := t.writeFile(tmpPath, prm.RawData)
|
||||
if err != nil {
|
||||
var pe *fs.PathError
|
||||
if errors.As(err, &pe) && pe.Err == syscall.ENOSPC {
|
||||
err = common.ErrNoSpace
|
||||
_ = os.RemoveAll(tmpPath)
|
||||
}
|
||||
}
|
||||
|
||||
return common.PutRes{StorageID: []byte{}}, err
|
||||
return common.PutRes{StorageID: []byte{}}, os.Rename(tmpPath, p)
|
||||
}
|
||||
|
||||
func (t *FSTree) writeFlags() int {
|
||||
|
|
|
@ -82,6 +82,8 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
|
|||
return EvacuateShardRes{}, errMustHaveTwoShards
|
||||
}
|
||||
|
||||
e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList))
|
||||
|
||||
// We must have all shards, to have correct information about their
|
||||
// indexes in a sorted slice and set appropriate marks in the metabase.
|
||||
// Evacuated shard is skipped during put.
|
||||
|
@ -185,5 +187,7 @@ mainLoop:
|
|||
}
|
||||
}
|
||||
|
||||
e.log.Info("finished shards evacuation",
|
||||
zap.Strings("shard_ids", sidList))
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -65,8 +65,12 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
|||
|
||||
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
|
||||
e.mtx.RLock()
|
||||
pool := e.shardPools[sh.ID().String()]
|
||||
pool, ok := e.shardPools[sh.ID().String()]
|
||||
e.mtx.RUnlock()
|
||||
if !ok {
|
||||
// Shard was concurrently removed, skip.
|
||||
return false
|
||||
}
|
||||
|
||||
putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj)
|
||||
finished = putDone || exists
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
var _ pilorama.Forest = (*StorageEngine)(nil)
|
||||
|
||||
// TreeMove implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
|
||||
func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
|
||||
index, lst, err := e.getTreeShard(d.CID, treeID)
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
return nil, err
|
||||
|
@ -32,7 +32,7 @@ func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pil
|
|||
}
|
||||
|
||||
// TreeAddByPath implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.LogMove, error) {
|
||||
func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.Move, error) {
|
||||
index, lst, err := e.getTreeShard(d.CID, treeID)
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
return nil, err
|
||||
|
|
50
pkg/local_object_storage/pilorama/batch.go
Normal file
50
pkg/local_object_storage/pilorama/batch.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type batch struct {
|
||||
forest *boltForest
|
||||
timer *time.Timer
|
||||
mtx sync.Mutex
|
||||
start sync.Once
|
||||
cid cidSDK.ID
|
||||
treeID string
|
||||
results []chan<- error
|
||||
operations []*Move
|
||||
}
|
||||
|
||||
func (b *batch) trigger() {
|
||||
b.mtx.Lock()
|
||||
if b.timer != nil {
|
||||
b.timer.Stop()
|
||||
b.timer = nil
|
||||
}
|
||||
b.mtx.Unlock()
|
||||
b.start.Do(b.run)
|
||||
}
|
||||
|
||||
func (b *batch) run() {
|
||||
sort.Slice(b.operations, func(i, j int) bool {
|
||||
return b.operations[i].Time < b.operations[j].Time
|
||||
})
|
||||
fullID := bucketName(b.cid, b.treeID)
|
||||
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var lm Move
|
||||
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
||||
})
|
||||
for i := range b.operations {
|
||||
b.results[i] <- err
|
||||
}
|
||||
}
|
|
@ -12,6 +12,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/util"
|
||||
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||
|
@ -21,8 +22,13 @@ import (
|
|||
type boltForest struct {
|
||||
db *bbolt.DB
|
||||
|
||||
modeMtx sync.Mutex
|
||||
modeMtx sync.RWMutex
|
||||
mode mode.Mode
|
||||
|
||||
// mtx protects batches field.
|
||||
mtx sync.Mutex
|
||||
batches []*batch
|
||||
|
||||
cfg
|
||||
}
|
||||
|
||||
|
@ -31,6 +37,12 @@ var (
|
|||
logBucket = []byte{1}
|
||||
)
|
||||
|
||||
// ErrDegradedMode is returned when pilorama is in a degraded mode.
|
||||
var ErrDegradedMode = logicerr.New("pilorama is in a degraded mode")
|
||||
|
||||
// ErrReadOnlyMode is returned when pilorama is in a read-only mode.
|
||||
var ErrReadOnlyMode = logicerr.New("pilorama is in a read-only mode")
|
||||
|
||||
// NewBoltForest returns storage wrapper for storing operations on CRDT trees.
|
||||
//
|
||||
// Each tree is stored in a separate bucket by `CID + treeID` key.
|
||||
|
@ -71,12 +83,9 @@ func (t *boltForest) SetMode(m mode.Mode) error {
|
|||
if t.mode == m {
|
||||
return nil
|
||||
}
|
||||
if t.mode.ReadOnly() == m.ReadOnly() {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := t.Close()
|
||||
if err == nil {
|
||||
if err == nil && !m.NoMetabase() {
|
||||
if err = t.Open(m.ReadOnly()); err == nil {
|
||||
err = t.Init()
|
||||
}
|
||||
|
@ -110,7 +119,7 @@ func (t *boltForest) Open(readOnly bool) error {
|
|||
return nil
|
||||
}
|
||||
func (t *boltForest) Init() error {
|
||||
if t.db.IsReadOnly() {
|
||||
if t.mode.NoMetabase() || t.db.IsReadOnly() {
|
||||
return nil
|
||||
}
|
||||
return t.db.Update(func(tx *bbolt.Tx) error {
|
||||
|
@ -133,29 +142,45 @@ func (t *boltForest) Close() error {
|
|||
}
|
||||
|
||||
// TreeMove implements the Forest interface.
|
||||
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) {
|
||||
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
var lm LogMove
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
} else if t.mode.ReadOnly() {
|
||||
return nil, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
lm := *m
|
||||
fullID := bucketName(d.CID, treeID)
|
||||
return &lm, t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
|
||||
if m.Child == RootID {
|
||||
m.Child = t.findSpareID(bTree)
|
||||
lm.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
|
||||
if lm.Child == RootID {
|
||||
lm.Child = t.findSpareID(bTree)
|
||||
}
|
||||
lm.Move = *m
|
||||
return t.applyOperation(bLog, bTree, &lm)
|
||||
return t.do(bLog, bTree, make([]byte, 17), &lm)
|
||||
})
|
||||
}
|
||||
|
||||
// TreeExists implements the Forest interface.
|
||||
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return false, ErrDegradedMode
|
||||
}
|
||||
|
||||
var exists bool
|
||||
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
|
@ -168,7 +193,7 @@ func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
|
|||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) {
|
||||
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
@ -176,11 +201,21 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
|
|||
return nil, ErrNotPathAttribute
|
||||
}
|
||||
|
||||
var lm []LogMove
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
} else if t.mode.ReadOnly() {
|
||||
return nil, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
var lm []Move
|
||||
var key [17]byte
|
||||
|
||||
fullID := bucketName(d.CID, treeID)
|
||||
err := t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -191,9 +226,9 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
|
|||
}
|
||||
|
||||
ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
|
||||
lm = make([]LogMove, len(path)-i+1)
|
||||
lm = make([]Move, len(path)-i+1)
|
||||
for j := i; j < len(path); j++ {
|
||||
lm[j-i].Move = Move{
|
||||
lm[j-i] = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: ts,
|
||||
|
@ -211,7 +246,7 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
|
|||
node = lm[j-i].Child
|
||||
}
|
||||
|
||||
lm[len(lm)-1].Move = Move{
|
||||
lm[len(lm)-1] = Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: ts,
|
||||
|
@ -240,17 +275,14 @@ func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint6
|
|||
// findSpareID returns random unused ID.
|
||||
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
|
||||
id := uint64(rand.Int63())
|
||||
|
||||
var key [9]byte
|
||||
key[0] = 't'
|
||||
binary.LittleEndian.PutUint64(key[1:], id)
|
||||
key := make([]byte, 9)
|
||||
|
||||
for {
|
||||
if bTree.Get(key[:]) == nil {
|
||||
_, _, _, ok := t.getState(bTree, stateKey(key, id))
|
||||
if !ok {
|
||||
return id
|
||||
}
|
||||
id = uint64(rand.Int63())
|
||||
binary.LittleEndian.PutUint64(key[1:], id)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -260,6 +292,15 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
|||
return ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
} else if t.mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
if backgroundSync {
|
||||
var seen bool
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
|
@ -280,19 +321,68 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
|||
}
|
||||
}
|
||||
|
||||
return t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if t.db.MaxBatchSize == 1 {
|
||||
fullID := bucketName(d.CID, treeID)
|
||||
return t.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lm := &LogMove{Move: *m}
|
||||
return t.applyOperation(bLog, bTree, lm)
|
||||
})
|
||||
var lm Move
|
||||
return t.applyOperation(bLog, bTree, []*Move{m}, &lm)
|
||||
})
|
||||
}
|
||||
|
||||
ch := make(chan error, 1)
|
||||
t.addBatch(d, treeID, m, ch)
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
|
||||
treeRoot := bucketName(cid, treeID)
|
||||
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
|
||||
t.mtx.Lock()
|
||||
for i := 0; i < len(t.batches); i++ {
|
||||
t.batches[i].mtx.Lock()
|
||||
if t.batches[i].timer == nil {
|
||||
t.batches[i].mtx.Unlock()
|
||||
copy(t.batches[i:], t.batches[i+1:])
|
||||
t.batches = t.batches[:len(t.batches)-1]
|
||||
i--
|
||||
continue
|
||||
}
|
||||
|
||||
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
|
||||
if found {
|
||||
t.batches[i].results = append(t.batches[i].results, ch)
|
||||
t.batches[i].operations = append(t.batches[i].operations, m)
|
||||
if len(t.batches[i].operations) == t.db.MaxBatchSize {
|
||||
t.batches[i].timer.Stop()
|
||||
t.batches[i].timer = nil
|
||||
t.batches[i].mtx.Unlock()
|
||||
b := t.batches[i]
|
||||
t.mtx.Unlock()
|
||||
b.trigger()
|
||||
return
|
||||
}
|
||||
t.batches[i].mtx.Unlock()
|
||||
t.mtx.Unlock()
|
||||
return
|
||||
}
|
||||
t.batches[i].mtx.Unlock()
|
||||
}
|
||||
b := &batch{
|
||||
forest: t,
|
||||
cid: d.CID,
|
||||
treeID: treeID,
|
||||
results: []chan<- error{ch},
|
||||
operations: []*Move{m},
|
||||
}
|
||||
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
|
||||
t.batches = append(t.batches, b)
|
||||
t.mtx.Unlock()
|
||||
}
|
||||
|
||||
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, treeRoot []byte) (*bbolt.Bucket, *bbolt.Bucket, error) {
|
||||
child := tx.Bucket(treeRoot)
|
||||
if child != nil {
|
||||
return child.Bucket(logBucket), child.Bucket(dataBucket), nil
|
||||
|
@ -313,16 +403,11 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string)
|
|||
return bLog, bData, nil
|
||||
}
|
||||
|
||||
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error {
|
||||
var tmp LogMove
|
||||
// applyOperations applies log operations. Assumes lm are sorted by timestamp.
|
||||
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *Move) error {
|
||||
var tmp Move
|
||||
var cKey [17]byte
|
||||
|
||||
var logKey [8]byte
|
||||
binary.BigEndian.PutUint64(logKey[:], lm.Time)
|
||||
if logBucket.Get(logKey[:]) != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c := logBucket.Cursor()
|
||||
|
||||
key, value := c.Last()
|
||||
|
@ -331,82 +416,87 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
|
|||
r := io.NewBinReaderFromIO(b)
|
||||
|
||||
// 1. Undo up until the desired timestamp is here.
|
||||
for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time {
|
||||
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
|
||||
b.Reset(value)
|
||||
if err := t.logFromBytes(&tmp, r); err != nil {
|
||||
return err
|
||||
|
||||
tmp.Child = r.ReadU64LE()
|
||||
tmp.Parent = r.ReadU64LE()
|
||||
tmp.Time = r.ReadVarUint()
|
||||
if r.Err != nil {
|
||||
return r.Err
|
||||
}
|
||||
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil {
|
||||
if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
key, value = c.Prev()
|
||||
}
|
||||
|
||||
key, _ = c.Next()
|
||||
for i := 0; i < len(ms); i++ {
|
||||
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
|
||||
|
||||
// 2. Insert the operation.
|
||||
if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time {
|
||||
// 2. Insert the operation.
|
||||
*lm = *ms[i]
|
||||
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if key == nil {
|
||||
// The operation is inserted in the beginning, reposition the cursor.
|
||||
// Otherwise, `Next` call will return currently inserted operation.
|
||||
c.First()
|
||||
}
|
||||
key, value = c.Seek(key)
|
||||
|
||||
// 3. Re-apply all other operations.
|
||||
for len(key) == 8 {
|
||||
b.Reset(value)
|
||||
if err := t.logFromBytes(&tmp, r); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
|
||||
return err
|
||||
}
|
||||
// Cursor can be invalid, seek again.
|
||||
binary.BigEndian.PutUint64(cKey[:], lm.Time)
|
||||
_, _ = c.Seek(cKey[:8])
|
||||
key, value = c.Next()
|
||||
|
||||
// 3. Re-apply all other operations.
|
||||
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
|
||||
if err := t.logFromBytes(&tmp, value); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil {
|
||||
return err
|
||||
}
|
||||
key, value = c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {
|
||||
currParent := b.Get(parentKey(key, op.Child))
|
||||
op.HasOld = currParent != nil
|
||||
if currParent != nil { // node is already in tree
|
||||
op.Old.Parent = binary.LittleEndian.Uint64(currParent)
|
||||
if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
op.HasOld = false
|
||||
op.Old = nodeInfo{}
|
||||
}
|
||||
|
||||
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *Move) error {
|
||||
binary.BigEndian.PutUint64(key, op.Time)
|
||||
if err := lb.Put(key[:8], t.logToBytes(op)); err != nil {
|
||||
rawLog := t.logToBytes(op)
|
||||
if err := lb.Put(key[:8], rawLog); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
|
||||
return nil
|
||||
return t.redo(b, key, op, rawLog[16:])
|
||||
}
|
||||
|
||||
func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *Move, rawMeta []byte) error {
|
||||
var err error
|
||||
|
||||
parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child))
|
||||
if inTree {
|
||||
err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta)
|
||||
} else {
|
||||
ts = op.Time
|
||||
err = b.Delete(oldKey(key, op.Time))
|
||||
}
|
||||
|
||||
if currParent == nil {
|
||||
if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
parent := binary.LittleEndian.Uint64(currParent)
|
||||
if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
|
||||
return err
|
||||
}
|
||||
|
||||
if inTree {
|
||||
if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range op.Old.Meta.Items {
|
||||
if isAttributeInternal(op.Old.Meta.Items[i].Key) {
|
||||
key = internalKey(key, op.Old.Meta.Items[i].Key, string(op.Old.Meta.Items[i].Value), parent, op.Child)
|
||||
|
||||
var meta Meta
|
||||
if err := meta.FromBytes(currMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range meta.Items {
|
||||
if isAttributeInternal(meta.Items[i].Key) {
|
||||
key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)
|
||||
err := b.Delete(key)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -414,17 +504,16 @@ func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMo
|
|||
}
|
||||
}
|
||||
}
|
||||
return t.addNode(b, key, op.Child, op.Parent, op.Meta)
|
||||
return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta)
|
||||
}
|
||||
|
||||
// removeNode removes node keys from the tree except the children key or its parent.
|
||||
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error {
|
||||
if err := b.Delete(parentKey(key, node)); err != nil {
|
||||
return err
|
||||
}
|
||||
k := stateKey(key, node)
|
||||
_, _, rawMeta, _ := t.getState(b, k)
|
||||
|
||||
var meta Meta
|
||||
var k = metaKey(key, node)
|
||||
if err := meta.FromBytes(b.Get(k)); err == nil {
|
||||
if err := meta.FromBytes(rawMeta); err == nil {
|
||||
for i := range meta.Items {
|
||||
if isAttributeInternal(meta.Items[i].Key) {
|
||||
err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node))
|
||||
|
@ -434,23 +523,16 @@ func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node)
|
|||
}
|
||||
}
|
||||
}
|
||||
if err := b.Delete(metaKey(key, node)); err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Delete(timestampKey(key, node))
|
||||
return b.Delete(k)
|
||||
}
|
||||
|
||||
// addNode adds node keys to the tree except the timestamp key.
|
||||
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error {
|
||||
err := b.Put(parentKey(key, child), toUint64(parent))
|
||||
if err != nil {
|
||||
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error {
|
||||
if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.Put(childrenKey(key, child, parent), []byte{1})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = b.Put(metaKey(key, child), meta.Bytes())
|
||||
|
||||
err := b.Put(childrenKey(key, child, parent), []byte{1})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -473,27 +555,33 @@ func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, me
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error {
|
||||
func (t *boltForest) undo(m *Move, b *bbolt.Bucket, key []byte) error {
|
||||
if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !lm.HasOld {
|
||||
parent, ts, rawMeta, ok := t.getState(b, oldKey(key, m.Time))
|
||||
if !ok {
|
||||
return t.removeNode(b, key, m.Child, m.Parent)
|
||||
}
|
||||
return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta)
|
||||
|
||||
var meta Meta
|
||||
if err := meta.FromBytes(rawMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
return t.addNode(b, key, m.Child, parent, ts, meta, rawMeta)
|
||||
}
|
||||
|
||||
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
|
||||
key := make([]byte, 9)
|
||||
key[0] = 'p'
|
||||
key[0] = 's'
|
||||
for node := child; node != parent; {
|
||||
binary.LittleEndian.PutUint64(key[1:], node)
|
||||
rawParent := b.Get(key)
|
||||
if len(rawParent) != 8 {
|
||||
parent, _, _, ok := t.getState(b, key)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
node = binary.LittleEndian.Uint64(rawParent)
|
||||
node = parent
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
@ -508,6 +596,13 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
var nodes []Node
|
||||
|
||||
return nodes, t.db.View(func(tx *bbolt.Tx) error {
|
||||
|
@ -526,10 +621,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
childID [9]byte
|
||||
maxTimestamp uint64
|
||||
)
|
||||
var maxTimestamp uint64
|
||||
|
||||
c := b.Cursor()
|
||||
|
||||
|
@ -539,7 +631,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
|
||||
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
|
||||
if latest {
|
||||
ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child)))
|
||||
_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
|
||||
if ts >= maxTimestamp {
|
||||
nodes = append(nodes[:0], child)
|
||||
maxTimestamp = ts
|
||||
|
@ -555,7 +647,14 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
|
|||
|
||||
// TreeGetMeta implements the forest interface.
|
||||
func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) {
|
||||
key := parentKey(make([]byte, 9), nodeID)
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return Meta{}, 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
key := stateKey(make([]byte, 9), nodeID)
|
||||
|
||||
var m Meta
|
||||
var parentID uint64
|
||||
|
@ -567,10 +666,11 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
|
|||
}
|
||||
|
||||
b := treeRoot.Bucket(dataBucket)
|
||||
if data := b.Get(key); len(data) == 8 {
|
||||
if data := b.Get(key); len(data) != 0 {
|
||||
parentID = binary.LittleEndian.Uint64(data)
|
||||
}
|
||||
return m.FromBytes(b.Get(metaKey(key, nodeID)))
|
||||
_, _, meta, _ := t.getState(b, stateKey(key, nodeID))
|
||||
return m.FromBytes(meta)
|
||||
})
|
||||
|
||||
return m, parentID, err
|
||||
|
@ -578,6 +678,13 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
|
|||
|
||||
// TreeGetChildren implements the Forest interface.
|
||||
func (t *boltForest) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID Node) ([]uint64, error) {
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
key := make([]byte, 9)
|
||||
key[0] = 'c'
|
||||
binary.LittleEndian.PutUint64(key[1:], nodeID)
|
||||
|
@ -603,8 +710,17 @@ func (t *boltForest) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID Node)
|
|||
|
||||
// TreeList implements the Forest interface.
|
||||
func (t *boltForest) TreeList(cid cidSDK.ID) ([]string, error) {
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
var ids []string
|
||||
cidRaw := []byte(cid.EncodeToString())
|
||||
cidRaw := make([]byte, 32)
|
||||
cid.Encode(cidRaw)
|
||||
|
||||
cidLen := len(cidRaw)
|
||||
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
|
@ -628,6 +744,13 @@ func (t *boltForest) TreeList(cid cidSDK.ID) ([]string, error) {
|
|||
|
||||
// TreeGetOpLog implements the pilorama.Forest interface.
|
||||
func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (Move, error) {
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return Move{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
key := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(key, height)
|
||||
|
||||
|
@ -651,10 +774,20 @@ func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (
|
|||
|
||||
// TreeDrop implements the pilorama.Forest interface.
|
||||
func (t *boltForest) TreeDrop(cid cidSDK.ID, treeID string) error {
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
} else if t.mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
return t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
if treeID == "" {
|
||||
c := tx.Cursor()
|
||||
prefix := []byte(cid.EncodeToString())
|
||||
prefix := make([]byte, 32)
|
||||
cid.Encode(prefix)
|
||||
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
|
||||
err := tx.DeleteBucket(k)
|
||||
if err != nil {
|
||||
|
@ -698,67 +831,72 @@ loop:
|
|||
}
|
||||
|
||||
func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
|
||||
r := io.NewBinReaderFromBuf(data)
|
||||
m.Child = r.ReadU64LE()
|
||||
m.Parent = r.ReadU64LE()
|
||||
m.Meta.DecodeBinary(r)
|
||||
return r.Err
|
||||
return t.logFromBytes(m, data)
|
||||
}
|
||||
|
||||
func (t *boltForest) logFromBytes(lm *LogMove, r *io.BinReader) error {
|
||||
lm.Child = r.ReadU64LE()
|
||||
lm.Parent = r.ReadU64LE()
|
||||
lm.Meta.DecodeBinary(r)
|
||||
lm.HasOld = r.ReadBool()
|
||||
if lm.HasOld {
|
||||
lm.Old.Parent = r.ReadU64LE()
|
||||
lm.Old.Meta.DecodeBinary(r)
|
||||
}
|
||||
return r.Err
|
||||
func (t *boltForest) logFromBytes(lm *Move, data []byte) error {
|
||||
lm.Child = binary.LittleEndian.Uint64(data)
|
||||
lm.Parent = binary.LittleEndian.Uint64(data[8:])
|
||||
return lm.Meta.FromBytes(data[16:])
|
||||
}
|
||||
|
||||
func (t *boltForest) logToBytes(lm *LogMove) []byte {
|
||||
func (t *boltForest) logToBytes(lm *Move) []byte {
|
||||
w := io.NewBufBinWriter()
|
||||
size := 8 + 8 + lm.Meta.Size() + 1
|
||||
if lm.HasOld {
|
||||
size += 8 + lm.Old.Meta.Size()
|
||||
}
|
||||
//if lm.HasOld {
|
||||
// size += 8 + lm.Old.Meta.Size()
|
||||
//}
|
||||
|
||||
w.Grow(size)
|
||||
w.WriteU64LE(lm.Child)
|
||||
w.WriteU64LE(lm.Parent)
|
||||
lm.Meta.EncodeBinary(w.BinWriter)
|
||||
w.WriteBool(lm.HasOld)
|
||||
if lm.HasOld {
|
||||
w.WriteU64LE(lm.Old.Parent)
|
||||
lm.Old.Meta.EncodeBinary(w.BinWriter)
|
||||
}
|
||||
//w.WriteBool(lm.HasOld)
|
||||
//if lm.HasOld {
|
||||
// w.WriteU64LE(lm.Old.Parent)
|
||||
// lm.Old.Meta.EncodeBinary(w.BinWriter)
|
||||
//}
|
||||
return w.Bytes()
|
||||
}
|
||||
|
||||
func bucketName(cid cidSDK.ID, treeID string) []byte {
|
||||
return []byte(cid.String() + treeID)
|
||||
treeRoot := make([]byte, 32+len(treeID))
|
||||
cid.Encode(treeRoot)
|
||||
copy(treeRoot[32:], treeID)
|
||||
return treeRoot
|
||||
}
|
||||
|
||||
// 't' + node (id) -> timestamp when the node first appeared.
|
||||
func timestampKey(key []byte, child Node) []byte {
|
||||
key[0] = 't'
|
||||
// 'o' + time -> old meta.
|
||||
func oldKey(key []byte, ts Timestamp) []byte {
|
||||
key[0] = 'o'
|
||||
binary.LittleEndian.PutUint64(key[1:], ts)
|
||||
return key[:9]
|
||||
}
|
||||
|
||||
// 's' + child ID -> parent + timestamp of the first appearance + meta.
|
||||
func stateKey(key []byte, child Node) []byte {
|
||||
key[0] = 's'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
}
|
||||
|
||||
// 'p' + node (id) -> parent (id).
|
||||
func parentKey(key []byte, child Node) []byte {
|
||||
key[0] = 'p'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error {
|
||||
data := make([]byte, len(meta)+8+8)
|
||||
binary.LittleEndian.PutUint64(data, parent)
|
||||
binary.LittleEndian.PutUint64(data[8:], timestamp)
|
||||
copy(data[16:], meta)
|
||||
return b.Put(key, data)
|
||||
}
|
||||
|
||||
// 'm' + node (id) -> serialized meta.
|
||||
func metaKey(key []byte, child Node) []byte {
|
||||
key[0] = 'm'
|
||||
binary.LittleEndian.PutUint64(key[1:], child)
|
||||
return key[:9]
|
||||
func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) {
|
||||
data := b.Get(key)
|
||||
if data == nil {
|
||||
return 0, 0, nil, false
|
||||
}
|
||||
|
||||
parent := binary.LittleEndian.Uint64(data)
|
||||
timestamp := binary.LittleEndian.Uint64(data[8:])
|
||||
return parent, timestamp, data[16:], true
|
||||
}
|
||||
|
||||
// 'c' + parent (id) + child (id) -> 0/1.
|
||||
|
|
|
@ -25,7 +25,7 @@ func NewMemoryForest() ForestStorage {
|
|||
}
|
||||
|
||||
// TreeMove implements the Forest interface.
|
||||
func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogMove, error) {
|
||||
func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*Move, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
@ -44,11 +44,11 @@ func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogM
|
|||
|
||||
lm := s.do(op)
|
||||
s.operations = append(s.operations, lm)
|
||||
return &lm, nil
|
||||
return &lm.Move, nil
|
||||
}
|
||||
|
||||
// TreeAddByPath implements the Forest interface.
|
||||
func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) {
|
||||
func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]Move, error) {
|
||||
if !d.checkValid() {
|
||||
return nil, ErrInvalidCIDDescriptor
|
||||
}
|
||||
|
@ -64,22 +64,23 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
|
|||
}
|
||||
|
||||
i, node := s.getPathPrefix(attr, path)
|
||||
lm := make([]LogMove, len(path)-i+1)
|
||||
lm := make([]Move, len(path)-i+1)
|
||||
for j := i; j < len(path); j++ {
|
||||
lm[j-i] = s.do(&Move{
|
||||
op := s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: s.timestamp(d.Position, d.Size),
|
||||
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
node = lm[j-i].Child
|
||||
s.operations = append(s.operations, lm[j-i])
|
||||
lm[j-i] = op.Move
|
||||
node = op.Child
|
||||
s.operations = append(s.operations, op)
|
||||
}
|
||||
|
||||
mCopy := make([]KeyValue, len(m))
|
||||
copy(mCopy, m)
|
||||
lm[len(lm)-1] = s.do(&Move{
|
||||
op := s.do(&Move{
|
||||
Parent: node,
|
||||
Meta: Meta{
|
||||
Time: s.timestamp(d.Position, d.Size),
|
||||
|
@ -87,6 +88,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
|
|||
},
|
||||
Child: s.findSpareID(),
|
||||
})
|
||||
lm[len(lm)-1] = op.Move
|
||||
return lm, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -16,9 +17,9 @@ import (
|
|||
|
||||
var providers = []struct {
|
||||
name string
|
||||
construct func(t testing.TB) Forest
|
||||
construct func(t testing.TB, opts ...Option) Forest
|
||||
}{
|
||||
{"inmemory", func(t testing.TB) Forest {
|
||||
{"inmemory", func(t testing.TB, _ ...Option) Forest {
|
||||
f := NewMemoryForest()
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Init())
|
||||
|
@ -28,14 +29,15 @@ var providers = []struct {
|
|||
|
||||
return f
|
||||
}},
|
||||
{"bbolt", func(t testing.TB) Forest {
|
||||
{"bbolt", func(t testing.TB, opts ...Option) Forest {
|
||||
// Use `os.TempDir` because we construct multiple times in the same test.
|
||||
tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
|
||||
require.NoError(t, err)
|
||||
|
||||
f := NewBoltForest(
|
||||
WithPath(filepath.Join(tmpDir, "test.db")),
|
||||
WithMaxBatchSize(1))
|
||||
append([]Option{
|
||||
WithPath(filepath.Join(tmpDir, "test.db")),
|
||||
WithMaxBatchSize(1)}, opts...)...)
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Init())
|
||||
t.Cleanup(func() {
|
||||
|
@ -407,7 +409,7 @@ func TestForest_Apply(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
@ -461,7 +463,7 @@ func TestForest_GetOpLog(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
@ -520,7 +522,7 @@ func TestForest_TreeExists(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...Option) Forest) {
|
||||
s := constructor(t)
|
||||
|
||||
checkExists := func(t *testing.T, expected bool, cid cidSDK.ID, treeID string) {
|
||||
|
@ -654,20 +656,20 @@ func TestForest_ApplyRandom(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
rand.Seed(42)
|
||||
|
||||
const (
|
||||
nodeCount = 5
|
||||
opCount = 20
|
||||
iterCount = 200
|
||||
)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
expected := constructor(t)
|
||||
func TestForest_ParallelApply(t *testing.T) {
|
||||
for i := range providers {
|
||||
if providers[i].name == "inmemory" {
|
||||
continue
|
||||
}
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeParallelApply(t, providers[i].construct, 8, 128, 10)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// prepareRandomTree creates a random sequence of operation and applies them to tree.
|
||||
// The operations are guaranteed to be applied and returned sorted by `Time`.
|
||||
func prepareRandomTree(nodeCount, opCount int) []Move {
|
||||
ops := make([]Move, nodeCount+opCount)
|
||||
for i := 0; i < nodeCount; i++ {
|
||||
ops[i] = Move{
|
||||
|
@ -686,7 +688,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
|
||||
for i := nodeCount; i < len(ops); i++ {
|
||||
ops[i] = Move{
|
||||
Parent: rand.Uint64() % (nodeCount + 12),
|
||||
Parent: rand.Uint64() % uint64(nodeCount+12),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i + nodeCount),
|
||||
Items: []KeyValue{
|
||||
|
@ -694,17 +696,111 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
{Value: make([]byte, 10)},
|
||||
},
|
||||
},
|
||||
Child: rand.Uint64() % (nodeCount + 10),
|
||||
Child: rand.Uint64() % uint64(nodeCount+10),
|
||||
}
|
||||
if rand.Uint32()%5 == 0 {
|
||||
ops[i].Parent = TrashID
|
||||
}
|
||||
rand.Read(ops[i].Meta.Items[1].Value)
|
||||
}
|
||||
|
||||
return ops
|
||||
}
|
||||
|
||||
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
|
||||
for i := uint64(0); i < uint64(nodeCount); i++ {
|
||||
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
|
||||
require.NoError(t, err)
|
||||
actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedParent, actualParent, "node id: %d", i)
|
||||
require.Equal(t, expectedMeta, actualMeta, "node id: %d", i)
|
||||
|
||||
if ma, ok := actual.(*memoryForest); ok {
|
||||
me := expected.(*memoryForest)
|
||||
require.Equal(t, len(me.treeMap), len(ma.treeMap))
|
||||
|
||||
for k, sa := range ma.treeMap {
|
||||
se, ok := me.treeMap[k]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, se.operations, sa.operations)
|
||||
require.Equal(t, se.infoMap, sa.infoMap)
|
||||
|
||||
require.Equal(t, len(se.childMap), len(sa.childMap))
|
||||
for ck, la := range sa.childMap {
|
||||
le, ok := se.childMap[ck]
|
||||
require.True(t, ok)
|
||||
require.ElementsMatch(t, le, la)
|
||||
}
|
||||
}
|
||||
require.Equal(t, expected, actual, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, batchSize, opCount, iterCount int) {
|
||||
rand.Seed(42)
|
||||
|
||||
const nodeCount = 5
|
||||
|
||||
ops := prepareRandomTree(nodeCount, opCount)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
expected := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
for i := 0; i < iterCount; i++ {
|
||||
// Shuffle random operations, leave initialization in place.
|
||||
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
|
||||
|
||||
actual := constructor(t, WithMaxBatchSize(batchSize))
|
||||
wg := new(sync.WaitGroup)
|
||||
ch := make(chan *Move, 0)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for op := range ch {
|
||||
require.NoError(t, actual.TreeApply(d, treeID, op, false))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for i := range ops {
|
||||
ch <- &ops[i]
|
||||
}
|
||||
close(ch)
|
||||
wg.Wait()
|
||||
|
||||
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
rand.Seed(42)
|
||||
|
||||
const (
|
||||
nodeCount = 5
|
||||
opCount = 20
|
||||
)
|
||||
|
||||
ops := prepareRandomTree(nodeCount, opCount)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
expected := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
const iterCount = 200
|
||||
for i := 0; i < iterCount; i++ {
|
||||
// Shuffle random operations, leave initialization in place.
|
||||
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
|
||||
|
@ -713,59 +809,39 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
for i := range ops {
|
||||
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
|
||||
}
|
||||
for i := uint64(0); i < nodeCount; i++ {
|
||||
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
|
||||
require.NoError(t, err)
|
||||
actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expectedParent, actualParent, "node id: %d", i)
|
||||
require.Equal(t, expectedMeta, actualMeta, "node id: %d", i)
|
||||
|
||||
if ma, ok := actual.(*memoryForest); ok {
|
||||
me := expected.(*memoryForest)
|
||||
require.Equal(t, len(me.treeMap), len(ma.treeMap))
|
||||
|
||||
for k, sa := range ma.treeMap {
|
||||
se, ok := me.treeMap[k]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, se.operations, sa.operations)
|
||||
require.Equal(t, se.infoMap, sa.infoMap)
|
||||
|
||||
require.Equal(t, len(se.childMap), len(sa.childMap))
|
||||
for ck, la := range sa.childMap {
|
||||
le, ok := se.childMap[ck]
|
||||
require.True(t, ok)
|
||||
require.ElementsMatch(t, le, la)
|
||||
}
|
||||
}
|
||||
require.Equal(t, expected, actual, i)
|
||||
}
|
||||
}
|
||||
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
||||
}
|
||||
}
|
||||
|
||||
const benchNodeCount = 1000
|
||||
|
||||
var batchSizes = []int{1, 2, 4, 8, 16, 32}
|
||||
|
||||
func BenchmarkApplySequential(b *testing.B) {
|
||||
for i := range providers {
|
||||
if providers[i].name == "inmemory" { // memory backend is not thread-safe
|
||||
continue
|
||||
}
|
||||
b.Run(providers[i].name, func(b *testing.B) {
|
||||
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
Parent: uint64(rand.Intn(benchNodeCount)),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i),
|
||||
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
|
||||
},
|
||||
Child: uint64(rand.Intn(benchNodeCount)),
|
||||
}
|
||||
}
|
||||
return ops
|
||||
})
|
||||
for _, bs := range batchSizes {
|
||||
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
|
||||
s := providers[i].construct(b, WithMaxBatchSize(bs))
|
||||
benchmarkApply(b, s, func(opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
Parent: uint64(rand.Intn(benchNodeCount)),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i),
|
||||
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
|
||||
},
|
||||
Child: uint64(rand.Intn(benchNodeCount)),
|
||||
}
|
||||
}
|
||||
return ops
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -780,25 +856,30 @@ func BenchmarkApplyReorderLast(b *testing.B) {
|
|||
continue
|
||||
}
|
||||
b.Run(providers[i].name, func(b *testing.B) {
|
||||
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
Parent: uint64(rand.Intn(benchNodeCount)),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i),
|
||||
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
|
||||
},
|
||||
Child: uint64(rand.Intn(benchNodeCount)),
|
||||
}
|
||||
if i != 0 && i%blockSize == 0 {
|
||||
for j := 0; j < blockSize/2; j++ {
|
||||
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j]
|
||||
for _, bs := range batchSizes {
|
||||
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
|
||||
s := providers[i].construct(b, WithMaxBatchSize(bs))
|
||||
benchmarkApply(b, s, func(opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
Parent: uint64(rand.Intn(benchNodeCount)),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i),
|
||||
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
|
||||
},
|
||||
Child: uint64(rand.Intn(benchNodeCount)),
|
||||
}
|
||||
if i != 0 && i%blockSize == 0 {
|
||||
for j := 0; j < blockSize/2; j++ {
|
||||
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ops
|
||||
})
|
||||
return ops
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -810,18 +891,17 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
|||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
ch := make(chan *Move, b.N)
|
||||
for i := range ops {
|
||||
ch <- &ops[i]
|
||||
ch := make(chan int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
ch <- i
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetParallelism(50)
|
||||
b.SetParallelism(10)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
op := <-ch
|
||||
if err := s.TreeApply(d, treeID, op, false); err != nil {
|
||||
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
|
||||
b.Fatalf("error in `Apply`: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,15 @@ type nodeInfo struct {
|
|||
Meta Meta
|
||||
}
|
||||
|
||||
type move struct {
|
||||
Move
|
||||
HasOld bool
|
||||
Old nodeInfo
|
||||
}
|
||||
|
||||
// state represents state being replicated.
|
||||
type state struct {
|
||||
operations []LogMove
|
||||
operations []move
|
||||
tree
|
||||
}
|
||||
|
||||
|
@ -20,7 +26,7 @@ func newState() *state {
|
|||
}
|
||||
|
||||
// undo un-does op and changes s in-place.
|
||||
func (s *state) undo(op *LogMove) {
|
||||
func (s *state) undo(op *move) {
|
||||
children := s.tree.childMap[op.Parent]
|
||||
for i := range children {
|
||||
if children[i] == op.Child {
|
||||
|
@ -76,8 +82,8 @@ func (s *state) Apply(op *Move) error {
|
|||
}
|
||||
|
||||
// do performs a single move operation on a tree.
|
||||
func (s *state) do(op *Move) LogMove {
|
||||
lm := LogMove{
|
||||
func (s *state) do(op *Move) move {
|
||||
lm := move{
|
||||
Move: Move{
|
||||
Parent: op.Parent,
|
||||
Meta: op.Meta,
|
||||
|
|
|
@ -11,11 +11,11 @@ type Forest interface {
|
|||
// TreeMove moves node in the tree.
|
||||
// If the parent of the move operation is TrashID, the node is removed.
|
||||
// If the child of the move operation is RootID, new ID is generated and added to a tree.
|
||||
TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error)
|
||||
TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, error)
|
||||
// TreeAddByPath adds new node in the tree using provided path.
|
||||
// The path is constructed by descending from the root using the values of the attr in meta.
|
||||
// Internal nodes in path should have exactly one attribute, otherwise a new node is created.
|
||||
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
|
||||
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
|
||||
// TreeApply applies replicated operation from another node.
|
||||
// If background is true, TreeApply will first check whether an operation exists.
|
||||
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error
|
||||
|
|
|
@ -35,13 +35,6 @@ type Move struct {
|
|||
Child Node
|
||||
}
|
||||
|
||||
// LogMove represents log record for a single move operation.
|
||||
type LogMove struct {
|
||||
Move
|
||||
HasOld bool
|
||||
Old nodeInfo
|
||||
}
|
||||
|
||||
const (
|
||||
// RootID represents the ID of a root node.
|
||||
RootID = 0
|
||||
|
|
|
@ -261,7 +261,10 @@ func (s *Shard) Close() error {
|
|||
}
|
||||
}
|
||||
|
||||
s.gc.stop()
|
||||
// If Init/Open was unsuccessful gc can be nil.
|
||||
if s.gc != nil {
|
||||
s.gc.stop()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -91,11 +91,15 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
|||
}, err
|
||||
}
|
||||
|
||||
// emptyStorageID is an empty storageID that indicates that
|
||||
// an object is big (and is stored in an FSTree, not in a blobovnicza).
|
||||
var emptyStorageID = make([]byte, 0)
|
||||
|
||||
// fetchObjectData looks through writeCache and blobStor to find object.
|
||||
func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
|
||||
var (
|
||||
err error
|
||||
res *objectSDK.Object
|
||||
mErr error
|
||||
mRes meta.ExistsRes
|
||||
)
|
||||
|
||||
var exists bool
|
||||
|
@ -103,15 +107,15 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
|
|||
var mPrm meta.ExistsPrm
|
||||
mPrm.SetAddress(addr)
|
||||
|
||||
mRes, err := s.metaBase.Exists(mPrm)
|
||||
if err != nil && !s.info.Mode.NoMetabase() {
|
||||
return res, false, err
|
||||
mRes, mErr = s.metaBase.Exists(mPrm)
|
||||
if mErr != nil && !s.info.Mode.NoMetabase() {
|
||||
return nil, false, mErr
|
||||
}
|
||||
exists = mRes.Exists()
|
||||
}
|
||||
|
||||
if s.hasWriteCache() {
|
||||
res, err = wc(s.writeCache)
|
||||
res, err := wc(s.writeCache)
|
||||
if err == nil || IsErrOutOfRange(err) {
|
||||
return res, false, err
|
||||
}
|
||||
|
@ -123,8 +127,8 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
|
|||
}
|
||||
}
|
||||
|
||||
if skipMeta || err != nil {
|
||||
res, err = cb(s.blobStor, nil)
|
||||
if skipMeta || mErr != nil {
|
||||
res, err := cb(s.blobStor, nil)
|
||||
return res, false, err
|
||||
}
|
||||
|
||||
|
@ -135,12 +139,20 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
|
|||
var mPrm meta.StorageIDPrm
|
||||
mPrm.SetAddress(addr)
|
||||
|
||||
mRes, err := s.metaBase.StorageID(mPrm)
|
||||
mExRes, err := s.metaBase.StorageID(mPrm)
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||
}
|
||||
|
||||
res, err = cb(s.blobStor, mRes.StorageID())
|
||||
storageID := mExRes.StorageID()
|
||||
if storageID == nil {
|
||||
// `nil` storageID returned without any error
|
||||
// means that object is big, `cb` expects an
|
||||
// empty (but non-nil) storageID in such cases
|
||||
storageID = emptyStorageID
|
||||
}
|
||||
|
||||
res, err := cb(s.blobStor, storageID)
|
||||
|
||||
return res, true, err
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
|||
import (
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ErrReadOnlyMode is returned when it is impossible to apply operation
|
||||
|
@ -24,6 +25,10 @@ func (s *Shard) SetMode(m mode.Mode) error {
|
|||
}
|
||||
|
||||
func (s *Shard) setMode(m mode.Mode) error {
|
||||
s.log.Info("setting shard mode",
|
||||
zap.Stringer("old_mode", s.info.Mode),
|
||||
zap.Stringer("new_mode", m))
|
||||
|
||||
components := []interface{ SetMode(mode.Mode) error }{
|
||||
s.metaBase, s.blobStor,
|
||||
}
|
||||
|
@ -61,6 +66,8 @@ func (s *Shard) setMode(m mode.Mode) error {
|
|||
s.metricsWriter.SetReadonly(s.info.Mode != mode.ReadWrite)
|
||||
}
|
||||
|
||||
s.log.Info("shard mode set successfully",
|
||||
zap.Stringer("mode", s.info.Mode))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ var _ pilorama.Forest = (*Shard)(nil)
|
|||
var ErrPiloramaDisabled = logicerr.New("pilorama is disabled")
|
||||
|
||||
// TreeMove implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
|
||||
func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
|
||||
if s.pilorama == nil {
|
||||
return nil, ErrPiloramaDisabled
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Mo
|
|||
}
|
||||
|
||||
// TreeAddByPath implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.LogMove, error) {
|
||||
func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.Move, error) {
|
||||
if s.pilorama == nil {
|
||||
return nil, ErrPiloramaDisabled
|
||||
}
|
||||
|
|
28
pkg/network/cache/multi.go
vendored
28
pkg/network/cache/multi.go
vendored
|
@ -11,6 +11,7 @@ import (
|
|||
clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"github.com/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"github.com/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
type singleClient struct {
|
||||
|
@ -29,8 +30,6 @@ type multiClient struct {
|
|||
addr network.AddressGroup
|
||||
|
||||
opts ClientCacheOpts
|
||||
|
||||
reconnectInterval time.Duration
|
||||
}
|
||||
|
||||
const defaultReconnectInterval = time.Second * 30
|
||||
|
@ -40,10 +39,9 @@ func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClien
|
|||
opts.ReconnectTimeout = defaultReconnectInterval
|
||||
}
|
||||
return &multiClient{
|
||||
clients: make(map[string]*singleClient),
|
||||
addr: addr,
|
||||
opts: opts,
|
||||
reconnectInterval: defaultReconnectInterval,
|
||||
clients: make(map[string]*singleClient),
|
||||
addr: addr,
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,8 +147,12 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
|
|||
err = f(c)
|
||||
}
|
||||
|
||||
success := err == nil || errors.Is(err, context.Canceled)
|
||||
// non-status logic error that could be returned
|
||||
// from the SDK client; should not be considered
|
||||
// as a connection error
|
||||
var siErr *object.SplitInfoError
|
||||
|
||||
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
|
||||
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
|
||||
firstErr = err
|
||||
}
|
||||
|
@ -170,6 +172,14 @@ func (x *multiClient) ReportError(err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// non-status logic error that could be returned
|
||||
// from the SDK client; should not be considered
|
||||
// as a connection error
|
||||
var siErr *object.SplitInfoError
|
||||
if errors.As(err, &siErr) {
|
||||
return
|
||||
}
|
||||
|
||||
// Dropping all clients here is not necessary, we do this
|
||||
// because `multiClient` doesn't yet provide convenient interface
|
||||
// for reporting individual errors for streaming operations.
|
||||
|
@ -327,7 +337,7 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
|
|||
c.RUnlock()
|
||||
return cl, nil
|
||||
}
|
||||
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
|
||||
if x.opts.ReconnectTimeout != 0 && time.Since(c.lastAttempt) < x.opts.ReconnectTimeout {
|
||||
c.RUnlock()
|
||||
return nil, errRecentlyFailed
|
||||
}
|
||||
|
@ -350,7 +360,7 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
|
|||
return c.client, nil
|
||||
}
|
||||
|
||||
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
|
||||
if x.opts.ReconnectTimeout != 0 && time.Since(c.lastAttempt) < x.opts.ReconnectTimeout {
|
||||
return nil, errRecentlyFailed
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"github.com/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -573,6 +574,9 @@ func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (in
|
|||
if err != nil {
|
||||
return info, errors.New("can't fetch current epoch")
|
||||
}
|
||||
if req.token.ExpiredAt(currentEpoch) {
|
||||
return info, apistatus.SessionTokenExpired{}
|
||||
}
|
||||
if req.token.InvalidAt(currentEpoch) {
|
||||
return info, fmt.Errorf("%s: token is invalid at %d epoch)",
|
||||
invalidRequestMessage, currentEpoch)
|
||||
|
|
|
@ -6,9 +6,9 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// LogServiceError writes debug error message of object service to provided logger.
|
||||
// LogServiceError writes error message of object service to provided logger.
|
||||
func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, err error) {
|
||||
l.Debug("object service error",
|
||||
l.Error("object service error",
|
||||
zap.String("node", network.StringifyGroup(node)),
|
||||
zap.String("request", req),
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -17,7 +17,7 @@ func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, er
|
|||
|
||||
// LogWorkerPoolError writes debug error message of object worker pool to provided logger.
|
||||
func LogWorkerPoolError(l *logger.Logger, req string, err error) {
|
||||
l.Debug("could not push task to worker pool",
|
||||
l.Error("could not push task to worker pool",
|
||||
zap.String("request", req),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
type movePair struct {
|
||||
cid cidSDK.ID
|
||||
treeID string
|
||||
op *pilorama.LogMove
|
||||
op *pilorama.Move
|
||||
}
|
||||
|
||||
type replicationTask struct {
|
||||
|
@ -25,12 +25,33 @@ type replicationTask struct {
|
|||
req *ApplyRequest
|
||||
}
|
||||
|
||||
type applyOp struct {
|
||||
treeID string
|
||||
pilorama.CIDDescriptor
|
||||
pilorama.Move
|
||||
}
|
||||
|
||||
const (
|
||||
defaultReplicatorCapacity = 64
|
||||
defaultReplicatorWorkerCount = 64
|
||||
defaultReplicatorSendTimeout = time.Second * 5
|
||||
)
|
||||
|
||||
func (s *Service) localReplicationWorker() {
|
||||
for {
|
||||
select {
|
||||
case <-s.closeCh:
|
||||
return
|
||||
case op := <-s.replicateLocalCh:
|
||||
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
|
||||
if err != nil {
|
||||
s.log.Error("failed to apply replicated operation",
|
||||
zap.String("err", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) replicationWorker() {
|
||||
for {
|
||||
select {
|
||||
|
@ -74,6 +95,7 @@ func (s *Service) replicationWorker() {
|
|||
func (s *Service) replicateLoop(ctx context.Context) {
|
||||
for i := 0; i < s.replicatorWorkerCount; i++ {
|
||||
go s.replicationWorker()
|
||||
go s.localReplicationWorker()
|
||||
}
|
||||
defer func() {
|
||||
for len(s.replicationTasks) != 0 {
|
||||
|
@ -119,14 +141,14 @@ func (s *Service) replicate(op movePair) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.LogMove) {
|
||||
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
|
||||
select {
|
||||
case s.replicateCh <- movePair{
|
||||
cid: cid,
|
||||
treeID: treeID,
|
||||
op: op,
|
||||
}:
|
||||
case <-s.closeCh:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ type Service struct {
|
|||
|
||||
cache clientCache
|
||||
replicateCh chan movePair
|
||||
replicateLocalCh chan applyOp
|
||||
replicationTasks chan replicationTask
|
||||
closeCh chan struct{}
|
||||
containerCache containerCache
|
||||
|
@ -59,6 +60,7 @@ func New(opts ...Option) *Service {
|
|||
s.cache.init()
|
||||
s.closeCh = make(chan struct{})
|
||||
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
|
||||
s.replicateLocalCh = make(chan applyOp)
|
||||
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
|
||||
s.containerCache.init(s.containerCacheSize)
|
||||
s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
|
||||
|
@ -483,13 +485,19 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
|||
return nil, fmt.Errorf("can't parse meta-information: %w", err)
|
||||
}
|
||||
|
||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
||||
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}
|
||||
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
|
||||
Parent: op.GetParentId(),
|
||||
Child: op.GetChildId(),
|
||||
Meta: meta,
|
||||
}, false)
|
||||
select {
|
||||
case s.replicateLocalCh <- applyOp{
|
||||
treeID: req.GetBody().GetTreeId(),
|
||||
CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
|
||||
Move: pilorama.Move{
|
||||
Parent: op.GetParentId(),
|
||||
Child: op.GetChildId(),
|
||||
Meta: meta,
|
||||
},
|
||||
}:
|
||||
default:
|
||||
}
|
||||
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
|
||||
}
|
||||
|
||||
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {
|
||||
|
|
|
@ -360,7 +360,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
|||
}
|
||||
|
||||
// randomizeNodeOrder shuffles nodes and removes not a `pos` index.
|
||||
// It is assumed that 0 <= pos < len(nodes)
|
||||
// It is assumed that 0 <= pos < len(nodes).
|
||||
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
|
||||
if len(cnrNodes) == 1 {
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue