Move diff from nspcc master and support branches #28

Merged
fyrchik merged 28 commits from move-changes into master 2023-01-25 12:31:47 +00:00
50 changed files with 845 additions and 469 deletions

View file

@ -4,8 +4,19 @@ Changelog for FrostFS Node
## [Unreleased] ## [Unreleased]
### Added ### Added
- Separate batching for replicated operations over the same container in pilorama (#1621)
- Doc for extended headers (#2128)
### Changed ### Changed
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)
### Fixed ### Fixed
- Big object removal with non-local parts (#1978)
- Disable pilorama when moving to degraded mode (#2197)
- Fetching blobovnicza objects that not found in write-cache (#2206)
- Do not search for the small objects in FSTree (#2206)
- Correct status error for expired session token (#2207)
### Removed ### Removed
### Updated ### Updated
- `neo-go` to `v0.100.1` - `neo-go` to `v0.100.1`
@ -47,7 +58,8 @@ Changelog for FrostFS Node
- Use `sync.Pool` in Object.PUT service (#2139) - Use `sync.Pool` in Object.PUT service (#2139)
- Shard uses metabase for `HEAD` requests by default, not write-cache (#2167) - Shard uses metabase for `HEAD` requests by default, not write-cache (#2167)
- Clarify help for `--expire-at` parameter for commands `object lock/put` and `bearer create` (#2097) - Clarify help for `--expire-at` parameter for commands `object lock/put` and `bearer create` (#2097)
- Node spawns `GETRANGE` requests signed with the node's key if session key was not found for `RANGEHASH` (#2144) - Node spawns `GETRANGE` requests signed with the node's key if session key was not found for `RANGEHASH` (#2144)
- Full list of container is no longer cached (#2176)
### Fixed ### Fixed
- Open FSTree in sync mode by default (#1992) - Open FSTree in sync mode by default (#1992)

View file

@ -0,0 +1,34 @@
# Extended headers
## Overview
Extended headers are used for request/response. They may contain any user-defined headers
to be interpreted on application level.
Key name must be a unique valid UTF-8 string. Value can't be empty. Requests or
Responses with duplicated header names or headers with empty values are
considered invalid.
## Existing headers
There are some "well-known" headers starting with `__NEOFS__` prefix that
affect system behaviour:
* `__NEOFS__NETMAP_EPOCH` - netmap epoch to use for object placement calculation. The `value` is string
encoded `uint64` in decimal presentation. If set to '0' or omitted, the
current epoch only will be used.
* `__NEOFS__NETMAP_LOOKUP_DEPTH` - if object can't be found using current epoch's netmap, this header limits
how many past epochs the node can look up through. Depth is applied to a current epoch or the value
of `__NEOFS__NETMAP_EPOCH` attribute. The `value` is string encoded `uint64` in decimal presentation.
If set to '0' or not set, only the current epoch is used.
## `neofs-cli` commands with `--xhdr`
List of commands with support of extended headers:
* `container list-objects`
* `object delete/get/hash/head/lock/put/range/search`
* `storagegroup delete/get/list/put`
Example:
```shell
$ neofs-cli object put -r s01.neofs.devenv:8080 -w wallet.json --cid CID --file FILE --xhdr "__NEOFS__NETMAP_EPOCH=777"
```

View file

@ -21,25 +21,25 @@ var errInvalidEndpoint = errors.New("provided RPC endpoint is incorrect")
// GetSDKClientByFlag returns default frostfs-sdk-go client using the specified flag for the address. // GetSDKClientByFlag returns default frostfs-sdk-go client using the specified flag for the address.
// On error, outputs to stderr of cmd and exits with non-zero code. // On error, outputs to stderr of cmd and exits with non-zero code.
func GetSDKClientByFlag(cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) *client.Client { func GetSDKClientByFlag(cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) *client.Client {
cli, err := getSDKClientByFlag(key, endpointFlag) cli, err := getSDKClientByFlag(cmd, key, endpointFlag)
if err != nil { if err != nil {
common.ExitOnErr(cmd, "can't create API client: %w", err) common.ExitOnErr(cmd, "can't create API client: %w", err)
} }
return cli return cli
} }
func getSDKClientByFlag(key *ecdsa.PrivateKey, endpointFlag string) (*client.Client, error) { func getSDKClientByFlag(cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) (*client.Client, error) {
var addr network.Address var addr network.Address
err := addr.FromString(viper.GetString(endpointFlag)) err := addr.FromString(viper.GetString(endpointFlag))
if err != nil { if err != nil {
return nil, fmt.Errorf("%v: %w", errInvalidEndpoint, err) return nil, fmt.Errorf("%v: %w", errInvalidEndpoint, err)
} }
return GetSDKClient(key, addr) return GetSDKClient(cmd, key, addr)
} }
// GetSDKClient returns default frostfs-sdk-go client. // GetSDKClient returns default frostfs-sdk-go client.
func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client, error) { func GetSDKClient(cmd *cobra.Command, key *ecdsa.PrivateKey, addr network.Address) (*client.Client, error) {
var ( var (
c client.Client c client.Client
prmInit client.PrmInit prmInit client.PrmInit
@ -56,7 +56,7 @@ func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client,
prmDial.SetTimeout(timeout) prmDial.SetTimeout(timeout)
prmDial.SetStreamTimeout(timeout) prmDial.SetStreamTimeout(timeout)
common.PrintVerbose("Set request timeout to %s.", timeout) common.PrintVerbose(cmd, "Set request timeout to %s.", timeout)
} }
c.Init(prmInit) c.Init(prmInit)
@ -69,7 +69,7 @@ func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client,
} }
// GetCurrentEpoch returns current epoch. // GetCurrentEpoch returns current epoch.
func GetCurrentEpoch(ctx context.Context, endpoint string) (uint64, error) { func GetCurrentEpoch(ctx context.Context, cmd *cobra.Command, endpoint string) (uint64, error) {
var addr network.Address var addr network.Address
if err := addr.FromString(endpoint); err != nil { if err := addr.FromString(endpoint); err != nil {
@ -81,7 +81,7 @@ func GetCurrentEpoch(ctx context.Context, endpoint string) (uint64, error) {
return 0, fmt.Errorf("can't generate key to sign query: %w", err) return 0, fmt.Errorf("can't generate key to sign query: %w", err)
} }
c, err := GetSDKClient(key, addr) c, err := GetSDKClient(cmd, key, addr)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View file

@ -19,7 +19,7 @@ func ReadEACL(cmd *cobra.Command, eaclPath string) *eacl.Table {
ExitOnErr(cmd, "", errors.New("incorrect path to file with EACL")) ExitOnErr(cmd, "", errors.New("incorrect path to file with EACL"))
} }
PrintVerbose("Reading EACL from file: %s", eaclPath) PrintVerbose(cmd, "Reading EACL from file: %s", eaclPath)
data, err := os.ReadFile(eaclPath) data, err := os.ReadFile(eaclPath)
ExitOnErr(cmd, "can't read file with EACL: %w", err) ExitOnErr(cmd, "can't read file with EACL: %w", err)
@ -28,13 +28,13 @@ func ReadEACL(cmd *cobra.Command, eaclPath string) *eacl.Table {
if err = table.UnmarshalJSON(data); err == nil { if err = table.UnmarshalJSON(data); err == nil {
validateAndFixEACLVersion(table) validateAndFixEACLVersion(table)
PrintVerbose("Parsed JSON encoded EACL table") PrintVerbose(cmd, "Parsed JSON encoded EACL table")
return table return table
} }
if err = table.Unmarshal(data); err == nil { if err = table.Unmarshal(data); err == nil {
validateAndFixEACLVersion(table) validateAndFixEACLVersion(table)
PrintVerbose("Parsed binary encoded EACL table") PrintVerbose(cmd, "Parsed binary encoded EACL table")
return table return table
} }

View file

@ -11,12 +11,12 @@ import (
func PrettyPrintJSON(cmd *cobra.Command, m json.Marshaler, entity string) { func PrettyPrintJSON(cmd *cobra.Command, m json.Marshaler, entity string) {
data, err := m.MarshalJSON() data, err := m.MarshalJSON()
if err != nil { if err != nil {
PrintVerbose("Can't convert %s to json: %w", entity, err) PrintVerbose(cmd, "Can't convert %s to json: %w", entity, err)
return return
} }
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if err := json.Indent(buf, data, "", " "); err != nil { if err := json.Indent(buf, data, "", " "); err != nil {
PrintVerbose("Can't pretty print json: %w", err) PrintVerbose(cmd, "Can't pretty print json: %w", err)
return return
} }
cmd.Println(buf) cmd.Println(buf)

View file

@ -19,11 +19,11 @@ func ReadBearerToken(cmd *cobra.Command, flagname string) *bearer.Token {
return nil return nil
} }
PrintVerbose("Reading bearer token from file [%s]...", path) PrintVerbose(cmd, "Reading bearer token from file [%s]...", path)
var tok bearer.Token var tok bearer.Token
err = ReadBinaryOrJSON(&tok, path) err = ReadBinaryOrJSON(cmd, &tok, path)
ExitOnErr(cmd, "invalid bearer token: %v", err) ExitOnErr(cmd, "invalid bearer token: %v", err)
return &tok return &tok
@ -38,8 +38,8 @@ type BinaryOrJSON interface {
// ReadBinaryOrJSON reads file data using provided path and decodes // ReadBinaryOrJSON reads file data using provided path and decodes
// BinaryOrJSON from the data. // BinaryOrJSON from the data.
func ReadBinaryOrJSON(dst BinaryOrJSON, fPath string) error { func ReadBinaryOrJSON(cmd *cobra.Command, dst BinaryOrJSON, fPath string) error {
PrintVerbose("Reading file [%s]...", fPath) PrintVerbose(cmd, "Reading file [%s]...", fPath)
// try to read session token from file // try to read session token from file
data, err := os.ReadFile(fPath) data, err := os.ReadFile(fPath)
@ -47,17 +47,17 @@ func ReadBinaryOrJSON(dst BinaryOrJSON, fPath string) error {
return fmt.Errorf("read file <%s>: %w", fPath, err) return fmt.Errorf("read file <%s>: %w", fPath, err)
} }
PrintVerbose("Trying to decode binary...") PrintVerbose(cmd, "Trying to decode binary...")
err = dst.Unmarshal(data) err = dst.Unmarshal(data)
if err != nil { if err != nil {
PrintVerbose("Failed to decode binary: %v", err) PrintVerbose(cmd, "Failed to decode binary: %v", err)
PrintVerbose("Trying to decode JSON...") PrintVerbose(cmd, "Trying to decode JSON...")
err = dst.UnmarshalJSON(data) err = dst.UnmarshalJSON(data)
if err != nil { if err != nil {
PrintVerbose("Failed to decode JSON: %v", err) PrintVerbose(cmd, "Failed to decode JSON: %v", err)
return errors.New("invalid format") return errors.New("invalid format")
} }
} }

View file

@ -2,7 +2,6 @@ package common
import ( import (
"encoding/hex" "encoding/hex"
"fmt"
"strconv" "strconv"
"time" "time"
@ -13,9 +12,9 @@ import (
) )
// PrintVerbose prints to the stdout if the commonflags.Verbose flag is on. // PrintVerbose prints to the stdout if the commonflags.Verbose flag is on.
func PrintVerbose(format string, a ...interface{}) { func PrintVerbose(cmd *cobra.Command, format string, a ...interface{}) {
if viper.GetBool(commonflags.Verbose) { if viper.GetBool(commonflags.Verbose) {
fmt.Printf(format+"\n", a...) cmd.Printf(format+"\n", a...)
} }
} }

View file

@ -11,11 +11,18 @@ import (
"github.com/nspcc-dev/neo-go/cli/input" "github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/term" "golang.org/x/term"
) )
var testCmd = &cobra.Command{
Use: "test",
Short: "test",
Run: func(cmd *cobra.Command, args []string) {},
}
func Test_getOrGenerate(t *testing.T) { func Test_getOrGenerate(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
@ -96,7 +103,7 @@ func Test_getOrGenerate(t *testing.T) {
t.Run("generate", func(t *testing.T) { t.Run("generate", func(t *testing.T) {
viper.Set(commonflags.GenerateKey, true) viper.Set(commonflags.GenerateKey, true)
actual, err := getOrGenerate() actual, err := getOrGenerate(testCmd)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, actual) require.NotNil(t, actual)
for _, p := range []*keys.PrivateKey{nep2Key, rawKey, wifKey, acc1.PrivateKey(), acc2.PrivateKey()} { for _, p := range []*keys.PrivateKey{nep2Key, rawKey, wifKey, acc1.PrivateKey(), acc2.PrivateKey()} {
@ -107,13 +114,13 @@ func Test_getOrGenerate(t *testing.T) {
func checkKeyError(t *testing.T, desc string, err error) { func checkKeyError(t *testing.T, desc string, err error) {
viper.Set(commonflags.WalletPath, desc) viper.Set(commonflags.WalletPath, desc)
_, actualErr := getOrGenerate() _, actualErr := getOrGenerate(testCmd)
require.ErrorIs(t, actualErr, err) require.ErrorIs(t, actualErr, err)
} }
func checkKey(t *testing.T, desc string, expected *keys.PrivateKey) { func checkKey(t *testing.T, desc string, expected *keys.PrivateKey) {
viper.Set(commonflags.WalletPath, desc) viper.Set(commonflags.WalletPath, desc)
actual, err := getOrGenerate() actual, err := getOrGenerate(testCmd)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &expected.PrivateKey, actual) require.Equal(t, &expected.PrivateKey, actual)
} }

View file

@ -20,12 +20,12 @@ var errCantGenerateKey = errors.New("can't generate new private key")
// Ideally we want to touch file-system on the last step. // Ideally we want to touch file-system on the last step.
// This function assumes that all flags were bind to viper in a `PersistentPreRun`. // This function assumes that all flags were bind to viper in a `PersistentPreRun`.
func Get(cmd *cobra.Command) *ecdsa.PrivateKey { func Get(cmd *cobra.Command) *ecdsa.PrivateKey {
pk, err := get() pk, err := get(cmd)
common.ExitOnErr(cmd, "can't fetch private key: %w", err) common.ExitOnErr(cmd, "can't fetch private key: %w", err)
return pk return pk
} }
func get() (*ecdsa.PrivateKey, error) { func get(cmd *cobra.Command) (*ecdsa.PrivateKey, error) {
keyDesc := viper.GetString(commonflags.WalletPath) keyDesc := viper.GetString(commonflags.WalletPath)
data, err := os.ReadFile(keyDesc) data, err := os.ReadFile(keyDesc)
if err != nil { if err != nil {
@ -36,7 +36,7 @@ func get() (*ecdsa.PrivateKey, error) {
if err != nil { if err != nil {
w, err := wallet.NewWalletFromFile(keyDesc) w, err := wallet.NewWalletFromFile(keyDesc)
if err == nil { if err == nil {
return FromWallet(w, viper.GetString(commonflags.Account)) return FromWallet(cmd, w, viper.GetString(commonflags.Account))
} }
return nil, fmt.Errorf("%w: %v", ErrInvalidKey, err) return nil, fmt.Errorf("%w: %v", ErrInvalidKey, err)
} }
@ -45,12 +45,12 @@ func get() (*ecdsa.PrivateKey, error) {
// GetOrGenerate is similar to get but generates a new key if commonflags.GenerateKey is set. // GetOrGenerate is similar to get but generates a new key if commonflags.GenerateKey is set.
func GetOrGenerate(cmd *cobra.Command) *ecdsa.PrivateKey { func GetOrGenerate(cmd *cobra.Command) *ecdsa.PrivateKey {
pk, err := getOrGenerate() pk, err := getOrGenerate(cmd)
common.ExitOnErr(cmd, "can't fetch private key: %w", err) common.ExitOnErr(cmd, "can't fetch private key: %w", err)
return pk return pk
} }
func getOrGenerate() (*ecdsa.PrivateKey, error) { func getOrGenerate(cmd *cobra.Command) (*ecdsa.PrivateKey, error) {
if viper.GetBool(commonflags.GenerateKey) { if viper.GetBool(commonflags.GenerateKey) {
priv, err := keys.NewPrivateKey() priv, err := keys.NewPrivateKey()
if err != nil { if err != nil {
@ -58,5 +58,5 @@ func getOrGenerate() (*ecdsa.PrivateKey, error) {
} }
return &priv.PrivateKey, nil return &priv.PrivateKey, nil
} }
return get() return get(cmd)
} }

View file

@ -3,13 +3,14 @@ package key
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"fmt"
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"github.com/nspcc-dev/neo-go/cli/flags" "github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input" "github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -22,37 +23,37 @@ var (
) )
// FromWallet returns private key of the wallet account. // FromWallet returns private key of the wallet account.
func FromWallet(w *wallet.Wallet, addrStr string) (*ecdsa.PrivateKey, error) { func FromWallet(cmd *cobra.Command, w *wallet.Wallet, addrStr string) (*ecdsa.PrivateKey, error) {
var ( var (
addr util.Uint160 addr util.Uint160
err error err error
) )
if addrStr == "" { if addrStr == "" {
printVerbose("Using default wallet address") common.PrintVerbose(cmd, "Using default wallet address")
addr = w.GetChangeAddress() addr = w.GetChangeAddress()
} else { } else {
addr, err = flags.ParseAddress(addrStr) addr, err = flags.ParseAddress(addrStr)
if err != nil { if err != nil {
printVerbose("Can't parse address: %s", addrStr) common.PrintVerbose(cmd, "Can't parse address: %s", addrStr)
return nil, ErrInvalidAddress return nil, ErrInvalidAddress
} }
} }
acc := w.GetAccount(addr) acc := w.GetAccount(addr)
if acc == nil { if acc == nil {
printVerbose("Can't find wallet account for %s", addrStr) common.PrintVerbose(cmd, "Can't find wallet account for %s", addrStr)
return nil, ErrInvalidAddress return nil, ErrInvalidAddress
} }
pass, err := getPassword() pass, err := getPassword()
if err != nil { if err != nil {
printVerbose("Can't read password: %v", err) common.PrintVerbose(cmd, "Can't read password: %v", err)
return nil, ErrInvalidPassword return nil, ErrInvalidPassword
} }
if err := acc.Decrypt(pass, keys.NEP2ScryptParams()); err != nil { if err := acc.Decrypt(pass, keys.NEP2ScryptParams()); err != nil {
printVerbose("Can't decrypt account: %v", err) common.PrintVerbose(cmd, "Can't decrypt account: %v", err)
return nil, ErrInvalidPassword return nil, ErrInvalidPassword
} }
@ -67,9 +68,3 @@ func getPassword() (string, error) {
return input.ReadPassword("Enter password > ") return input.ReadPassword("Enter password > ")
} }
func printVerbose(format string, a ...interface{}) {
if viper.GetBool("verbose") {
fmt.Printf(format+"\n", a...)
}
}

View file

@ -71,7 +71,7 @@ func createToken(cmd *cobra.Command, _ []string) {
defer cancel() defer cancel()
endpoint, _ := cmd.Flags().GetString(commonflags.RPC) endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
currEpoch, err := internalclient.GetCurrentEpoch(ctx, endpoint) currEpoch, err := internalclient.GetCurrentEpoch(ctx, cmd, endpoint)
common.ExitOnErr(cmd, "can't fetch current epoch: %w", err) common.ExitOnErr(cmd, "can't fetch current epoch: %w", err)
if iatRelative { if iatRelative {

View file

@ -36,7 +36,7 @@ var createContainerCmd = &cobra.Command{
Long: `Create new container and register it in the NeoFS. Long: `Create new container and register it in the NeoFS.
It will be stored in sidechain when inner ring will accepts it.`, It will be stored in sidechain when inner ring will accepts it.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
placementPolicy, err := parseContainerPolicy(containerPolicy) placementPolicy, err := parseContainerPolicy(cmd, containerPolicy)
common.ExitOnErr(cmd, "", err) common.ExitOnErr(cmd, "", err)
key := key.Get(cmd) key := key.Get(cmd)
@ -165,10 +165,10 @@ func initContainerCreateCmd() {
"Skip placement validity check") "Skip placement validity check")
} }
func parseContainerPolicy(policyString string) (*netmap.PlacementPolicy, error) { func parseContainerPolicy(cmd *cobra.Command, policyString string) (*netmap.PlacementPolicy, error) {
_, err := os.Stat(policyString) // check if `policyString` is a path to file with placement policy _, err := os.Stat(policyString) // check if `policyString` is a path to file with placement policy
if err == nil { if err == nil {
common.PrintVerbose("Reading placement policy from file: %s", policyString) common.PrintVerbose(cmd, "Reading placement policy from file: %s", policyString)
data, err := os.ReadFile(policyString) data, err := os.ReadFile(policyString)
if err != nil { if err != nil {
@ -182,12 +182,12 @@ func parseContainerPolicy(policyString string) (*netmap.PlacementPolicy, error)
err = result.DecodeString(policyString) err = result.DecodeString(policyString)
if err == nil { if err == nil {
common.PrintVerbose("Parsed QL encoded policy") common.PrintVerbose(cmd, "Parsed QL encoded policy")
return &result, nil return &result, nil
} }
if err = result.UnmarshalJSON([]byte(policyString)); err == nil { if err = result.UnmarshalJSON([]byte(policyString)); err == nil {
common.PrintVerbose("Parsed JSON encoded policy") common.PrintVerbose(cmd, "Parsed JSON encoded policy")
return &result, nil return &result, nil
} }

View file

@ -27,7 +27,7 @@ Only owner of the container has a permission to remove container.`,
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC) cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
if force, _ := cmd.Flags().GetBool(commonflags.ForceFlag); !force { if force, _ := cmd.Flags().GetBool(commonflags.ForceFlag); !force {
common.PrintVerbose("Reading the container to check ownership...") common.PrintVerbose(cmd, "Reading the container to check ownership...")
var getPrm internalclient.GetContainerPrm var getPrm internalclient.GetContainerPrm
getPrm.SetClient(cli) getPrm.SetClient(cli)
@ -39,13 +39,13 @@ Only owner of the container has a permission to remove container.`,
owner := resGet.Container().Owner() owner := resGet.Container().Owner()
if tok != nil { if tok != nil {
common.PrintVerbose("Checking session issuer...") common.PrintVerbose(cmd, "Checking session issuer...")
if !tok.Issuer().Equals(owner) { if !tok.Issuer().Equals(owner) {
common.ExitOnErr(cmd, "", fmt.Errorf("session issuer differs with the container owner: expected %s, has %s", owner, tok.Issuer())) common.ExitOnErr(cmd, "", fmt.Errorf("session issuer differs with the container owner: expected %s, has %s", owner, tok.Issuer()))
} }
} else { } else {
common.PrintVerbose("Checking provided account...") common.PrintVerbose(cmd, "Checking provided account...")
var acc user.ID var acc user.ID
user.IDFromKey(&acc, pk.PublicKey) user.IDFromKey(&acc, pk.PublicKey)
@ -55,10 +55,10 @@ Only owner of the container has a permission to remove container.`,
} }
} }
common.PrintVerbose("Account matches the container owner.") common.PrintVerbose(cmd, "Account matches the container owner.")
if tok != nil { if tok != nil {
common.PrintVerbose("Skip searching for LOCK objects - session provided.") common.PrintVerbose(cmd, "Skip searching for LOCK objects - session provided.")
} else { } else {
fs := objectSDK.NewSearchFilters() fs := objectSDK.NewSearchFilters()
fs.AddTypeFilter(objectSDK.MatchStringEqual, objectSDK.TypeLock) fs.AddTypeFilter(objectSDK.MatchStringEqual, objectSDK.TypeLock)
@ -69,7 +69,7 @@ Only owner of the container has a permission to remove container.`,
searchPrm.SetFilters(fs) searchPrm.SetFilters(fs)
searchPrm.SetTTL(2) searchPrm.SetTTL(2)
common.PrintVerbose("Searching for LOCK objects...") common.PrintVerbose(cmd, "Searching for LOCK objects...")
res, err := internalclient.SearchObjects(searchPrm) res, err := internalclient.SearchObjects(searchPrm)
common.ExitOnErr(cmd, "can't search for LOCK objects: %w", err) common.ExitOnErr(cmd, "can't search for LOCK objects: %w", err)

View file

@ -1,9 +1,7 @@
package container package container
import ( import (
"bytes"
"crypto/ecdsa" "crypto/ecdsa"
"encoding/json"
"os" "os"
internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client" internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
@ -77,18 +75,7 @@ func (x *stringWriter) WriteString(s string) (n int, err error) {
func prettyPrintContainer(cmd *cobra.Command, cnr container.Container, jsonEncoding bool) { func prettyPrintContainer(cmd *cobra.Command, cnr container.Container, jsonEncoding bool) {
if jsonEncoding { if jsonEncoding {
data, err := cnr.MarshalJSON() common.PrettyPrintJSON(cmd, cnr, "container")
if err != nil {
common.PrintVerbose("Can't convert container to json: %w", err)
return
}
buf := new(bytes.Buffer)
if err := json.Indent(buf, data, "", " "); err != nil {
common.PrintVerbose("Can't pretty print json: %w", err)
}
cmd.Println(buf)
return return
} }

View file

@ -36,22 +36,22 @@ func parseContainerID(cmd *cobra.Command) cid.ID {
// decodes session.Container from the file by path provided in // decodes session.Container from the file by path provided in
// commonflags.SessionToken flag. Returns nil if the path is not specified. // commonflags.SessionToken flag. Returns nil if the path is not specified.
func getSession(cmd *cobra.Command) *session.Container { func getSession(cmd *cobra.Command) *session.Container {
common.PrintVerbose("Reading container session...") common.PrintVerbose(cmd, "Reading container session...")
path, _ := cmd.Flags().GetString(commonflags.SessionToken) path, _ := cmd.Flags().GetString(commonflags.SessionToken)
if path == "" { if path == "" {
common.PrintVerbose("Session not provided.") common.PrintVerbose(cmd, "Session not provided.")
return nil return nil
} }
common.PrintVerbose("Reading container session from the file [%s]...", path) common.PrintVerbose(cmd, "Reading container session from the file [%s]...", path)
var res session.Container var res session.Container
err := common.ReadBinaryOrJSON(&res, path) err := common.ReadBinaryOrJSON(cmd, &res, path)
common.ExitOnErr(cmd, "read container session: %v", err) common.ExitOnErr(cmd, "read container session: %v", err)
common.PrintVerbose("Session successfully read.") common.PrintVerbose(cmd, "Session successfully read.")
return &res return &res
} }

View file

@ -51,7 +51,7 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
printIgnoreForce := func(st control.NetmapStatus) { printIgnoreForce := func(st control.NetmapStatus) {
if force { if force {
common.PrintVerbose("Ignore --%s flag for %s state.", commonflags.ForceFlag, st) common.PrintVerbose(cmd, "Ignore --%s flag for %s state.", commonflags.ForceFlag, st)
} }
} }
@ -69,7 +69,7 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
if force { if force {
body.SetForceMaintenance() body.SetForceMaintenance()
common.PrintVerbose("Local maintenance will be forced.") common.PrintVerbose(cmd, "Local maintenance will be forced.")
} }
} }

View file

@ -60,13 +60,13 @@ var objectLockCmd = &cobra.Command{
endpoint, _ := cmd.Flags().GetString(commonflags.RPC) endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
currEpoch, err := internalclient.GetCurrentEpoch(ctx, endpoint) currEpoch, err := internalclient.GetCurrentEpoch(ctx, cmd, endpoint)
common.ExitOnErr(cmd, "Request current epoch: %w", err) common.ExitOnErr(cmd, "Request current epoch: %w", err)
exp = currEpoch + lifetime exp = currEpoch + lifetime
} }
common.PrintVerbose("Lock object will expire after %d epoch", exp) common.PrintVerbose(cmd, "Lock object will expire after %d epoch", exp)
var expirationAttr objectSDK.Attribute var expirationAttr objectSDK.Attribute
expirationAttr.SetKey(objectV2.SysAttributeExpEpoch) expirationAttr.SetKey(objectV2.SysAttributeExpEpoch)

View file

@ -46,7 +46,7 @@ func InitBearer(cmd *cobra.Command) {
// Prepare prepares object-related parameters for a command. // Prepare prepares object-related parameters for a command.
func Prepare(cmd *cobra.Command, prms ...RPCParameters) { func Prepare(cmd *cobra.Command, prms ...RPCParameters) {
ttl := viper.GetUint32(commonflags.TTL) ttl := viper.GetUint32(commonflags.TTL)
common.PrintVerbose("TTL: %d", ttl) common.PrintVerbose(cmd, "TTL: %d", ttl)
for i := range prms { for i := range prms {
btok := common.ReadBearerToken(cmd, bearerTokenFlag) btok := common.ReadBearerToken(cmd, bearerTokenFlag)
@ -127,19 +127,19 @@ func readSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.PrivateKey, cnr
// decodes session.Object from the file by path specified in the // decodes session.Object from the file by path specified in the
// commonflags.SessionToken flag. Returns nil if flag is not set. // commonflags.SessionToken flag. Returns nil if flag is not set.
func getSession(cmd *cobra.Command) *session.Object { func getSession(cmd *cobra.Command) *session.Object {
common.PrintVerbose("Trying to read session from the file...") common.PrintVerbose(cmd, "Trying to read session from the file...")
path, _ := cmd.Flags().GetString(commonflags.SessionToken) path, _ := cmd.Flags().GetString(commonflags.SessionToken)
if path == "" { if path == "" {
common.PrintVerbose("File with session token is not provided.") common.PrintVerbose(cmd, "File with session token is not provided.")
return nil return nil
} }
common.PrintVerbose("Reading session from the file [%s]...", path) common.PrintVerbose(cmd, "Reading session from the file [%s]...", path)
var tok session.Object var tok session.Object
err := common.ReadBinaryOrJSON(&tok, path) err := common.ReadBinaryOrJSON(cmd, &tok, path)
common.ExitOnErr(cmd, "read session: %v", err) common.ExitOnErr(cmd, "read session: %v", err)
return &tok return &tok
@ -185,7 +185,7 @@ func _readVerifiedSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.Private
return return
} }
common.PrintVerbose("Checking session correctness...") common.PrintVerbose(cmd, "Checking session correctness...")
switch false { switch false {
case tok.AssertContainer(cnr): case tok.AssertContainer(cnr):
@ -200,7 +200,7 @@ func _readVerifiedSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.Private
common.ExitOnErr(cmd, "", errors.New("invalid signature of the session data")) common.ExitOnErr(cmd, "", errors.New("invalid signature of the session data"))
} }
common.PrintVerbose("Session is correct.") common.PrintVerbose(cmd, "Session is correct.")
dst.SetSessionToken(tok) dst.SetSessionToken(tok)
} }
@ -227,7 +227,7 @@ func ReadOrOpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.
objs = []oid.ID{*obj} objs = []oid.ID{*obj}
if _, ok := dst.(*internal.DeleteObjectPrm); ok { if _, ok := dst.(*internal.DeleteObjectPrm); ok {
common.PrintVerbose("Collecting relatives of the removal object...") common.PrintVerbose(cmd, "Collecting relatives of the removal object...")
objs = append(objs, collectObjectRelatives(cmd, cli, cnr, *obj)...) objs = append(objs, collectObjectRelatives(cmd, cli, cnr, *obj)...)
} }
@ -259,7 +259,7 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
if obj != nil { if obj != nil {
if _, ok := dst.(*internal.DeleteObjectPrm); ok { if _, ok := dst.(*internal.DeleteObjectPrm); ok {
common.PrintVerbose("Collecting relatives of the removal object...") common.PrintVerbose(cmd, "Collecting relatives of the removal object...")
rels := collectObjectRelatives(cmd, cli, cnr, *obj) rels := collectObjectRelatives(cmd, cli, cnr, *obj)
@ -275,12 +275,12 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
const sessionLifetime = 10 // in NeoFS epochs const sessionLifetime = 10 // in NeoFS epochs
common.PrintVerbose("Opening remote session with the node...") common.PrintVerbose(cmd, "Opening remote session with the node...")
err := sessionCli.CreateSession(&tok, cli, sessionLifetime) err := sessionCli.CreateSession(&tok, cli, sessionLifetime)
common.ExitOnErr(cmd, "open remote session: %w", err) common.ExitOnErr(cmd, "open remote session: %w", err)
common.PrintVerbose("Session successfully opened.") common.PrintVerbose(cmd, "Session successfully opened.")
finalizeSession(cmd, dst, &tok, key, cnr, objs...) finalizeSession(cmd, dst, &tok, key, cnr, objs...)
@ -297,33 +297,33 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
// *internal.PutObjectPrm // *internal.PutObjectPrm
// *internal.DeleteObjectPrm // *internal.DeleteObjectPrm
func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, objs ...oid.ID) { func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, objs ...oid.ID) {
common.PrintVerbose("Finalizing session token...") common.PrintVerbose(cmd, "Finalizing session token...")
switch dst.(type) { switch dst.(type) {
default: default:
panic(fmt.Sprintf("unsupported op parameters %T", dst)) panic(fmt.Sprintf("unsupported op parameters %T", dst))
case *internal.PutObjectPrm: case *internal.PutObjectPrm:
common.PrintVerbose("Binding session to object PUT...") common.PrintVerbose(cmd, "Binding session to object PUT...")
tok.ForVerb(session.VerbObjectPut) tok.ForVerb(session.VerbObjectPut)
case *internal.DeleteObjectPrm: case *internal.DeleteObjectPrm:
common.PrintVerbose("Binding session to object DELETE...") common.PrintVerbose(cmd, "Binding session to object DELETE...")
tok.ForVerb(session.VerbObjectDelete) tok.ForVerb(session.VerbObjectDelete)
} }
common.PrintVerbose("Binding session to container %s...", cnr) common.PrintVerbose(cmd, "Binding session to container %s...", cnr)
tok.BindContainer(cnr) tok.BindContainer(cnr)
if len(objs) > 0 { if len(objs) > 0 {
common.PrintVerbose("Limiting session by the objects %v...", objs) common.PrintVerbose(cmd, "Limiting session by the objects %v...", objs)
tok.LimitByObjects(objs...) tok.LimitByObjects(objs...)
} }
common.PrintVerbose("Signing session...") common.PrintVerbose(cmd, "Signing session...")
err := tok.Sign(*key) err := tok.Sign(*key)
common.ExitOnErr(cmd, "sign session: %w", err) common.ExitOnErr(cmd, "sign session: %w", err)
common.PrintVerbose("Session token successfully formed and attached to the request.") common.PrintVerbose(cmd, "Session token successfully formed and attached to the request.")
dst.SetSessionToken(tok) dst.SetSessionToken(tok)
} }
@ -339,7 +339,7 @@ func initFlagSession(cmd *cobra.Command, verb string) {
// //
// The object itself is not included in the result. // The object itself is not included in the result.
func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID { func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID {
common.PrintVerbose("Fetching raw object header...") common.PrintVerbose(cmd, "Fetching raw object header...")
// request raw header first // request raw header first
var addrObj oid.Address var addrObj oid.Address
@ -361,10 +361,10 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
default: default:
common.ExitOnErr(cmd, "failed to get raw object header: %w", err) common.ExitOnErr(cmd, "failed to get raw object header: %w", err)
case err == nil: case err == nil:
common.PrintVerbose("Raw header received - object is singular.") common.PrintVerbose(cmd, "Raw header received - object is singular.")
return nil return nil
case errors.As(err, &errSplit): case errors.As(err, &errSplit):
common.PrintVerbose("Split information received - object is virtual.") common.PrintVerbose(cmd, "Split information received - object is virtual.")
} }
splitInfo := errSplit.SplitInfo() splitInfo := errSplit.SplitInfo()
@ -373,7 +373,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
// If any approach fails, we don't try the next since we assume that it will fail too. // If any approach fails, we don't try the next since we assume that it will fail too.
if idLinking, ok := splitInfo.Link(); ok { if idLinking, ok := splitInfo.Link(); ok {
common.PrintVerbose("Collecting split members using linking object %s...", idLinking) common.PrintVerbose(cmd, "Collecting split members using linking object %s...", idLinking)
addrObj.SetObject(idLinking) addrObj.SetObject(idLinking)
prmHead.SetAddress(addrObj) prmHead.SetAddress(addrObj)
@ -381,18 +381,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
// client is already set // client is already set
res, err := internal.HeadObject(prmHead) res, err := internal.HeadObject(prmHead)
common.ExitOnErr(cmd, "failed to get linking object's header: %w", err) if err == nil {
children := res.Header().Children()
children := res.Header().Children() common.PrintVerbose(cmd, "Received split members from the linking object: %v", children)
common.PrintVerbose("Received split members from the linking object: %v", children) // include linking object
return append(children, idLinking)
}
// include linking object // linking object is not required for
return append(children, idLinking) // object collecting
common.PrintVerbose(cmd, "failed to get linking object's header: %w", err)
} }
if idSplit := splitInfo.SplitID(); idSplit != nil { if idSplit := splitInfo.SplitID(); idSplit != nil {
common.PrintVerbose("Collecting split members by split ID...") common.PrintVerbose(cmd, "Collecting split members by split ID...")
var query object.SearchFilters var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, idSplit) query.AddSplitIDFilter(object.MatchStringEqual, idSplit)
@ -407,7 +411,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
members := res.IDList() members := res.IDList()
common.PrintVerbose("Found objects by split ID: %v", res.IDList()) common.PrintVerbose(cmd, "Found objects by split ID: %v", res.IDList())
return members return members
} }
@ -417,7 +421,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
common.ExitOnErr(cmd, "", errors.New("missing any data in received object split information")) common.ExitOnErr(cmd, "", errors.New("missing any data in received object split information"))
} }
common.PrintVerbose("Traverse the object split chain in reverse...", idMember) common.PrintVerbose(cmd, "Traverse the object split chain in reverse...", idMember)
var res *internal.HeadObjectRes var res *internal.HeadObjectRes
chain := []oid.ID{idMember} chain := []oid.ID{idMember}
@ -427,7 +431,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
// split members are almost definitely singular, but don't get hung up on it // split members are almost definitely singular, but don't get hung up on it
for { for {
common.PrintVerbose("Reading previous element of the split chain member %s...", idMember) common.PrintVerbose(cmd, "Reading previous element of the split chain member %s...", idMember)
addrObj.SetObject(idMember) addrObj.SetObject(idMember)
@ -436,7 +440,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
idMember, ok = res.Header().PreviousID() idMember, ok = res.Header().PreviousID()
if !ok { if !ok {
common.PrintVerbose("Chain ended.") common.PrintVerbose(cmd, "Chain ended.")
break break
} }
@ -448,7 +452,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
chainSet[idMember] = struct{}{} chainSet[idMember] = struct{}{}
} }
common.PrintVerbose("Looking for a linking object...") common.PrintVerbose(cmd, "Looking for a linking object...")
var query object.SearchFilters var query object.SearchFilters
query.AddParentIDFilter(object.MatchStringEqual, obj) query.AddParentIDFilter(object.MatchStringEqual, obj)
@ -465,7 +469,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
for i := range list { for i := range list {
if _, ok = chainSet[list[i]]; !ok { if _, ok = chainSet[list[i]]; !ok {
common.PrintVerbose("Found one more related object %s.", list[i]) common.PrintVerbose(cmd, "Found one more related object %s.", list[i])
chain = append(chain, list[i]) chain = append(chain, list[i])
} }
} }

View file

@ -118,6 +118,6 @@ func initConfig() {
// If a config file is found, read it in. // If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil { if err := viper.ReadInConfig(); err == nil {
common.PrintVerbose("Using config file: %s", viper.ConfigFileUsed()) common.PrintVerbose(rootCmd, "Using config file: %s", viper.ConfigFileUsed())
} }
} }

View file

@ -54,7 +54,7 @@ func createSession(cmd *cobra.Command, _ []string) {
addrStr, _ := cmd.Flags().GetString(commonflags.RPC) addrStr, _ := cmd.Flags().GetString(commonflags.RPC)
common.ExitOnErr(cmd, "can't parse endpoint: %w", netAddr.FromString(addrStr)) common.ExitOnErr(cmd, "can't parse endpoint: %w", netAddr.FromString(addrStr))
c, err := internalclient.GetSDKClient(privKey, netAddr) c, err := internalclient.GetSDKClient(cmd, privKey, netAddr)
common.ExitOnErr(cmd, "can't create client: %w", err) common.ExitOnErr(cmd, "can't create client: %w", err)
lifetime := uint64(defaultLifetime) lifetime := uint64(defaultLifetime)

View file

@ -83,7 +83,7 @@ func addByPath(cmd *cobra.Command, _ []string) {
nn := resp.GetBody().GetNodes() nn := resp.GetBody().GetNodes()
if len(nn) == 0 { if len(nn) == 0 {
common.PrintVerbose("No new nodes were created") common.PrintVerbose(cmd, "No new nodes were created")
return return
} }

View file

@ -80,7 +80,7 @@ func getByPath(cmd *cobra.Command, _ []string) {
nn := resp.GetBody().GetNodes() nn := resp.GetBody().GetNodes()
if len(nn) == 0 { if len(nn) == 0 {
common.PrintVerbose("The node is not found") common.PrintVerbose(cmd, "The node is not found")
return return
} }

View file

@ -1,8 +1,6 @@
package util package util
import ( import (
"bytes"
"encoding/json"
"os" "os"
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common" "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
@ -45,7 +43,7 @@ func convertEACLTable(cmd *cobra.Command, _ []string) {
} }
if len(to) == 0 { if len(to) == 0 {
prettyPrintJSON(cmd, data) common.PrettyPrintJSON(cmd, table, "eACL")
return return
} }
@ -54,12 +52,3 @@ func convertEACLTable(cmd *cobra.Command, _ []string) {
cmd.Printf("extended ACL table was successfully dumped to %s\n", to) cmd.Printf("extended ACL table was successfully dumped to %s\n", to)
} }
func prettyPrintJSON(cmd *cobra.Command, data []byte) {
buf := new(bytes.Buffer)
if err := json.Indent(buf, data, "", " "); err != nil {
common.PrintVerbose("Can't pretty print json: %w", err)
}
cmd.Println(buf)
}

View file

@ -51,8 +51,7 @@ func signBearerToken(cmd *cobra.Command, _ []string) {
} }
if len(to) == 0 { if len(to) == 0 {
prettyPrintJSON(cmd, data) common.PrettyPrintJSON(cmd, btok, "bearer token")
return return
} }

View file

@ -52,7 +52,7 @@ func signSessionToken(cmd *cobra.Command, _ []string) {
new(session.Object), new(session.Object),
new(session.Container), new(session.Container),
} { } {
errLast = common.ReadBinaryOrJSON(el, fPath) errLast = common.ReadBinaryOrJSON(cmd, el, fPath)
if errLast == nil { if errLast == nil {
stok = el stok = el
break break
@ -71,7 +71,7 @@ func signSessionToken(cmd *cobra.Command, _ []string) {
to := cmd.Flag(signToFlag).Value.String() to := cmd.Flag(signToFlag).Value.String()
if len(to) == 0 { if len(to) == 0 {
prettyPrintJSON(cmd, data) common.PrettyPrintJSON(cmd, stok, "session token")
return return
} }

View file

@ -215,7 +215,8 @@ func (s *lruNetmapSource) Epoch() (uint64, error) {
// wrapper over TTL cache of values read from the network // wrapper over TTL cache of values read from the network
// that implements container lister. // that implements container lister.
type ttlContainerLister struct { type ttlContainerLister struct {
*ttlNetCache[string, *cacheItemContainerList] inner *ttlNetCache[string, *cacheItemContainerList]
client *cntClient.Client
} }
// value type for ttlNetCache used by ttlContainerLister. // value type for ttlNetCache used by ttlContainerLister.
@ -251,20 +252,18 @@ func newCachedContainerLister(c *cntClient.Client, ttl time.Duration) ttlContain
}, nil }, nil
}) })
return ttlContainerLister{lruCnrListerCache} return ttlContainerLister{inner: lruCnrListerCache, client: c}
} }
// List returns list of container IDs from the cache. If list is missing in the // List returns list of container IDs from the cache. If list is missing in the
// cache or expired, then it returns container IDs from side chain and updates // cache or expired, then it returns container IDs from side chain and updates
// the cache. // the cache.
func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) { func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
var str string if id == nil {
return s.client.List(nil)
if id != nil {
str = id.EncodeToString()
} }
item, err := s.get(str) item, err := s.inner.get(id.EncodeToString())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -287,13 +286,17 @@ func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
func (s *ttlContainerLister) update(owner user.ID, cnr cid.ID, add bool) { func (s *ttlContainerLister) update(owner user.ID, cnr cid.ID, add bool) {
strOwner := owner.EncodeToString() strOwner := owner.EncodeToString()
val, ok := s.cache.Get(strOwner) val, ok := s.inner.cache.Peek(strOwner)
if !ok { if !ok {
// we could cache the single cnr but in this case we will disperse // we could cache the single cnr but in this case we will disperse
// with the Sidechain a lot // with the Sidechain a lot
return return
} }
if s.inner.ttl <= time.Since(val.t) {
return
}
item := val.v item := val.v
item.mtx.Lock() item.mtx.Lock()

View file

@ -127,7 +127,6 @@ func initContainerService(c *cfg) {
cnrRdr.get = c.cfgObject.cnrSource cnrRdr.get = c.cfgObject.cnrSource
cnrWrt.cacheEnabled = true cnrWrt.cacheEnabled = true
cnrWrt.lists = cachedContainerLister
cnrWrt.eacls = cachedEACLStorage cnrWrt.eacls = cachedEACLStorage
} }
@ -658,7 +657,6 @@ type morphContainerWriter struct {
cacheEnabled bool cacheEnabled bool
eacls ttlEACLStorage eacls ttlEACLStorage
lists ttlContainerLister
} }
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) { func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {

View file

@ -41,8 +41,8 @@ type boltDBCfg struct {
boltOptions *bbolt.Options boltOptions *bbolt.Options
} }
func defaultCfg(с *cfg) { func defaultCfg(c *cfg) {
*с = cfg{ *c = cfg{
boltDBCfg: boltDBCfg{ boltDBCfg: boltDBCfg{
perm: os.ModePerm, // 0777 perm: os.ModePerm, // 0777
boltOptions: &bbolt.Options{ boltOptions: &bbolt.Options{

View file

@ -6,6 +6,7 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -45,11 +46,14 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
} }
if _, err := active.blz.Put(putPrm); err != nil { if _, err := active.blz.Put(putPrm); err != nil {
// check if blobovnicza is full // Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error
if errors.Is(err, blobovnicza.ErrFull) { // or update active blobovnicza in other thread. In the latter case the database will be closed
b.log.Debug("blobovnicza overflowed", // and `updateActive` takes care of not updating the active blobovnicza twice.
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))), if isFull := errors.Is(err, blobovnicza.ErrFull); isFull || errors.Is(err, bbolt.ErrDatabaseNotOpen) {
) if isFull {
b.log.Debug("blobovnicza overflowed",
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))))
}
if err := b.updateActive(p, &active.ind); err != nil { if err := b.updateActive(p, &active.ind); err != nil {
if !isLogical(err) { if !isLogical(err) {

View file

@ -239,15 +239,17 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) {
prm.RawData = t.Compress(prm.RawData) prm.RawData = t.Compress(prm.RawData)
} }
err := t.writeFile(p, prm.RawData) tmpPath := p + "#"
err := t.writeFile(tmpPath, prm.RawData)
if err != nil { if err != nil {
var pe *fs.PathError var pe *fs.PathError
if errors.As(err, &pe) && pe.Err == syscall.ENOSPC { if errors.As(err, &pe) && pe.Err == syscall.ENOSPC {
err = common.ErrNoSpace err = common.ErrNoSpace
_ = os.RemoveAll(tmpPath)
} }
} }
return common.PutRes{StorageID: []byte{}}, err return common.PutRes{StorageID: []byte{}}, os.Rename(tmpPath, p)
} }
func (t *FSTree) writeFlags() int { func (t *FSTree) writeFlags() int {

View file

@ -82,6 +82,8 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
return EvacuateShardRes{}, errMustHaveTwoShards return EvacuateShardRes{}, errMustHaveTwoShards
} }
e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList))
// We must have all shards, to have correct information about their // We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase. // indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put. // Evacuated shard is skipped during put.
@ -185,5 +187,7 @@ mainLoop:
} }
} }
e.log.Info("finished shards evacuation",
zap.Strings("shard_ids", sidList))
return res, nil return res, nil
} }

View file

@ -65,8 +65,12 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
e.mtx.RLock() e.mtx.RLock()
pool := e.shardPools[sh.ID().String()] pool, ok := e.shardPools[sh.ID().String()]
e.mtx.RUnlock() e.mtx.RUnlock()
if !ok {
// Shard was concurrently removed, skip.
return false
}
putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj) putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj)
finished = putDone || exists finished = putDone || exists

View file

@ -12,7 +12,7 @@ import (
var _ pilorama.Forest = (*StorageEngine)(nil) var _ pilorama.Forest = (*StorageEngine)(nil)
// TreeMove implements the pilorama.Forest interface. // TreeMove implements the pilorama.Forest interface.
func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) { func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
index, lst, err := e.getTreeShard(d.CID, treeID) index, lst, err := e.getTreeShard(d.CID, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return nil, err return nil, err
@ -32,7 +32,7 @@ func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pil
} }
// TreeAddByPath implements the pilorama.Forest interface. // TreeAddByPath implements the pilorama.Forest interface.
func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.LogMove, error) { func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.Move, error) {
index, lst, err := e.getTreeShard(d.CID, treeID) index, lst, err := e.getTreeShard(d.CID, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return nil, err return nil, err

View file

@ -0,0 +1,50 @@
package pilorama
import (
"sort"
"sync"
"time"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"go.etcd.io/bbolt"
)
type batch struct {
forest *boltForest
timer *time.Timer
mtx sync.Mutex
start sync.Once
cid cidSDK.ID
treeID string
results []chan<- error
operations []*Move
}
func (b *batch) trigger() {
b.mtx.Lock()
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
b.mtx.Unlock()
b.start.Do(b.run)
}
func (b *batch) run() {
sort.Slice(b.operations, func(i, j int) bool {
return b.operations[i].Time < b.operations[j].Time
})
fullID := bucketName(b.cid, b.treeID)
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
var lm Move
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
})
for i := range b.operations {
b.results[i] <- err
}
}

View file

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"github.com/TrueCloudLab/frostfs-node/pkg/util" "github.com/TrueCloudLab/frostfs-node/pkg/util"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id" cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/io"
@ -21,8 +22,13 @@ import (
type boltForest struct { type boltForest struct {
db *bbolt.DB db *bbolt.DB
modeMtx sync.Mutex modeMtx sync.RWMutex
mode mode.Mode mode mode.Mode
// mtx protects batches field.
mtx sync.Mutex
batches []*batch
cfg cfg
} }
@ -31,6 +37,12 @@ var (
logBucket = []byte{1} logBucket = []byte{1}
) )
// ErrDegradedMode is returned when pilorama is in a degraded mode.
var ErrDegradedMode = logicerr.New("pilorama is in a degraded mode")
// ErrReadOnlyMode is returned when pilorama is in a read-only mode.
var ErrReadOnlyMode = logicerr.New("pilorama is in a read-only mode")
// NewBoltForest returns storage wrapper for storing operations on CRDT trees. // NewBoltForest returns storage wrapper for storing operations on CRDT trees.
// //
// Each tree is stored in a separate bucket by `CID + treeID` key. // Each tree is stored in a separate bucket by `CID + treeID` key.
@ -71,12 +83,9 @@ func (t *boltForest) SetMode(m mode.Mode) error {
if t.mode == m { if t.mode == m {
return nil return nil
} }
if t.mode.ReadOnly() == m.ReadOnly() {
return nil
}
err := t.Close() err := t.Close()
if err == nil { if err == nil && !m.NoMetabase() {
if err = t.Open(m.ReadOnly()); err == nil { if err = t.Open(m.ReadOnly()); err == nil {
err = t.Init() err = t.Init()
} }
@ -110,7 +119,7 @@ func (t *boltForest) Open(readOnly bool) error {
return nil return nil
} }
func (t *boltForest) Init() error { func (t *boltForest) Init() error {
if t.db.IsReadOnly() { if t.mode.NoMetabase() || t.db.IsReadOnly() {
return nil return nil
} }
return t.db.Update(func(tx *bbolt.Tx) error { return t.db.Update(func(tx *bbolt.Tx) error {
@ -133,29 +142,45 @@ func (t *boltForest) Close() error {
} }
// TreeMove implements the Forest interface. // TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) { func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, error) {
if !d.checkValid() { if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor return nil, ErrInvalidCIDDescriptor
} }
var lm LogMove t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
} else if t.mode.ReadOnly() {
return nil, ErrReadOnlyMode
}
lm := *m
fullID := bucketName(d.CID, treeID)
return &lm, t.db.Batch(func(tx *bbolt.Tx) error { return &lm, t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil { if err != nil {
return err return err
} }
m.Time = t.getLatestTimestamp(bLog, d.Position, d.Size) lm.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
if m.Child == RootID { if lm.Child == RootID {
m.Child = t.findSpareID(bTree) lm.Child = t.findSpareID(bTree)
} }
lm.Move = *m return t.do(bLog, bTree, make([]byte, 17), &lm)
return t.applyOperation(bLog, bTree, &lm)
}) })
} }
// TreeExists implements the Forest interface. // TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) { func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return false, ErrDegradedMode
}
var exists bool var exists bool
err := t.db.View(func(tx *bbolt.Tx) error { err := t.db.View(func(tx *bbolt.Tx) error {
@ -168,7 +193,7 @@ func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
} }
// TreeAddByPath implements the Forest interface. // TreeAddByPath implements the Forest interface.
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) { func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
if !d.checkValid() { if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor return nil, ErrInvalidCIDDescriptor
} }
@ -176,11 +201,21 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
return nil, ErrNotPathAttribute return nil, ErrNotPathAttribute
} }
var lm []LogMove t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
} else if t.mode.ReadOnly() {
return nil, ErrReadOnlyMode
}
var lm []Move
var key [17]byte var key [17]byte
fullID := bucketName(d.CID, treeID)
err := t.db.Batch(func(tx *bbolt.Tx) error { err := t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil { if err != nil {
return err return err
} }
@ -191,9 +226,9 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
} }
ts := t.getLatestTimestamp(bLog, d.Position, d.Size) ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
lm = make([]LogMove, len(path)-i+1) lm = make([]Move, len(path)-i+1)
for j := i; j < len(path); j++ { for j := i; j < len(path); j++ {
lm[j-i].Move = Move{ lm[j-i] = Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: ts, Time: ts,
@ -211,7 +246,7 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
node = lm[j-i].Child node = lm[j-i].Child
} }
lm[len(lm)-1].Move = Move{ lm[len(lm)-1] = Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: ts, Time: ts,
@ -240,17 +275,14 @@ func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint6
// findSpareID returns random unused ID. // findSpareID returns random unused ID.
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
id := uint64(rand.Int63()) id := uint64(rand.Int63())
key := make([]byte, 9)
var key [9]byte
key[0] = 't'
binary.LittleEndian.PutUint64(key[1:], id)
for { for {
if bTree.Get(key[:]) == nil { _, _, _, ok := t.getState(bTree, stateKey(key, id))
if !ok {
return id return id
} }
id = uint64(rand.Int63()) id = uint64(rand.Int63())
binary.LittleEndian.PutUint64(key[1:], id)
} }
} }
@ -260,6 +292,15 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
return ErrInvalidCIDDescriptor return ErrInvalidCIDDescriptor
} }
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return ErrDegradedMode
} else if t.mode.ReadOnly() {
return ErrReadOnlyMode
}
if backgroundSync { if backgroundSync {
var seen bool var seen bool
err := t.db.View(func(tx *bbolt.Tx) error { err := t.db.View(func(tx *bbolt.Tx) error {
@ -280,19 +321,68 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
} }
} }
return t.db.Batch(func(tx *bbolt.Tx) error { if t.db.MaxBatchSize == 1 {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) fullID := bucketName(d.CID, treeID)
if err != nil { return t.db.Update(func(tx *bbolt.Tx) error {
return err bLog, bTree, err := t.getTreeBuckets(tx, fullID)
} if err != nil {
return err
}
lm := &LogMove{Move: *m} var lm Move
return t.applyOperation(bLog, bTree, lm) return t.applyOperation(bLog, bTree, []*Move{m}, &lm)
}) })
}
ch := make(chan error, 1)
t.addBatch(d, treeID, m, ch)
return <-ch
} }
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) { func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
treeRoot := bucketName(cid, treeID) t.mtx.Lock()
for i := 0; i < len(t.batches); i++ {
t.batches[i].mtx.Lock()
if t.batches[i].timer == nil {
t.batches[i].mtx.Unlock()
copy(t.batches[i:], t.batches[i+1:])
t.batches = t.batches[:len(t.batches)-1]
i--
continue
}
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
if found {
t.batches[i].results = append(t.batches[i].results, ch)
t.batches[i].operations = append(t.batches[i].operations, m)
if len(t.batches[i].operations) == t.db.MaxBatchSize {
t.batches[i].timer.Stop()
t.batches[i].timer = nil
t.batches[i].mtx.Unlock()
b := t.batches[i]
t.mtx.Unlock()
b.trigger()
return
}
t.batches[i].mtx.Unlock()
t.mtx.Unlock()
return
}
t.batches[i].mtx.Unlock()
}
b := &batch{
forest: t,
cid: d.CID,
treeID: treeID,
results: []chan<- error{ch},
operations: []*Move{m},
}
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
t.batches = append(t.batches, b)
t.mtx.Unlock()
}
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, treeRoot []byte) (*bbolt.Bucket, *bbolt.Bucket, error) {
child := tx.Bucket(treeRoot) child := tx.Bucket(treeRoot)
if child != nil { if child != nil {
return child.Bucket(logBucket), child.Bucket(dataBucket), nil return child.Bucket(logBucket), child.Bucket(dataBucket), nil
@ -313,16 +403,11 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string)
return bLog, bData, nil return bLog, bData, nil
} }
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error { // applyOperations applies log operations. Assumes lm are sorted by timestamp.
var tmp LogMove func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *Move) error {
var tmp Move
var cKey [17]byte var cKey [17]byte
var logKey [8]byte
binary.BigEndian.PutUint64(logKey[:], lm.Time)
if logBucket.Get(logKey[:]) != nil {
return nil
}
c := logBucket.Cursor() c := logBucket.Cursor()
key, value := c.Last() key, value := c.Last()
@ -331,82 +416,87 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
r := io.NewBinReaderFromIO(b) r := io.NewBinReaderFromIO(b)
// 1. Undo up until the desired timestamp is here. // 1. Undo up until the desired timestamp is here.
for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time { for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
b.Reset(value) b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err tmp.Child = r.ReadU64LE()
tmp.Parent = r.ReadU64LE()
tmp.Time = r.ReadVarUint()
if r.Err != nil {
return r.Err
} }
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil { if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
return err return err
} }
key, value = c.Prev() key, value = c.Prev()
} }
key, _ = c.Next() for i := 0; i < len(ms); i++ {
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
// 2. Insert the operation. // 2. Insert the operation.
if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time { *lm = *ms[i]
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil { if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
return err return err
} }
}
if key == nil { // Cursor can be invalid, seek again.
// The operation is inserted in the beginning, reposition the cursor. binary.BigEndian.PutUint64(cKey[:], lm.Time)
// Otherwise, `Next` call will return currently inserted operation. _, _ = c.Seek(cKey[:8])
c.First()
}
key, value = c.Seek(key)
// 3. Re-apply all other operations.
for len(key) == 8 {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err
}
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
return err
}
key, value = c.Next() key, value = c.Next()
// 3. Re-apply all other operations.
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
if err := t.logFromBytes(&tmp, value); err != nil {
return err
}
if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil {
return err
}
key, value = c.Next()
}
} }
return nil return nil
} }
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error { func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *Move) error {
currParent := b.Get(parentKey(key, op.Child))
op.HasOld = currParent != nil
if currParent != nil { // node is already in tree
op.Old.Parent = binary.LittleEndian.Uint64(currParent)
if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil {
return err
}
} else {
op.HasOld = false
op.Old = nodeInfo{}
}
binary.BigEndian.PutUint64(key, op.Time) binary.BigEndian.PutUint64(key, op.Time)
if err := lb.Put(key[:8], t.logToBytes(op)); err != nil { rawLog := t.logToBytes(op)
if err := lb.Put(key[:8], rawLog); err != nil {
return err return err
} }
if op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) { return t.redo(b, key, op, rawLog[16:])
return nil }
func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *Move, rawMeta []byte) error {
var err error
parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child))
if inTree {
err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta)
} else {
ts = op.Time
err = b.Delete(oldKey(key, op.Time))
} }
if currParent == nil { if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil { return err
return err }
}
} else { if inTree {
parent := binary.LittleEndian.Uint64(currParent)
if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil { if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil {
return err return err
} }
for i := range op.Old.Meta.Items {
if isAttributeInternal(op.Old.Meta.Items[i].Key) { var meta Meta
key = internalKey(key, op.Old.Meta.Items[i].Key, string(op.Old.Meta.Items[i].Value), parent, op.Child) if err := meta.FromBytes(currMeta); err != nil {
return err
}
for i := range meta.Items {
if isAttributeInternal(meta.Items[i].Key) {
key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)
err := b.Delete(key) err := b.Delete(key)
if err != nil { if err != nil {
return err return err
@ -414,17 +504,16 @@ func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMo
} }
} }
} }
return t.addNode(b, key, op.Child, op.Parent, op.Meta) return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta)
} }
// removeNode removes node keys from the tree except the children key or its parent. // removeNode removes node keys from the tree except the children key or its parent.
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error { func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error {
if err := b.Delete(parentKey(key, node)); err != nil { k := stateKey(key, node)
return err _, _, rawMeta, _ := t.getState(b, k)
}
var meta Meta var meta Meta
var k = metaKey(key, node) if err := meta.FromBytes(rawMeta); err == nil {
if err := meta.FromBytes(b.Get(k)); err == nil {
for i := range meta.Items { for i := range meta.Items {
if isAttributeInternal(meta.Items[i].Key) { if isAttributeInternal(meta.Items[i].Key) {
err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node)) err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node))
@ -434,23 +523,16 @@ func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node)
} }
} }
} }
if err := b.Delete(metaKey(key, node)); err != nil { return b.Delete(k)
return err
}
return b.Delete(timestampKey(key, node))
} }
// addNode adds node keys to the tree except the timestamp key. // addNode adds node keys to the tree except the timestamp key.
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error { func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error {
err := b.Put(parentKey(key, child), toUint64(parent)) if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil {
if err != nil {
return err return err
} }
err = b.Put(childrenKey(key, child, parent), []byte{1})
if err != nil { err := b.Put(childrenKey(key, child, parent), []byte{1})
return err
}
err = b.Put(metaKey(key, child), meta.Bytes())
if err != nil { if err != nil {
return err return err
} }
@ -473,27 +555,33 @@ func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, me
return nil return nil
} }
func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error { func (t *boltForest) undo(m *Move, b *bbolt.Bucket, key []byte) error {
if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil { if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
return err return err
} }
if !lm.HasOld { parent, ts, rawMeta, ok := t.getState(b, oldKey(key, m.Time))
if !ok {
return t.removeNode(b, key, m.Child, m.Parent) return t.removeNode(b, key, m.Child, m.Parent)
} }
return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta)
var meta Meta
if err := meta.FromBytes(rawMeta); err != nil {
return err
}
return t.addNode(b, key, m.Child, parent, ts, meta, rawMeta)
} }
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool { func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
key := make([]byte, 9) key := make([]byte, 9)
key[0] = 'p' key[0] = 's'
for node := child; node != parent; { for node := child; node != parent; {
binary.LittleEndian.PutUint64(key[1:], node) binary.LittleEndian.PutUint64(key[1:], node)
rawParent := b.Get(key) parent, _, _, ok := t.getState(b, key)
if len(rawParent) != 8 { if !ok {
return false return false
} }
node = binary.LittleEndian.Uint64(rawParent) node = parent
} }
return true return true
} }
@ -508,6 +596,13 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
return nil, nil return nil, nil
} }
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
var nodes []Node var nodes []Node
return nodes, t.db.View(func(tx *bbolt.Tx) error { return nodes, t.db.View(func(tx *bbolt.Tx) error {
@ -526,10 +621,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
return nil return nil
} }
var ( var maxTimestamp uint64
childID [9]byte
maxTimestamp uint64
)
c := b.Cursor() c := b.Cursor()
@ -539,7 +631,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) { for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:]) child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
if latest { if latest {
ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child))) _, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
if ts >= maxTimestamp { if ts >= maxTimestamp {
nodes = append(nodes[:0], child) nodes = append(nodes[:0], child)
maxTimestamp = ts maxTimestamp = ts
@ -555,7 +647,14 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
// TreeGetMeta implements the forest interface. // TreeGetMeta implements the forest interface.
func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) { func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) {
key := parentKey(make([]byte, 9), nodeID) t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return Meta{}, 0, ErrDegradedMode
}
key := stateKey(make([]byte, 9), nodeID)
var m Meta var m Meta
var parentID uint64 var parentID uint64
@ -567,10 +666,11 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
} }
b := treeRoot.Bucket(dataBucket) b := treeRoot.Bucket(dataBucket)
if data := b.Get(key); len(data) == 8 { if data := b.Get(key); len(data) != 0 {
parentID = binary.LittleEndian.Uint64(data) parentID = binary.LittleEndian.Uint64(data)
} }
return m.FromBytes(b.Get(metaKey(key, nodeID))) _, _, meta, _ := t.getState(b, stateKey(key, nodeID))
return m.FromBytes(meta)
}) })
return m, parentID, err return m, parentID, err
@ -578,6 +678,13 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
// TreeGetChildren implements the Forest interface. // TreeGetChildren implements the Forest interface.
func (t *boltForest) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID Node) ([]uint64, error) { func (t *boltForest) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID Node) ([]uint64, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
key := make([]byte, 9) key := make([]byte, 9)
key[0] = 'c' key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID) binary.LittleEndian.PutUint64(key[1:], nodeID)
@ -603,8 +710,17 @@ func (t *boltForest) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID Node)
// TreeList implements the Forest interface. // TreeList implements the Forest interface.
func (t *boltForest) TreeList(cid cidSDK.ID) ([]string, error) { func (t *boltForest) TreeList(cid cidSDK.ID) ([]string, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
var ids []string var ids []string
cidRaw := []byte(cid.EncodeToString()) cidRaw := make([]byte, 32)
cid.Encode(cidRaw)
cidLen := len(cidRaw) cidLen := len(cidRaw)
err := t.db.View(func(tx *bbolt.Tx) error { err := t.db.View(func(tx *bbolt.Tx) error {
@ -628,6 +744,13 @@ func (t *boltForest) TreeList(cid cidSDK.ID) ([]string, error) {
// TreeGetOpLog implements the pilorama.Forest interface. // TreeGetOpLog implements the pilorama.Forest interface.
func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (Move, error) { func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (Move, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return Move{}, ErrDegradedMode
}
key := make([]byte, 8) key := make([]byte, 8)
binary.BigEndian.PutUint64(key, height) binary.BigEndian.PutUint64(key, height)
@ -651,10 +774,20 @@ func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (
// TreeDrop implements the pilorama.Forest interface. // TreeDrop implements the pilorama.Forest interface.
func (t *boltForest) TreeDrop(cid cidSDK.ID, treeID string) error { func (t *boltForest) TreeDrop(cid cidSDK.ID, treeID string) error {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return ErrDegradedMode
} else if t.mode.ReadOnly() {
return ErrReadOnlyMode
}
return t.db.Batch(func(tx *bbolt.Tx) error { return t.db.Batch(func(tx *bbolt.Tx) error {
if treeID == "" { if treeID == "" {
c := tx.Cursor() c := tx.Cursor()
prefix := []byte(cid.EncodeToString()) prefix := make([]byte, 32)
cid.Encode(prefix)
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() { for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
err := tx.DeleteBucket(k) err := tx.DeleteBucket(k)
if err != nil { if err != nil {
@ -698,67 +831,72 @@ loop:
} }
func (t *boltForest) moveFromBytes(m *Move, data []byte) error { func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
r := io.NewBinReaderFromBuf(data) return t.logFromBytes(m, data)
m.Child = r.ReadU64LE()
m.Parent = r.ReadU64LE()
m.Meta.DecodeBinary(r)
return r.Err
} }
func (t *boltForest) logFromBytes(lm *LogMove, r *io.BinReader) error { func (t *boltForest) logFromBytes(lm *Move, data []byte) error {
lm.Child = r.ReadU64LE() lm.Child = binary.LittleEndian.Uint64(data)
lm.Parent = r.ReadU64LE() lm.Parent = binary.LittleEndian.Uint64(data[8:])
lm.Meta.DecodeBinary(r) return lm.Meta.FromBytes(data[16:])
lm.HasOld = r.ReadBool()
if lm.HasOld {
lm.Old.Parent = r.ReadU64LE()
lm.Old.Meta.DecodeBinary(r)
}
return r.Err
} }
func (t *boltForest) logToBytes(lm *LogMove) []byte { func (t *boltForest) logToBytes(lm *Move) []byte {
w := io.NewBufBinWriter() w := io.NewBufBinWriter()
size := 8 + 8 + lm.Meta.Size() + 1 size := 8 + 8 + lm.Meta.Size() + 1
if lm.HasOld { //if lm.HasOld {
size += 8 + lm.Old.Meta.Size() // size += 8 + lm.Old.Meta.Size()
} //}
w.Grow(size) w.Grow(size)
w.WriteU64LE(lm.Child) w.WriteU64LE(lm.Child)
w.WriteU64LE(lm.Parent) w.WriteU64LE(lm.Parent)
lm.Meta.EncodeBinary(w.BinWriter) lm.Meta.EncodeBinary(w.BinWriter)
w.WriteBool(lm.HasOld) //w.WriteBool(lm.HasOld)
if lm.HasOld { //if lm.HasOld {
w.WriteU64LE(lm.Old.Parent) // w.WriteU64LE(lm.Old.Parent)
lm.Old.Meta.EncodeBinary(w.BinWriter) // lm.Old.Meta.EncodeBinary(w.BinWriter)
} //}
return w.Bytes() return w.Bytes()
} }
func bucketName(cid cidSDK.ID, treeID string) []byte { func bucketName(cid cidSDK.ID, treeID string) []byte {
return []byte(cid.String() + treeID) treeRoot := make([]byte, 32+len(treeID))
cid.Encode(treeRoot)
copy(treeRoot[32:], treeID)
return treeRoot
} }
// 't' + node (id) -> timestamp when the node first appeared. // 'o' + time -> old meta.
func timestampKey(key []byte, child Node) []byte { func oldKey(key []byte, ts Timestamp) []byte {
key[0] = 't' key[0] = 'o'
binary.LittleEndian.PutUint64(key[1:], ts)
return key[:9]
}
// 's' + child ID -> parent + timestamp of the first appearance + meta.
func stateKey(key []byte, child Node) []byte {
key[0] = 's'
binary.LittleEndian.PutUint64(key[1:], child) binary.LittleEndian.PutUint64(key[1:], child)
return key[:9] return key[:9]
} }
// 'p' + node (id) -> parent (id). func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error {
func parentKey(key []byte, child Node) []byte { data := make([]byte, len(meta)+8+8)
key[0] = 'p' binary.LittleEndian.PutUint64(data, parent)
binary.LittleEndian.PutUint64(key[1:], child) binary.LittleEndian.PutUint64(data[8:], timestamp)
return key[:9] copy(data[16:], meta)
return b.Put(key, data)
} }
// 'm' + node (id) -> serialized meta. func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) {
func metaKey(key []byte, child Node) []byte { data := b.Get(key)
key[0] = 'm' if data == nil {
binary.LittleEndian.PutUint64(key[1:], child) return 0, 0, nil, false
return key[:9] }
parent := binary.LittleEndian.Uint64(data)
timestamp := binary.LittleEndian.Uint64(data[8:])
return parent, timestamp, data[16:], true
} }
// 'c' + parent (id) + child (id) -> 0/1. // 'c' + parent (id) + child (id) -> 0/1.

View file

@ -25,7 +25,7 @@ func NewMemoryForest() ForestStorage {
} }
// TreeMove implements the Forest interface. // TreeMove implements the Forest interface.
func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogMove, error) { func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*Move, error) {
if !d.checkValid() { if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor return nil, ErrInvalidCIDDescriptor
} }
@ -44,11 +44,11 @@ func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogM
lm := s.do(op) lm := s.do(op)
s.operations = append(s.operations, lm) s.operations = append(s.operations, lm)
return &lm, nil return &lm.Move, nil
} }
// TreeAddByPath implements the Forest interface. // TreeAddByPath implements the Forest interface.
func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) { func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]Move, error) {
if !d.checkValid() { if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor return nil, ErrInvalidCIDDescriptor
} }
@ -64,22 +64,23 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
} }
i, node := s.getPathPrefix(attr, path) i, node := s.getPathPrefix(attr, path)
lm := make([]LogMove, len(path)-i+1) lm := make([]Move, len(path)-i+1)
for j := i; j < len(path); j++ { for j := i; j < len(path); j++ {
lm[j-i] = s.do(&Move{ op := s.do(&Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: s.timestamp(d.Position, d.Size), Time: s.timestamp(d.Position, d.Size),
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}}, Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
Child: s.findSpareID(), Child: s.findSpareID(),
}) })
node = lm[j-i].Child lm[j-i] = op.Move
s.operations = append(s.operations, lm[j-i]) node = op.Child
s.operations = append(s.operations, op)
} }
mCopy := make([]KeyValue, len(m)) mCopy := make([]KeyValue, len(m))
copy(mCopy, m) copy(mCopy, m)
lm[len(lm)-1] = s.do(&Move{ op := s.do(&Move{
Parent: node, Parent: node,
Meta: Meta{ Meta: Meta{
Time: s.timestamp(d.Position, d.Size), Time: s.timestamp(d.Position, d.Size),
@ -87,6 +88,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
}, },
Child: s.findSpareID(), Child: s.findSpareID(),
}) })
lm[len(lm)-1] = op.Move
return lm, nil return lm, nil
} }

View file

@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync"
"testing" "testing"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id" cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
@ -16,9 +17,9 @@ import (
var providers = []struct { var providers = []struct {
name string name string
construct func(t testing.TB) Forest construct func(t testing.TB, opts ...Option) Forest
}{ }{
{"inmemory", func(t testing.TB) Forest { {"inmemory", func(t testing.TB, _ ...Option) Forest {
f := NewMemoryForest() f := NewMemoryForest()
require.NoError(t, f.Open(false)) require.NoError(t, f.Open(false))
require.NoError(t, f.Init()) require.NoError(t, f.Init())
@ -28,14 +29,15 @@ var providers = []struct {
return f return f
}}, }},
{"bbolt", func(t testing.TB) Forest { {"bbolt", func(t testing.TB, opts ...Option) Forest {
// Use `os.TempDir` because we construct multiple times in the same test. // Use `os.TempDir` because we construct multiple times in the same test.
tmpDir, err := os.MkdirTemp(os.TempDir(), "*") tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
require.NoError(t, err) require.NoError(t, err)
f := NewBoltForest( f := NewBoltForest(
WithPath(filepath.Join(tmpDir, "test.db")), append([]Option{
WithMaxBatchSize(1)) WithPath(filepath.Join(tmpDir, "test.db")),
WithMaxBatchSize(1)}, opts...)...)
require.NoError(t, f.Open(false)) require.NoError(t, f.Open(false))
require.NoError(t, f.Init()) require.NoError(t, f.Init())
t.Cleanup(func() { t.Cleanup(func() {
@ -407,7 +409,7 @@ func TestForest_Apply(t *testing.T) {
} }
} }
func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) { func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID() cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1} d := CIDDescriptor{cid, 0, 1}
treeID := "version" treeID := "version"
@ -461,7 +463,7 @@ func TestForest_GetOpLog(t *testing.T) {
} }
} }
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) { func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID() cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1} d := CIDDescriptor{cid, 0, 1}
treeID := "version" treeID := "version"
@ -520,7 +522,7 @@ func TestForest_TreeExists(t *testing.T) {
} }
} }
func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) { func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...Option) Forest) {
s := constructor(t) s := constructor(t)
checkExists := func(t *testing.T, expected bool, cid cidSDK.ID, treeID string) { checkExists := func(t *testing.T, expected bool, cid cidSDK.ID, treeID string) {
@ -654,20 +656,20 @@ func TestForest_ApplyRandom(t *testing.T) {
} }
} }
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Forest) { func TestForest_ParallelApply(t *testing.T) {
rand.Seed(42) for i := range providers {
if providers[i].name == "inmemory" {
const ( continue
nodeCount = 5 }
opCount = 20 t.Run(providers[i].name, func(t *testing.T) {
iterCount = 200 testForestTreeParallelApply(t, providers[i].construct, 8, 128, 10)
) })
}
cid := cidtest.ID() }
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
// prepareRandomTree creates a random sequence of operation and applies them to tree.
// The operations are guaranteed to be applied and returned sorted by `Time`.
func prepareRandomTree(nodeCount, opCount int) []Move {
ops := make([]Move, nodeCount+opCount) ops := make([]Move, nodeCount+opCount)
for i := 0; i < nodeCount; i++ { for i := 0; i < nodeCount; i++ {
ops[i] = Move{ ops[i] = Move{
@ -686,7 +688,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
for i := nodeCount; i < len(ops); i++ { for i := nodeCount; i < len(ops); i++ {
ops[i] = Move{ ops[i] = Move{
Parent: rand.Uint64() % (nodeCount + 12), Parent: rand.Uint64() % uint64(nodeCount+12),
Meta: Meta{ Meta: Meta{
Time: Timestamp(i + nodeCount), Time: Timestamp(i + nodeCount),
Items: []KeyValue{ Items: []KeyValue{
@ -694,17 +696,111 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
{Value: make([]byte, 10)}, {Value: make([]byte, 10)},
}, },
}, },
Child: rand.Uint64() % (nodeCount + 10), Child: rand.Uint64() % uint64(nodeCount+10),
} }
if rand.Uint32()%5 == 0 { if rand.Uint32()%5 == 0 {
ops[i].Parent = TrashID ops[i].Parent = TrashID
} }
rand.Read(ops[i].Meta.Items[1].Value) rand.Read(ops[i].Meta.Items[1].Value)
} }
return ops
}
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
for i := uint64(0); i < uint64(nodeCount); i++ {
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
require.Equal(t, expectedParent, actualParent, "node id: %d", i)
require.Equal(t, expectedMeta, actualMeta, "node id: %d", i)
if ma, ok := actual.(*memoryForest); ok {
me := expected.(*memoryForest)
require.Equal(t, len(me.treeMap), len(ma.treeMap))
for k, sa := range ma.treeMap {
se, ok := me.treeMap[k]
require.True(t, ok)
require.Equal(t, se.operations, sa.operations)
require.Equal(t, se.infoMap, sa.infoMap)
require.Equal(t, len(se.childMap), len(sa.childMap))
for ck, la := range sa.childMap {
le, ok := se.childMap[ck]
require.True(t, ok)
require.ElementsMatch(t, le, la)
}
}
require.Equal(t, expected, actual, i)
}
}
}
func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, batchSize, opCount, iterCount int) {
rand.Seed(42)
const nodeCount = 5
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops { for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false)) require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
} }
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
actual := constructor(t, WithMaxBatchSize(batchSize))
wg := new(sync.WaitGroup)
ch := make(chan *Move, 0)
for i := 0; i < batchSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for op := range ch {
require.NoError(t, actual.TreeApply(d, treeID, op, false))
}
}()
}
for i := range ops {
ch <- &ops[i]
}
close(ch)
wg.Wait()
compareForests(t, expected, actual, cid, treeID, nodeCount)
}
}
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
rand.Seed(42)
const (
nodeCount = 5
opCount = 20
)
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
}
const iterCount = 200
for i := 0; i < iterCount; i++ { for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place. // Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] }) rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
@ -713,59 +809,39 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
for i := range ops { for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false)) require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
} }
for i := uint64(0); i < nodeCount; i++ { compareForests(t, expected, actual, cid, treeID, nodeCount)
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
require.Equal(t, expectedParent, actualParent, "node id: %d", i)
require.Equal(t, expectedMeta, actualMeta, "node id: %d", i)
if ma, ok := actual.(*memoryForest); ok {
me := expected.(*memoryForest)
require.Equal(t, len(me.treeMap), len(ma.treeMap))
for k, sa := range ma.treeMap {
se, ok := me.treeMap[k]
require.True(t, ok)
require.Equal(t, se.operations, sa.operations)
require.Equal(t, se.infoMap, sa.infoMap)
require.Equal(t, len(se.childMap), len(sa.childMap))
for ck, la := range sa.childMap {
le, ok := se.childMap[ck]
require.True(t, ok)
require.ElementsMatch(t, le, la)
}
}
require.Equal(t, expected, actual, i)
}
}
} }
} }
const benchNodeCount = 1000 const benchNodeCount = 1000
var batchSizes = []int{1, 2, 4, 8, 16, 32}
func BenchmarkApplySequential(b *testing.B) { func BenchmarkApplySequential(b *testing.B) {
for i := range providers { for i := range providers {
if providers[i].name == "inmemory" { // memory backend is not thread-safe if providers[i].name == "inmemory" { // memory backend is not thread-safe
continue continue
} }
b.Run(providers[i].name, func(b *testing.B) { b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { for _, bs := range batchSizes {
ops := make([]Move, opCount) b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
for i := range ops { s := providers[i].construct(b, WithMaxBatchSize(bs))
ops[i] = Move{ benchmarkApply(b, s, func(opCount int) []Move {
Parent: uint64(rand.Intn(benchNodeCount)), ops := make([]Move, opCount)
Meta: Meta{ for i := range ops {
Time: Timestamp(i), ops[i] = Move{
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, Parent: uint64(rand.Intn(benchNodeCount)),
}, Meta: Meta{
Child: uint64(rand.Intn(benchNodeCount)), Time: Timestamp(i),
} Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
} },
return ops Child: uint64(rand.Intn(benchNodeCount)),
}) }
}
return ops
})
})
}
}) })
} }
} }
@ -780,25 +856,30 @@ func BenchmarkApplyReorderLast(b *testing.B) {
continue continue
} }
b.Run(providers[i].name, func(b *testing.B) { b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { for _, bs := range batchSizes {
ops := make([]Move, opCount) b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
for i := range ops { s := providers[i].construct(b, WithMaxBatchSize(bs))
ops[i] = Move{ benchmarkApply(b, s, func(opCount int) []Move {
Parent: uint64(rand.Intn(benchNodeCount)), ops := make([]Move, opCount)
Meta: Meta{ for i := range ops {
Time: Timestamp(i), ops[i] = Move{
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, Parent: uint64(rand.Intn(benchNodeCount)),
}, Meta: Meta{
Child: uint64(rand.Intn(benchNodeCount)), Time: Timestamp(i),
} Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
if i != 0 && i%blockSize == 0 { },
for j := 0; j < blockSize/2; j++ { Child: uint64(rand.Intn(benchNodeCount)),
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j] }
if i != 0 && i%blockSize == 0 {
for j := 0; j < blockSize/2; j++ {
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j]
}
}
} }
} return ops
} })
return ops })
}) }
}) })
} }
} }
@ -810,18 +891,17 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
cid := cidtest.ID() cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1} d := CIDDescriptor{cid, 0, 1}
treeID := "version" treeID := "version"
ch := make(chan *Move, b.N) ch := make(chan int, b.N)
for i := range ops { for i := 0; i < b.N; i++ {
ch <- &ops[i] ch <- i
} }
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
b.SetParallelism(50) b.SetParallelism(10)
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
op := <-ch if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
if err := s.TreeApply(d, treeID, op, false); err != nil {
b.Fatalf("error in `Apply`: %v", err) b.Fatalf("error in `Apply`: %v", err)
} }
} }

View file

@ -6,9 +6,15 @@ type nodeInfo struct {
Meta Meta Meta Meta
} }
type move struct {
Move
HasOld bool
Old nodeInfo
}
// state represents state being replicated. // state represents state being replicated.
type state struct { type state struct {
operations []LogMove operations []move
tree tree
} }
@ -20,7 +26,7 @@ func newState() *state {
} }
// undo un-does op and changes s in-place. // undo un-does op and changes s in-place.
func (s *state) undo(op *LogMove) { func (s *state) undo(op *move) {
children := s.tree.childMap[op.Parent] children := s.tree.childMap[op.Parent]
for i := range children { for i := range children {
if children[i] == op.Child { if children[i] == op.Child {
@ -76,8 +82,8 @@ func (s *state) Apply(op *Move) error {
} }
// do performs a single move operation on a tree. // do performs a single move operation on a tree.
func (s *state) do(op *Move) LogMove { func (s *state) do(op *Move) move {
lm := LogMove{ lm := move{
Move: Move{ Move: Move{
Parent: op.Parent, Parent: op.Parent,
Meta: op.Meta, Meta: op.Meta,

View file

@ -11,11 +11,11 @@ type Forest interface {
// TreeMove moves node in the tree. // TreeMove moves node in the tree.
// If the parent of the move operation is TrashID, the node is removed. // If the parent of the move operation is TrashID, the node is removed.
// If the child of the move operation is RootID, new ID is generated and added to a tree. // If the child of the move operation is RootID, new ID is generated and added to a tree.
TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, error)
// TreeAddByPath adds new node in the tree using provided path. // TreeAddByPath adds new node in the tree using provided path.
// The path is constructed by descending from the root using the values of the attr in meta. // The path is constructed by descending from the root using the values of the attr in meta.
// Internal nodes in path should have exactly one attribute, otherwise a new node is created. // Internal nodes in path should have exactly one attribute, otherwise a new node is created.
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
// TreeApply applies replicated operation from another node. // TreeApply applies replicated operation from another node.
// If background is true, TreeApply will first check whether an operation exists. // If background is true, TreeApply will first check whether an operation exists.
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error

View file

@ -35,13 +35,6 @@ type Move struct {
Child Node Child Node
} }
// LogMove represents log record for a single move operation.
type LogMove struct {
Move
HasOld bool
Old nodeInfo
}
const ( const (
// RootID represents the ID of a root node. // RootID represents the ID of a root node.
RootID = 0 RootID = 0

View file

@ -261,7 +261,10 @@ func (s *Shard) Close() error {
} }
} }
s.gc.stop() // If Init/Open was unsuccessful gc can be nil.
if s.gc != nil {
s.gc.stop()
}
return nil return nil
} }

View file

@ -91,11 +91,15 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
}, err }, err
} }
// emptyStorageID is an empty storageID that indicates that
// an object is big (and is stored in an FSTree, not in a blobovnicza).
var emptyStorageID = make([]byte, 0)
// fetchObjectData looks through writeCache and blobStor to find object. // fetchObjectData looks through writeCache and blobStor to find object.
func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) { func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
var ( var (
err error mErr error
res *objectSDK.Object mRes meta.ExistsRes
) )
var exists bool var exists bool
@ -103,15 +107,15 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
var mPrm meta.ExistsPrm var mPrm meta.ExistsPrm
mPrm.SetAddress(addr) mPrm.SetAddress(addr)
mRes, err := s.metaBase.Exists(mPrm) mRes, mErr = s.metaBase.Exists(mPrm)
if err != nil && !s.info.Mode.NoMetabase() { if mErr != nil && !s.info.Mode.NoMetabase() {
return res, false, err return nil, false, mErr
} }
exists = mRes.Exists() exists = mRes.Exists()
} }
if s.hasWriteCache() { if s.hasWriteCache() {
res, err = wc(s.writeCache) res, err := wc(s.writeCache)
if err == nil || IsErrOutOfRange(err) { if err == nil || IsErrOutOfRange(err) {
return res, false, err return res, false, err
} }
@ -123,8 +127,8 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
} }
} }
if skipMeta || err != nil { if skipMeta || mErr != nil {
res, err = cb(s.blobStor, nil) res, err := cb(s.blobStor, nil)
return res, false, err return res, false, err
} }
@ -135,12 +139,20 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
var mPrm meta.StorageIDPrm var mPrm meta.StorageIDPrm
mPrm.SetAddress(addr) mPrm.SetAddress(addr)
mRes, err := s.metaBase.StorageID(mPrm) mExRes, err := s.metaBase.StorageID(mPrm)
if err != nil { if err != nil {
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
} }
res, err = cb(s.blobStor, mRes.StorageID()) storageID := mExRes.StorageID()
if storageID == nil {
// `nil` storageID returned without any error
// means that object is big, `cb` expects an
// empty (but non-nil) storageID in such cases
storageID = emptyStorageID
}
res, err := cb(s.blobStor, storageID)
return res, true, err return res, true, err
} }

View file

@ -3,6 +3,7 @@ package shard
import ( import (
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"go.uber.org/zap"
) )
// ErrReadOnlyMode is returned when it is impossible to apply operation // ErrReadOnlyMode is returned when it is impossible to apply operation
@ -24,6 +25,10 @@ func (s *Shard) SetMode(m mode.Mode) error {
} }
func (s *Shard) setMode(m mode.Mode) error { func (s *Shard) setMode(m mode.Mode) error {
s.log.Info("setting shard mode",
zap.Stringer("old_mode", s.info.Mode),
zap.Stringer("new_mode", m))
components := []interface{ SetMode(mode.Mode) error }{ components := []interface{ SetMode(mode.Mode) error }{
s.metaBase, s.blobStor, s.metaBase, s.blobStor,
} }
@ -61,6 +66,8 @@ func (s *Shard) setMode(m mode.Mode) error {
s.metricsWriter.SetReadonly(s.info.Mode != mode.ReadWrite) s.metricsWriter.SetReadonly(s.info.Mode != mode.ReadWrite)
} }
s.log.Info("shard mode set successfully",
zap.Stringer("mode", s.info.Mode))
return nil return nil
} }

View file

@ -12,7 +12,7 @@ var _ pilorama.Forest = (*Shard)(nil)
var ErrPiloramaDisabled = logicerr.New("pilorama is disabled") var ErrPiloramaDisabled = logicerr.New("pilorama is disabled")
// TreeMove implements the pilorama.Forest interface. // TreeMove implements the pilorama.Forest interface.
func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) { func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
if s.pilorama == nil { if s.pilorama == nil {
return nil, ErrPiloramaDisabled return nil, ErrPiloramaDisabled
} }
@ -27,7 +27,7 @@ func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Mo
} }
// TreeAddByPath implements the pilorama.Forest interface. // TreeAddByPath implements the pilorama.Forest interface.
func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.LogMove, error) { func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.Move, error) {
if s.pilorama == nil { if s.pilorama == nil {
return nil, ErrPiloramaDisabled return nil, ErrPiloramaDisabled
} }

View file

@ -11,6 +11,7 @@ import (
clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client" clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
"github.com/TrueCloudLab/frostfs-node/pkg/network" "github.com/TrueCloudLab/frostfs-node/pkg/network"
"github.com/TrueCloudLab/frostfs-sdk-go/client" "github.com/TrueCloudLab/frostfs-sdk-go/client"
"github.com/TrueCloudLab/frostfs-sdk-go/object"
) )
type singleClient struct { type singleClient struct {
@ -29,8 +30,6 @@ type multiClient struct {
addr network.AddressGroup addr network.AddressGroup
opts ClientCacheOpts opts ClientCacheOpts
reconnectInterval time.Duration
} }
const defaultReconnectInterval = time.Second * 30 const defaultReconnectInterval = time.Second * 30
@ -40,10 +39,9 @@ func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClien
opts.ReconnectTimeout = defaultReconnectInterval opts.ReconnectTimeout = defaultReconnectInterval
} }
return &multiClient{ return &multiClient{
clients: make(map[string]*singleClient), clients: make(map[string]*singleClient),
addr: addr, addr: addr,
opts: opts, opts: opts,
reconnectInterval: defaultReconnectInterval,
} }
} }
@ -149,8 +147,12 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
err = f(c) err = f(c)
} }
success := err == nil || errors.Is(err, context.Canceled) // non-status logic error that could be returned
// from the SDK client; should not be considered
// as a connection error
var siErr *object.SplitInfoError
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) { if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
firstErr = err firstErr = err
} }
@ -170,6 +172,14 @@ func (x *multiClient) ReportError(err error) {
return return
} }
// non-status logic error that could be returned
// from the SDK client; should not be considered
// as a connection error
var siErr *object.SplitInfoError
if errors.As(err, &siErr) {
return
}
// Dropping all clients here is not necessary, we do this // Dropping all clients here is not necessary, we do this
// because `multiClient` doesn't yet provide convenient interface // because `multiClient` doesn't yet provide convenient interface
// for reporting individual errors for streaming operations. // for reporting individual errors for streaming operations.
@ -327,7 +337,7 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
c.RUnlock() c.RUnlock()
return cl, nil return cl, nil
} }
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval { if x.opts.ReconnectTimeout != 0 && time.Since(c.lastAttempt) < x.opts.ReconnectTimeout {
c.RUnlock() c.RUnlock()
return nil, errRecentlyFailed return nil, errRecentlyFailed
} }
@ -350,7 +360,7 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
return c.client, nil return c.client, nil
} }
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval { if x.opts.ReconnectTimeout != 0 && time.Since(c.lastAttempt) < x.opts.ReconnectTimeout {
return nil, errRecentlyFailed return nil, errRecentlyFailed
} }

View file

@ -10,6 +10,7 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/core/netmap" "github.com/TrueCloudLab/frostfs-node/pkg/core/netmap"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object" "github.com/TrueCloudLab/frostfs-node/pkg/services/object"
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger" "github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/TrueCloudLab/frostfs-sdk-go/container/acl" "github.com/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id" cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
@ -573,6 +574,9 @@ func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (in
if err != nil { if err != nil {
return info, errors.New("can't fetch current epoch") return info, errors.New("can't fetch current epoch")
} }
if req.token.ExpiredAt(currentEpoch) {
return info, apistatus.SessionTokenExpired{}
}
if req.token.InvalidAt(currentEpoch) { if req.token.InvalidAt(currentEpoch) {
return info, fmt.Errorf("%s: token is invalid at %d epoch)", return info, fmt.Errorf("%s: token is invalid at %d epoch)",
invalidRequestMessage, currentEpoch) invalidRequestMessage, currentEpoch)

View file

@ -6,9 +6,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// LogServiceError writes debug error message of object service to provided logger. // LogServiceError writes error message of object service to provided logger.
func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, err error) { func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, err error) {
l.Debug("object service error", l.Error("object service error",
zap.String("node", network.StringifyGroup(node)), zap.String("node", network.StringifyGroup(node)),
zap.String("request", req), zap.String("request", req),
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -17,7 +17,7 @@ func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, er
// LogWorkerPoolError writes debug error message of object worker pool to provided logger. // LogWorkerPoolError writes debug error message of object worker pool to provided logger.
func LogWorkerPoolError(l *logger.Logger, req string, err error) { func LogWorkerPoolError(l *logger.Logger, req string, err error) {
l.Debug("could not push task to worker pool", l.Error("could not push task to worker pool",
zap.String("request", req), zap.String("request", req),
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )

View file

@ -17,7 +17,7 @@ import (
type movePair struct { type movePair struct {
cid cidSDK.ID cid cidSDK.ID
treeID string treeID string
op *pilorama.LogMove op *pilorama.Move
} }
type replicationTask struct { type replicationTask struct {
@ -25,12 +25,33 @@ type replicationTask struct {
req *ApplyRequest req *ApplyRequest
} }
type applyOp struct {
treeID string
pilorama.CIDDescriptor
pilorama.Move
}
const ( const (
defaultReplicatorCapacity = 64 defaultReplicatorCapacity = 64
defaultReplicatorWorkerCount = 64 defaultReplicatorWorkerCount = 64
defaultReplicatorSendTimeout = time.Second * 5 defaultReplicatorSendTimeout = time.Second * 5
) )
func (s *Service) localReplicationWorker() {
for {
select {
case <-s.closeCh:
return
case op := <-s.replicateLocalCh:
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
if err != nil {
s.log.Error("failed to apply replicated operation",
zap.String("err", err.Error()))
}
}
}
}
func (s *Service) replicationWorker() { func (s *Service) replicationWorker() {
for { for {
select { select {
@ -74,6 +95,7 @@ func (s *Service) replicationWorker() {
func (s *Service) replicateLoop(ctx context.Context) { func (s *Service) replicateLoop(ctx context.Context) {
for i := 0; i < s.replicatorWorkerCount; i++ { for i := 0; i < s.replicatorWorkerCount; i++ {
go s.replicationWorker() go s.replicationWorker()
go s.localReplicationWorker()
} }
defer func() { defer func() {
for len(s.replicationTasks) != 0 { for len(s.replicationTasks) != 0 {
@ -119,14 +141,14 @@ func (s *Service) replicate(op movePair) error {
return nil return nil
} }
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.LogMove) { func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
select { select {
case s.replicateCh <- movePair{ case s.replicateCh <- movePair{
cid: cid, cid: cid,
treeID: treeID, treeID: treeID,
op: op, op: op,
}: }:
case <-s.closeCh: default:
} }
} }

View file

@ -23,6 +23,7 @@ type Service struct {
cache clientCache cache clientCache
replicateCh chan movePair replicateCh chan movePair
replicateLocalCh chan applyOp
replicationTasks chan replicationTask replicationTasks chan replicationTask
closeCh chan struct{} closeCh chan struct{}
containerCache containerCache containerCache containerCache
@ -59,6 +60,7 @@ func New(opts ...Option) *Service {
s.cache.init() s.cache.init()
s.closeCh = make(chan struct{}) s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp)
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount) s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
s.containerCache.init(s.containerCacheSize) s.containerCache.init(s.containerCacheSize)
s.cnrMap = make(map[cidSDK.ID]map[string]uint64) s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
@ -483,13 +485,19 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
return nil, fmt.Errorf("can't parse meta-information: %w", err) return nil, fmt.Errorf("can't parse meta-information: %w", err)
} }
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} select {
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}} case s.replicateLocalCh <- applyOp{
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{ treeID: req.GetBody().GetTreeId(),
Parent: op.GetParentId(), CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
Child: op.GetChildId(), Move: pilorama.Move{
Meta: meta, Parent: op.GetParentId(),
}, false) Child: op.GetChildId(),
Meta: meta,
},
}:
default:
}
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
} }
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {

View file

@ -360,7 +360,7 @@ func (s *Service) syncLoop(ctx context.Context) {
} }
// randomizeNodeOrder shuffles nodes and removes not a `pos` index. // randomizeNodeOrder shuffles nodes and removes not a `pos` index.
// It is assumed that 0 <= pos < len(nodes) // It is assumed that 0 <= pos < len(nodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo { func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
if len(cnrNodes) == 1 { if len(cnrNodes) == 1 {
return nil return nil