Move diff from nspcc master and support branches #28

Merged
fyrchik merged 28 commits from move-changes into master 2023-01-25 12:31:47 +00:00
50 changed files with 845 additions and 469 deletions

View file

@ -4,8 +4,19 @@ Changelog for FrostFS Node
## [Unreleased]
### Added
- Separate batching for replicated operations over the same container in pilorama (#1621)
- Doc for extended headers (#2128)
### Changed
- `common.PrintVerbose` prints via `cobra.Command.Printf` (#1962)
### Fixed
- Big object removal with non-local parts (#1978)
- Disable pilorama when moving to degraded mode (#2197)
- Fetching blobovnicza objects that are not found in the write-cache (#2206)
- Do not search for small objects in FSTree (#2206)
- Correct status error for expired session token (#2207)
### Removed
### Updated
- `neo-go` to `v0.100.1`
@ -48,6 +59,7 @@ Changelog for FrostFS Node
- Shard uses metabase for `HEAD` requests by default, not write-cache (#2167)
- Clarify help for `--expire-at` parameter for commands `object lock/put` and `bearer create` (#2097)
- Node spawns `GETRANGE` requests signed with the node's key if session key was not found for `RANGEHASH` (#2144)
- Full list of containers is no longer cached (#2176)
### Fixed
- Open FSTree in sync mode by default (#1992)

View file

@ -0,0 +1,34 @@
# Extended headers
## Overview
Extended headers are used in requests and responses. They may contain any user-defined headers
to be interpreted on the application level.
A key name must be a unique valid UTF-8 string, and a value can't be empty. Requests or
responses with duplicated header names or headers with empty values are
considered invalid.
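The validity rules above can be expressed with a short check. The sketch below is illustrative only and is not the FrostFS implementation; the `validateXHeaders` helper and its key/value pair representation are assumptions made for the example:
```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// validateXHeaders applies the rules described above: every key must be a
// unique, non-empty, valid UTF-8 string and every value must be non-empty.
// Headers are passed as ordered key/value pairs because duplicates must be
// detected, which a map could not represent.
func validateXHeaders(kv [][2]string) error {
	seen := make(map[string]struct{}, len(kv))
	for _, h := range kv {
		key, val := h[0], h[1]
		if key == "" || !utf8.ValidString(key) {
			return fmt.Errorf("invalid header key %q", key)
		}
		if val == "" {
			return fmt.Errorf("empty value for header %q", key)
		}
		if _, dup := seen[key]; dup {
			return fmt.Errorf("duplicated header %q", key)
		}
		seen[key] = struct{}{}
	}
	return nil
}

func main() {
	err := validateXHeaders([][2]string{
		{"__NEOFS__NETMAP_EPOCH", "777"},
		{"__NEOFS__NETMAP_EPOCH", "778"}, // duplicate key -> invalid
	})
	fmt.Println(err)
}
```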
## Existing headers
There are some "well-known" headers starting with the `__NEOFS__` prefix that
affect system behaviour:
* `__NEOFS__NETMAP_EPOCH` - netmap epoch to use for object placement calculation. The `value` is a
string-encoded `uint64` in decimal representation. If set to '0' or omitted, only the
current epoch will be used.
* `__NEOFS__NETMAP_LOOKUP_DEPTH` - if an object can't be found using the current epoch's netmap, this header limits
how many past epochs the node may look through. The depth is applied to the current epoch or to the value
of the `__NEOFS__NETMAP_EPOCH` attribute. The `value` is a string-encoded `uint64` in decimal representation.
If set to '0' or not set, only the current epoch is used (a sketch of this lookup logic follows the list).
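A minimal sketch of how a node could combine these two headers to decide which epochs to search; the `epochsToSearch` helper below is hypothetical and only illustrates the semantics described above:
```go
package main

import "fmt"

// epochsToSearch returns the epochs to try, newest first.
// startEpoch comes from __NEOFS__NETMAP_EPOCH (0 or unset means "current epoch"),
// depth from __NEOFS__NETMAP_LOOKUP_DEPTH (0 or unset means "current epoch only").
func epochsToSearch(currentEpoch, startEpoch, depth uint64) []uint64 {
	if startEpoch == 0 {
		startEpoch = currentEpoch
	}
	epochs := []uint64{startEpoch}
	for i := uint64(1); i <= depth && startEpoch >= i; i++ {
		epochs = append(epochs, startEpoch-i)
	}
	return epochs
}

func main() {
	// __NEOFS__NETMAP_EPOCH=777, __NEOFS__NETMAP_LOOKUP_DEPTH=2:
	// the node would look through epochs 777, 776 and 775.
	fmt.Println(epochsToSearch(800, 777, 2)) // [777 776 775]
}
```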
## `neofs-cli` commands with `--xhdr`
List of commands that support extended headers:
* `container list-objects`
* `object delete/get/hash/head/lock/put/range/search`
* `storagegroup delete/get/list/put`
Example:
```shell
$ neofs-cli object put -r s01.neofs.devenv:8080 -w wallet.json --cid CID --file FILE --xhdr "__NEOFS__NETMAP_EPOCH=777"
```

View file

@ -21,25 +21,25 @@ var errInvalidEndpoint = errors.New("provided RPC endpoint is incorrect")
// GetSDKClientByFlag returns default frostfs-sdk-go client using the specified flag for the address.
// On error, outputs to stderr of cmd and exits with non-zero code.
func GetSDKClientByFlag(cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) *client.Client {
cli, err := getSDKClientByFlag(key, endpointFlag)
cli, err := getSDKClientByFlag(cmd, key, endpointFlag)
if err != nil {
common.ExitOnErr(cmd, "can't create API client: %w", err)
}
return cli
}
func getSDKClientByFlag(key *ecdsa.PrivateKey, endpointFlag string) (*client.Client, error) {
func getSDKClientByFlag(cmd *cobra.Command, key *ecdsa.PrivateKey, endpointFlag string) (*client.Client, error) {
var addr network.Address
err := addr.FromString(viper.GetString(endpointFlag))
if err != nil {
return nil, fmt.Errorf("%v: %w", errInvalidEndpoint, err)
}
return GetSDKClient(key, addr)
return GetSDKClient(cmd, key, addr)
}
// GetSDKClient returns default frostfs-sdk-go client.
func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client, error) {
func GetSDKClient(cmd *cobra.Command, key *ecdsa.PrivateKey, addr network.Address) (*client.Client, error) {
var (
c client.Client
prmInit client.PrmInit
@ -56,7 +56,7 @@ func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client,
prmDial.SetTimeout(timeout)
prmDial.SetStreamTimeout(timeout)
common.PrintVerbose("Set request timeout to %s.", timeout)
common.PrintVerbose(cmd, "Set request timeout to %s.", timeout)
}
c.Init(prmInit)
@ -69,7 +69,7 @@ func GetSDKClient(key *ecdsa.PrivateKey, addr network.Address) (*client.Client,
}
// GetCurrentEpoch returns current epoch.
func GetCurrentEpoch(ctx context.Context, endpoint string) (uint64, error) {
func GetCurrentEpoch(ctx context.Context, cmd *cobra.Command, endpoint string) (uint64, error) {
var addr network.Address
if err := addr.FromString(endpoint); err != nil {
@ -81,7 +81,7 @@ func GetCurrentEpoch(ctx context.Context, endpoint string) (uint64, error) {
return 0, fmt.Errorf("can't generate key to sign query: %w", err)
}
c, err := GetSDKClient(key, addr)
c, err := GetSDKClient(cmd, key, addr)
if err != nil {
return 0, err
}

View file

@ -19,7 +19,7 @@ func ReadEACL(cmd *cobra.Command, eaclPath string) *eacl.Table {
ExitOnErr(cmd, "", errors.New("incorrect path to file with EACL"))
}
PrintVerbose("Reading EACL from file: %s", eaclPath)
PrintVerbose(cmd, "Reading EACL from file: %s", eaclPath)
data, err := os.ReadFile(eaclPath)
ExitOnErr(cmd, "can't read file with EACL: %w", err)
@ -28,13 +28,13 @@ func ReadEACL(cmd *cobra.Command, eaclPath string) *eacl.Table {
if err = table.UnmarshalJSON(data); err == nil {
validateAndFixEACLVersion(table)
PrintVerbose("Parsed JSON encoded EACL table")
PrintVerbose(cmd, "Parsed JSON encoded EACL table")
return table
}
if err = table.Unmarshal(data); err == nil {
validateAndFixEACLVersion(table)
PrintVerbose("Parsed binary encoded EACL table")
PrintVerbose(cmd, "Parsed binary encoded EACL table")
return table
}

View file

@ -11,12 +11,12 @@ import (
func PrettyPrintJSON(cmd *cobra.Command, m json.Marshaler, entity string) {
data, err := m.MarshalJSON()
if err != nil {
PrintVerbose("Can't convert %s to json: %w", entity, err)
PrintVerbose(cmd, "Can't convert %s to json: %w", entity, err)
return
}
buf := new(bytes.Buffer)
if err := json.Indent(buf, data, "", " "); err != nil {
PrintVerbose("Can't pretty print json: %w", err)
PrintVerbose(cmd, "Can't pretty print json: %w", err)
return
}
cmd.Println(buf)

View file

@ -19,11 +19,11 @@ func ReadBearerToken(cmd *cobra.Command, flagname string) *bearer.Token {
return nil
}
PrintVerbose("Reading bearer token from file [%s]...", path)
PrintVerbose(cmd, "Reading bearer token from file [%s]...", path)
var tok bearer.Token
err = ReadBinaryOrJSON(&tok, path)
err = ReadBinaryOrJSON(cmd, &tok, path)
ExitOnErr(cmd, "invalid bearer token: %v", err)
return &tok
@ -38,8 +38,8 @@ type BinaryOrJSON interface {
// ReadBinaryOrJSON reads file data using provided path and decodes
// BinaryOrJSON from the data.
func ReadBinaryOrJSON(dst BinaryOrJSON, fPath string) error {
PrintVerbose("Reading file [%s]...", fPath)
func ReadBinaryOrJSON(cmd *cobra.Command, dst BinaryOrJSON, fPath string) error {
PrintVerbose(cmd, "Reading file [%s]...", fPath)
// try to read session token from file
data, err := os.ReadFile(fPath)
@ -47,17 +47,17 @@ func ReadBinaryOrJSON(dst BinaryOrJSON, fPath string) error {
return fmt.Errorf("read file <%s>: %w", fPath, err)
}
PrintVerbose("Trying to decode binary...")
PrintVerbose(cmd, "Trying to decode binary...")
err = dst.Unmarshal(data)
if err != nil {
PrintVerbose("Failed to decode binary: %v", err)
PrintVerbose(cmd, "Failed to decode binary: %v", err)
PrintVerbose("Trying to decode JSON...")
PrintVerbose(cmd, "Trying to decode JSON...")
err = dst.UnmarshalJSON(data)
if err != nil {
PrintVerbose("Failed to decode JSON: %v", err)
PrintVerbose(cmd, "Failed to decode JSON: %v", err)
return errors.New("invalid format")
}
}

View file

@ -2,7 +2,6 @@ package common
import (
"encoding/hex"
"fmt"
"strconv"
"time"
@ -13,9 +12,9 @@ import (
)
// PrintVerbose prints to the stdout if the commonflags.Verbose flag is on.
func PrintVerbose(format string, a ...interface{}) {
func PrintVerbose(cmd *cobra.Command, format string, a ...interface{}) {
if viper.GetBool(commonflags.Verbose) {
fmt.Printf(format+"\n", a...)
cmd.Printf(format+"\n", a...)
}
}
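Threading `*cobra.Command` into `PrintVerbose` means verbose output now follows the command's configured writer instead of going straight to stdout, which makes it easy to capture in tests. A minimal sketch under that assumption (the simplified helper and command below are illustrative, not the actual frostfs-cli code):
```go
package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/cobra"
)

// printVerbose mirrors the reworked helper: output goes through the command,
// so callers that redirect the command's writer capture it. The Verbose flag
// check is omitted here for brevity.
func printVerbose(cmd *cobra.Command, format string, a ...interface{}) {
	cmd.Printf(format+"\n", a...)
}

func main() {
	buf := new(bytes.Buffer)
	cmd := &cobra.Command{Use: "test"}
	cmd.SetOut(buf) // with a plain fmt.Printf this redirection would have no effect

	printVerbose(cmd, "Set request timeout to %s.", "15s")
	fmt.Print(buf.String()) // Set request timeout to 15s.
}
```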

View file

@ -11,11 +11,18 @@ import (
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
"golang.org/x/term"
)
var testCmd = &cobra.Command{
Use: "test",
Short: "test",
Run: func(cmd *cobra.Command, args []string) {},
}
func Test_getOrGenerate(t *testing.T) {
dir := t.TempDir()
@ -96,7 +103,7 @@ func Test_getOrGenerate(t *testing.T) {
t.Run("generate", func(t *testing.T) {
viper.Set(commonflags.GenerateKey, true)
actual, err := getOrGenerate()
actual, err := getOrGenerate(testCmd)
require.NoError(t, err)
require.NotNil(t, actual)
for _, p := range []*keys.PrivateKey{nep2Key, rawKey, wifKey, acc1.PrivateKey(), acc2.PrivateKey()} {
@ -107,13 +114,13 @@ func Test_getOrGenerate(t *testing.T) {
func checkKeyError(t *testing.T, desc string, err error) {
viper.Set(commonflags.WalletPath, desc)
_, actualErr := getOrGenerate()
_, actualErr := getOrGenerate(testCmd)
require.ErrorIs(t, actualErr, err)
}
func checkKey(t *testing.T, desc string, expected *keys.PrivateKey) {
viper.Set(commonflags.WalletPath, desc)
actual, err := getOrGenerate()
actual, err := getOrGenerate(testCmd)
require.NoError(t, err)
require.Equal(t, &expected.PrivateKey, actual)
}

View file

@ -20,12 +20,12 @@ var errCantGenerateKey = errors.New("can't generate new private key")
// Ideally we want to touch file-system on the last step.
// This function assumes that all flags were bind to viper in a `PersistentPreRun`.
func Get(cmd *cobra.Command) *ecdsa.PrivateKey {
pk, err := get()
pk, err := get(cmd)
common.ExitOnErr(cmd, "can't fetch private key: %w", err)
return pk
}
func get() (*ecdsa.PrivateKey, error) {
func get(cmd *cobra.Command) (*ecdsa.PrivateKey, error) {
keyDesc := viper.GetString(commonflags.WalletPath)
data, err := os.ReadFile(keyDesc)
if err != nil {
@ -36,7 +36,7 @@ func get() (*ecdsa.PrivateKey, error) {
if err != nil {
w, err := wallet.NewWalletFromFile(keyDesc)
if err == nil {
return FromWallet(w, viper.GetString(commonflags.Account))
return FromWallet(cmd, w, viper.GetString(commonflags.Account))
}
return nil, fmt.Errorf("%w: %v", ErrInvalidKey, err)
}
@ -45,12 +45,12 @@ func get() (*ecdsa.PrivateKey, error) {
// GetOrGenerate is similar to get but generates a new key if commonflags.GenerateKey is set.
func GetOrGenerate(cmd *cobra.Command) *ecdsa.PrivateKey {
pk, err := getOrGenerate()
pk, err := getOrGenerate(cmd)
common.ExitOnErr(cmd, "can't fetch private key: %w", err)
return pk
}
func getOrGenerate() (*ecdsa.PrivateKey, error) {
func getOrGenerate(cmd *cobra.Command) (*ecdsa.PrivateKey, error) {
if viper.GetBool(commonflags.GenerateKey) {
priv, err := keys.NewPrivateKey()
if err != nil {
@ -58,5 +58,5 @@ func getOrGenerate() (*ecdsa.PrivateKey, error) {
}
return &priv.PrivateKey, nil
}
return get()
return get(cmd)
}

View file

@ -3,13 +3,14 @@ package key
import (
"crypto/ecdsa"
"errors"
"fmt"
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@ -22,37 +23,37 @@ var (
)
// FromWallet returns private key of the wallet account.
func FromWallet(w *wallet.Wallet, addrStr string) (*ecdsa.PrivateKey, error) {
func FromWallet(cmd *cobra.Command, w *wallet.Wallet, addrStr string) (*ecdsa.PrivateKey, error) {
var (
addr util.Uint160
err error
)
if addrStr == "" {
printVerbose("Using default wallet address")
common.PrintVerbose(cmd, "Using default wallet address")
addr = w.GetChangeAddress()
} else {
addr, err = flags.ParseAddress(addrStr)
if err != nil {
printVerbose("Can't parse address: %s", addrStr)
common.PrintVerbose(cmd, "Can't parse address: %s", addrStr)
return nil, ErrInvalidAddress
}
}
acc := w.GetAccount(addr)
if acc == nil {
printVerbose("Can't find wallet account for %s", addrStr)
common.PrintVerbose(cmd, "Can't find wallet account for %s", addrStr)
return nil, ErrInvalidAddress
}
pass, err := getPassword()
if err != nil {
printVerbose("Can't read password: %v", err)
common.PrintVerbose(cmd, "Can't read password: %v", err)
return nil, ErrInvalidPassword
}
if err := acc.Decrypt(pass, keys.NEP2ScryptParams()); err != nil {
printVerbose("Can't decrypt account: %v", err)
common.PrintVerbose(cmd, "Can't decrypt account: %v", err)
return nil, ErrInvalidPassword
}
@ -67,9 +68,3 @@ func getPassword() (string, error) {
return input.ReadPassword("Enter password > ")
}
func printVerbose(format string, a ...interface{}) {
if viper.GetBool("verbose") {
fmt.Printf(format+"\n", a...)
}
}

View file

@ -71,7 +71,7 @@ func createToken(cmd *cobra.Command, _ []string) {
defer cancel()
endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
currEpoch, err := internalclient.GetCurrentEpoch(ctx, endpoint)
currEpoch, err := internalclient.GetCurrentEpoch(ctx, cmd, endpoint)
common.ExitOnErr(cmd, "can't fetch current epoch: %w", err)
if iatRelative {

View file

@ -36,7 +36,7 @@ var createContainerCmd = &cobra.Command{
Long: `Create new container and register it in the NeoFS.
It will be stored in sidechain when inner ring will accepts it.`,
Run: func(cmd *cobra.Command, args []string) {
placementPolicy, err := parseContainerPolicy(containerPolicy)
placementPolicy, err := parseContainerPolicy(cmd, containerPolicy)
common.ExitOnErr(cmd, "", err)
key := key.Get(cmd)
@ -165,10 +165,10 @@ func initContainerCreateCmd() {
"Skip placement validity check")
}
func parseContainerPolicy(policyString string) (*netmap.PlacementPolicy, error) {
func parseContainerPolicy(cmd *cobra.Command, policyString string) (*netmap.PlacementPolicy, error) {
_, err := os.Stat(policyString) // check if `policyString` is a path to file with placement policy
if err == nil {
common.PrintVerbose("Reading placement policy from file: %s", policyString)
common.PrintVerbose(cmd, "Reading placement policy from file: %s", policyString)
data, err := os.ReadFile(policyString)
if err != nil {
@ -182,12 +182,12 @@ func parseContainerPolicy(policyString string) (*netmap.PlacementPolicy, error)
err = result.DecodeString(policyString)
if err == nil {
common.PrintVerbose("Parsed QL encoded policy")
common.PrintVerbose(cmd, "Parsed QL encoded policy")
return &result, nil
}
if err = result.UnmarshalJSON([]byte(policyString)); err == nil {
common.PrintVerbose("Parsed JSON encoded policy")
common.PrintVerbose(cmd, "Parsed JSON encoded policy")
return &result, nil
}

View file

@ -27,7 +27,7 @@ Only owner of the container has a permission to remove container.`,
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
if force, _ := cmd.Flags().GetBool(commonflags.ForceFlag); !force {
common.PrintVerbose("Reading the container to check ownership...")
common.PrintVerbose(cmd, "Reading the container to check ownership...")
var getPrm internalclient.GetContainerPrm
getPrm.SetClient(cli)
@ -39,13 +39,13 @@ Only owner of the container has a permission to remove container.`,
owner := resGet.Container().Owner()
if tok != nil {
common.PrintVerbose("Checking session issuer...")
common.PrintVerbose(cmd, "Checking session issuer...")
if !tok.Issuer().Equals(owner) {
common.ExitOnErr(cmd, "", fmt.Errorf("session issuer differs with the container owner: expected %s, has %s", owner, tok.Issuer()))
}
} else {
common.PrintVerbose("Checking provided account...")
common.PrintVerbose(cmd, "Checking provided account...")
var acc user.ID
user.IDFromKey(&acc, pk.PublicKey)
@ -55,10 +55,10 @@ Only owner of the container has a permission to remove container.`,
}
}
common.PrintVerbose("Account matches the container owner.")
common.PrintVerbose(cmd, "Account matches the container owner.")
if tok != nil {
common.PrintVerbose("Skip searching for LOCK objects - session provided.")
common.PrintVerbose(cmd, "Skip searching for LOCK objects - session provided.")
} else {
fs := objectSDK.NewSearchFilters()
fs.AddTypeFilter(objectSDK.MatchStringEqual, objectSDK.TypeLock)
@ -69,7 +69,7 @@ Only owner of the container has a permission to remove container.`,
searchPrm.SetFilters(fs)
searchPrm.SetTTL(2)
common.PrintVerbose("Searching for LOCK objects...")
common.PrintVerbose(cmd, "Searching for LOCK objects...")
res, err := internalclient.SearchObjects(searchPrm)
common.ExitOnErr(cmd, "can't search for LOCK objects: %w", err)

View file

@ -1,9 +1,7 @@
package container
import (
"bytes"
"crypto/ecdsa"
"encoding/json"
"os"
internalclient "github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
@ -77,18 +75,7 @@ func (x *stringWriter) WriteString(s string) (n int, err error) {
func prettyPrintContainer(cmd *cobra.Command, cnr container.Container, jsonEncoding bool) {
if jsonEncoding {
data, err := cnr.MarshalJSON()
if err != nil {
common.PrintVerbose("Can't convert container to json: %w", err)
return
}
buf := new(bytes.Buffer)
if err := json.Indent(buf, data, "", " "); err != nil {
common.PrintVerbose("Can't pretty print json: %w", err)
}
cmd.Println(buf)
common.PrettyPrintJSON(cmd, cnr, "container")
return
}

View file

@ -36,22 +36,22 @@ func parseContainerID(cmd *cobra.Command) cid.ID {
// decodes session.Container from the file by path provided in
// commonflags.SessionToken flag. Returns nil if the path is not specified.
func getSession(cmd *cobra.Command) *session.Container {
common.PrintVerbose("Reading container session...")
common.PrintVerbose(cmd, "Reading container session...")
path, _ := cmd.Flags().GetString(commonflags.SessionToken)
if path == "" {
common.PrintVerbose("Session not provided.")
common.PrintVerbose(cmd, "Session not provided.")
return nil
}
common.PrintVerbose("Reading container session from the file [%s]...", path)
common.PrintVerbose(cmd, "Reading container session from the file [%s]...", path)
var res session.Container
err := common.ReadBinaryOrJSON(&res, path)
err := common.ReadBinaryOrJSON(cmd, &res, path)
common.ExitOnErr(cmd, "read container session: %v", err)
common.PrintVerbose("Session successfully read.")
common.PrintVerbose(cmd, "Session successfully read.")
return &res
}

View file

@ -51,7 +51,7 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
printIgnoreForce := func(st control.NetmapStatus) {
if force {
common.PrintVerbose("Ignore --%s flag for %s state.", commonflags.ForceFlag, st)
common.PrintVerbose(cmd, "Ignore --%s flag for %s state.", commonflags.ForceFlag, st)
}
}
@ -69,7 +69,7 @@ func setNetmapStatus(cmd *cobra.Command, _ []string) {
if force {
body.SetForceMaintenance()
common.PrintVerbose("Local maintenance will be forced.")
common.PrintVerbose(cmd, "Local maintenance will be forced.")
}
}

View file

@ -60,13 +60,13 @@ var objectLockCmd = &cobra.Command{
endpoint, _ := cmd.Flags().GetString(commonflags.RPC)
currEpoch, err := internalclient.GetCurrentEpoch(ctx, endpoint)
currEpoch, err := internalclient.GetCurrentEpoch(ctx, cmd, endpoint)
common.ExitOnErr(cmd, "Request current epoch: %w", err)
exp = currEpoch + lifetime
}
common.PrintVerbose("Lock object will expire after %d epoch", exp)
common.PrintVerbose(cmd, "Lock object will expire after %d epoch", exp)
var expirationAttr objectSDK.Attribute
expirationAttr.SetKey(objectV2.SysAttributeExpEpoch)

View file

@ -46,7 +46,7 @@ func InitBearer(cmd *cobra.Command) {
// Prepare prepares object-related parameters for a command.
func Prepare(cmd *cobra.Command, prms ...RPCParameters) {
ttl := viper.GetUint32(commonflags.TTL)
common.PrintVerbose("TTL: %d", ttl)
common.PrintVerbose(cmd, "TTL: %d", ttl)
for i := range prms {
btok := common.ReadBearerToken(cmd, bearerTokenFlag)
@ -127,19 +127,19 @@ func readSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.PrivateKey, cnr
// decodes session.Object from the file by path specified in the
// commonflags.SessionToken flag. Returns nil if flag is not set.
func getSession(cmd *cobra.Command) *session.Object {
common.PrintVerbose("Trying to read session from the file...")
common.PrintVerbose(cmd, "Trying to read session from the file...")
path, _ := cmd.Flags().GetString(commonflags.SessionToken)
if path == "" {
common.PrintVerbose("File with session token is not provided.")
common.PrintVerbose(cmd, "File with session token is not provided.")
return nil
}
common.PrintVerbose("Reading session from the file [%s]...", path)
common.PrintVerbose(cmd, "Reading session from the file [%s]...", path)
var tok session.Object
err := common.ReadBinaryOrJSON(&tok, path)
err := common.ReadBinaryOrJSON(cmd, &tok, path)
common.ExitOnErr(cmd, "read session: %v", err)
return &tok
@ -185,7 +185,7 @@ func _readVerifiedSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.Private
return
}
common.PrintVerbose("Checking session correctness...")
common.PrintVerbose(cmd, "Checking session correctness...")
switch false {
case tok.AssertContainer(cnr):
@ -200,7 +200,7 @@ func _readVerifiedSession(cmd *cobra.Command, dst SessionPrm, key *ecdsa.Private
common.ExitOnErr(cmd, "", errors.New("invalid signature of the session data"))
}
common.PrintVerbose("Session is correct.")
common.PrintVerbose(cmd, "Session is correct.")
dst.SetSessionToken(tok)
}
@ -227,7 +227,7 @@ func ReadOrOpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.
objs = []oid.ID{*obj}
if _, ok := dst.(*internal.DeleteObjectPrm); ok {
common.PrintVerbose("Collecting relatives of the removal object...")
common.PrintVerbose(cmd, "Collecting relatives of the removal object...")
objs = append(objs, collectObjectRelatives(cmd, cli, cnr, *obj)...)
}
@ -259,7 +259,7 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
if obj != nil {
if _, ok := dst.(*internal.DeleteObjectPrm); ok {
common.PrintVerbose("Collecting relatives of the removal object...")
common.PrintVerbose(cmd, "Collecting relatives of the removal object...")
rels := collectObjectRelatives(cmd, cli, cnr, *obj)
@ -275,12 +275,12 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
const sessionLifetime = 10 // in NeoFS epochs
common.PrintVerbose("Opening remote session with the node...")
common.PrintVerbose(cmd, "Opening remote session with the node...")
err := sessionCli.CreateSession(&tok, cli, sessionLifetime)
common.ExitOnErr(cmd, "open remote session: %w", err)
common.PrintVerbose("Session successfully opened.")
common.PrintVerbose(cmd, "Session successfully opened.")
finalizeSession(cmd, dst, &tok, key, cnr, objs...)
@ -297,33 +297,33 @@ func OpenSessionViaClient(cmd *cobra.Command, dst SessionPrm, cli *client.Client
// *internal.PutObjectPrm
// *internal.DeleteObjectPrm
func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, key *ecdsa.PrivateKey, cnr cid.ID, objs ...oid.ID) {
common.PrintVerbose("Finalizing session token...")
common.PrintVerbose(cmd, "Finalizing session token...")
switch dst.(type) {
default:
panic(fmt.Sprintf("unsupported op parameters %T", dst))
case *internal.PutObjectPrm:
common.PrintVerbose("Binding session to object PUT...")
common.PrintVerbose(cmd, "Binding session to object PUT...")
tok.ForVerb(session.VerbObjectPut)
case *internal.DeleteObjectPrm:
common.PrintVerbose("Binding session to object DELETE...")
common.PrintVerbose(cmd, "Binding session to object DELETE...")
tok.ForVerb(session.VerbObjectDelete)
}
common.PrintVerbose("Binding session to container %s...", cnr)
common.PrintVerbose(cmd, "Binding session to container %s...", cnr)
tok.BindContainer(cnr)
if len(objs) > 0 {
common.PrintVerbose("Limiting session by the objects %v...", objs)
common.PrintVerbose(cmd, "Limiting session by the objects %v...", objs)
tok.LimitByObjects(objs...)
}
common.PrintVerbose("Signing session...")
common.PrintVerbose(cmd, "Signing session...")
err := tok.Sign(*key)
common.ExitOnErr(cmd, "sign session: %w", err)
common.PrintVerbose("Session token successfully formed and attached to the request.")
common.PrintVerbose(cmd, "Session token successfully formed and attached to the request.")
dst.SetSessionToken(tok)
}
@ -339,7 +339,7 @@ func initFlagSession(cmd *cobra.Command, verb string) {
//
// The object itself is not included in the result.
func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID {
common.PrintVerbose("Fetching raw object header...")
common.PrintVerbose(cmd, "Fetching raw object header...")
// request raw header first
var addrObj oid.Address
@ -361,10 +361,10 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
default:
common.ExitOnErr(cmd, "failed to get raw object header: %w", err)
case err == nil:
common.PrintVerbose("Raw header received - object is singular.")
common.PrintVerbose(cmd, "Raw header received - object is singular.")
return nil
case errors.As(err, &errSplit):
common.PrintVerbose("Split information received - object is virtual.")
common.PrintVerbose(cmd, "Split information received - object is virtual.")
}
splitInfo := errSplit.SplitInfo()
@ -373,7 +373,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
// If any approach fails, we don't try the next since we assume that it will fail too.
if idLinking, ok := splitInfo.Link(); ok {
common.PrintVerbose("Collecting split members using linking object %s...", idLinking)
common.PrintVerbose(cmd, "Collecting split members using linking object %s...", idLinking)
addrObj.SetObject(idLinking)
prmHead.SetAddress(addrObj)
@ -381,18 +381,22 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
// client is already set
res, err := internal.HeadObject(prmHead)
common.ExitOnErr(cmd, "failed to get linking object's header: %w", err)
if err == nil {
children := res.Header().Children()
common.PrintVerbose("Received split members from the linking object: %v", children)
common.PrintVerbose(cmd, "Received split members from the linking object: %v", children)
// include linking object
return append(children, idLinking)
}
// linking object is not required for
// object collecting
common.PrintVerbose(cmd, "failed to get linking object's header: %w", err)
}
if idSplit := splitInfo.SplitID(); idSplit != nil {
common.PrintVerbose("Collecting split members by split ID...")
common.PrintVerbose(cmd, "Collecting split members by split ID...")
var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, idSplit)
@ -407,7 +411,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
members := res.IDList()
common.PrintVerbose("Found objects by split ID: %v", res.IDList())
common.PrintVerbose(cmd, "Found objects by split ID: %v", res.IDList())
return members
}
@ -417,7 +421,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
common.ExitOnErr(cmd, "", errors.New("missing any data in received object split information"))
}
common.PrintVerbose("Traverse the object split chain in reverse...", idMember)
common.PrintVerbose(cmd, "Traverse the object split chain in reverse...", idMember)
var res *internal.HeadObjectRes
chain := []oid.ID{idMember}
@ -427,7 +431,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
// split members are almost definitely singular, but don't get hung up on it
for {
common.PrintVerbose("Reading previous element of the split chain member %s...", idMember)
common.PrintVerbose(cmd, "Reading previous element of the split chain member %s...", idMember)
addrObj.SetObject(idMember)
@ -436,7 +440,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
idMember, ok = res.Header().PreviousID()
if !ok {
common.PrintVerbose("Chain ended.")
common.PrintVerbose(cmd, "Chain ended.")
break
}
@ -448,7 +452,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
chainSet[idMember] = struct{}{}
}
common.PrintVerbose("Looking for a linking object...")
common.PrintVerbose(cmd, "Looking for a linking object...")
var query object.SearchFilters
query.AddParentIDFilter(object.MatchStringEqual, obj)
@ -465,7 +469,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
for i := range list {
if _, ok = chainSet[list[i]]; !ok {
common.PrintVerbose("Found one more related object %s.", list[i])
common.PrintVerbose(cmd, "Found one more related object %s.", list[i])
chain = append(chain, list[i])
}
}

View file

@ -118,6 +118,6 @@ func initConfig() {
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {
common.PrintVerbose("Using config file: %s", viper.ConfigFileUsed())
common.PrintVerbose(rootCmd, "Using config file: %s", viper.ConfigFileUsed())
}
}

View file

@ -54,7 +54,7 @@ func createSession(cmd *cobra.Command, _ []string) {
addrStr, _ := cmd.Flags().GetString(commonflags.RPC)
common.ExitOnErr(cmd, "can't parse endpoint: %w", netAddr.FromString(addrStr))
c, err := internalclient.GetSDKClient(privKey, netAddr)
c, err := internalclient.GetSDKClient(cmd, privKey, netAddr)
common.ExitOnErr(cmd, "can't create client: %w", err)
lifetime := uint64(defaultLifetime)

View file

@ -83,7 +83,7 @@ func addByPath(cmd *cobra.Command, _ []string) {
nn := resp.GetBody().GetNodes()
if len(nn) == 0 {
common.PrintVerbose("No new nodes were created")
common.PrintVerbose(cmd, "No new nodes were created")
return
}

View file

@ -80,7 +80,7 @@ func getByPath(cmd *cobra.Command, _ []string) {
nn := resp.GetBody().GetNodes()
if len(nn) == 0 {
common.PrintVerbose("The node is not found")
common.PrintVerbose(cmd, "The node is not found")
return
}

View file

@ -1,8 +1,6 @@
package util
import (
"bytes"
"encoding/json"
"os"
"github.com/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
@ -45,7 +43,7 @@ func convertEACLTable(cmd *cobra.Command, _ []string) {
}
if len(to) == 0 {
prettyPrintJSON(cmd, data)
common.PrettyPrintJSON(cmd, table, "eACL")
return
}
@ -54,12 +52,3 @@ func convertEACLTable(cmd *cobra.Command, _ []string) {
cmd.Printf("extended ACL table was successfully dumped to %s\n", to)
}
func prettyPrintJSON(cmd *cobra.Command, data []byte) {
buf := new(bytes.Buffer)
if err := json.Indent(buf, data, "", " "); err != nil {
common.PrintVerbose("Can't pretty print json: %w", err)
}
cmd.Println(buf)
}

View file

@ -51,8 +51,7 @@ func signBearerToken(cmd *cobra.Command, _ []string) {
}
if len(to) == 0 {
prettyPrintJSON(cmd, data)
common.PrettyPrintJSON(cmd, btok, "bearer token")
return
}

View file

@ -52,7 +52,7 @@ func signSessionToken(cmd *cobra.Command, _ []string) {
new(session.Object),
new(session.Container),
} {
errLast = common.ReadBinaryOrJSON(el, fPath)
errLast = common.ReadBinaryOrJSON(cmd, el, fPath)
if errLast == nil {
stok = el
break
@ -71,7 +71,7 @@ func signSessionToken(cmd *cobra.Command, _ []string) {
to := cmd.Flag(signToFlag).Value.String()
if len(to) == 0 {
prettyPrintJSON(cmd, data)
common.PrettyPrintJSON(cmd, stok, "session token")
return
}

View file

@ -215,7 +215,8 @@ func (s *lruNetmapSource) Epoch() (uint64, error) {
// wrapper over TTL cache of values read from the network
// that implements container lister.
type ttlContainerLister struct {
*ttlNetCache[string, *cacheItemContainerList]
inner *ttlNetCache[string, *cacheItemContainerList]
client *cntClient.Client
}
// value type for ttlNetCache used by ttlContainerLister.
@ -251,20 +252,18 @@ func newCachedContainerLister(c *cntClient.Client, ttl time.Duration) ttlContain
}, nil
})
return ttlContainerLister{lruCnrListerCache}
return ttlContainerLister{inner: lruCnrListerCache, client: c}
}
// List returns list of container IDs from the cache. If list is missing in the
// cache or expired, then it returns container IDs from side chain and updates
// the cache.
func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
var str string
if id != nil {
str = id.EncodeToString()
if id == nil {
return s.client.List(nil)
}
item, err := s.get(str)
item, err := s.inner.get(id.EncodeToString())
if err != nil {
return nil, err
}
@ -287,13 +286,17 @@ func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
func (s *ttlContainerLister) update(owner user.ID, cnr cid.ID, add bool) {
strOwner := owner.EncodeToString()
val, ok := s.cache.Get(strOwner)
val, ok := s.inner.cache.Peek(strOwner)
if !ok {
// we could cache the single cnr but in this case we will disperse
// with the Sidechain a lot
return
}
if s.inner.ttl <= time.Since(val.t) {
return
}
item := val.v
item.mtx.Lock()

View file

@ -127,7 +127,6 @@ func initContainerService(c *cfg) {
cnrRdr.get = c.cfgObject.cnrSource
cnrWrt.cacheEnabled = true
cnrWrt.lists = cachedContainerLister
cnrWrt.eacls = cachedEACLStorage
}
@ -658,7 +657,6 @@ type morphContainerWriter struct {
cacheEnabled bool
eacls ttlEACLStorage
lists ttlContainerLister
}
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {

View file

@ -41,8 +41,8 @@ type boltDBCfg struct {
boltOptions *bbolt.Options
}
func defaultCfg(с *cfg) {
*с = cfg{
func defaultCfg(c *cfg) {
*c = cfg{
boltDBCfg: boltDBCfg{
perm: os.ModePerm, // 0777
boltOptions: &bbolt.Options{

View file

@ -6,6 +6,7 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
@ -45,11 +46,14 @@ func (b *Blobovniczas) Put(prm common.PutPrm) (common.PutRes, error) {
}
if _, err := active.blz.Put(putPrm); err != nil {
// check if blobovnicza is full
if errors.Is(err, blobovnicza.ErrFull) {
// Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error
// or update active blobovnicza in other thread. In the latter case the database will be closed
// and `updateActive` takes care of not updating the active blobovnicza twice.
if isFull := errors.Is(err, blobovnicza.ErrFull); isFull || errors.Is(err, bbolt.ErrDatabaseNotOpen) {
if isFull {
b.log.Debug("blobovnicza overflowed",
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))),
)
zap.String("path", filepath.Join(p, u64ToHexString(active.ind))))
}
if err := b.updateActive(p, &active.ind); err != nil {
if !isLogical(err) {

View file

@ -239,15 +239,17 @@ func (t *FSTree) Put(prm common.PutPrm) (common.PutRes, error) {
prm.RawData = t.Compress(prm.RawData)
}
err := t.writeFile(p, prm.RawData)
tmpPath := p + "#"
err := t.writeFile(tmpPath, prm.RawData)
if err != nil {
var pe *fs.PathError
if errors.As(err, &pe) && pe.Err == syscall.ENOSPC {
err = common.ErrNoSpace
_ = os.RemoveAll(tmpPath)
}
}
return common.PutRes{StorageID: []byte{}}, err
return common.PutRes{StorageID: []byte{}}, os.Rename(tmpPath, p)
}
func (t *FSTree) writeFlags() int {
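The change above makes FSTree writes atomic from a reader's point of view: the object is written to a temporary "#"-suffixed path and then renamed into place. A standalone sketch of the same write-then-rename pattern (the `writeAtomically` helper and paths are illustrative, not the FSTree API):
```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomically writes data to a temporary sibling file and renames it into
// place. On POSIX filesystems the rename is atomic, so a concurrent reader
// sees either the previous content or the complete new content, never a
// partial write.
func writeAtomically(path string, data []byte) error {
	tmp := path + "#"
	if err := os.WriteFile(tmp, data, 0o640); err != nil {
		_ = os.RemoveAll(tmp) // best-effort cleanup, as in the diff above
		return err
	}
	return os.Rename(tmp, path)
}

func main() {
	dir, err := os.MkdirTemp("", "fstree-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	p := filepath.Join(dir, "object")
	if err := writeAtomically(p, []byte("payload")); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("written to", p)
}
```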

View file

@ -82,6 +82,8 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error)
return EvacuateShardRes{}, errMustHaveTwoShards
}
e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList))
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put.
@ -185,5 +187,7 @@ mainLoop:
}
}
e.log.Info("finished shards evacuation",
zap.Strings("shard_ids", sidList))
return res, nil
}

View file

@ -65,8 +65,12 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
e.mtx.RLock()
pool := e.shardPools[sh.ID().String()]
pool, ok := e.shardPools[sh.ID().String()]
e.mtx.RUnlock()
if !ok {
// Shard was concurrently removed, skip.
return false
}
putDone, exists := e.putToShard(sh, ind, pool, addr, prm.obj)
finished = putDone || exists

View file

@ -12,7 +12,7 @@ import (
var _ pilorama.Forest = (*StorageEngine)(nil)
// TreeMove implements the pilorama.Forest interface.
func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
index, lst, err := e.getTreeShard(d.CID, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return nil, err
@ -32,7 +32,7 @@ func (e *StorageEngine) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pil
}
// TreeAddByPath implements the pilorama.Forest interface.
func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.LogMove, error) {
func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, m []pilorama.KeyValue) ([]pilorama.Move, error) {
index, lst, err := e.getTreeShard(d.CID, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return nil, err

View file

@ -0,0 +1,50 @@
package pilorama
import (
"sort"
"sync"
"time"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"go.etcd.io/bbolt"
)
type batch struct {
forest *boltForest
timer *time.Timer
mtx sync.Mutex
start sync.Once
cid cidSDK.ID
treeID string
results []chan<- error
operations []*Move
}
func (b *batch) trigger() {
b.mtx.Lock()
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
b.mtx.Unlock()
b.start.Do(b.run)
}
func (b *batch) run() {
sort.Slice(b.operations, func(i, j int) bool {
return b.operations[i].Time < b.operations[j].Time
})
fullID := bucketName(b.cid, b.treeID)
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
var lm Move
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
})
for i := range b.operations {
b.results[i] <- err
}
}

View file

@ -12,6 +12,7 @@ import (
"time"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"github.com/TrueCloudLab/frostfs-node/pkg/util"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/nspcc-dev/neo-go/pkg/io"
@ -21,8 +22,13 @@ import (
type boltForest struct {
db *bbolt.DB
modeMtx sync.Mutex
modeMtx sync.RWMutex
mode mode.Mode
// mtx protects batches field.
mtx sync.Mutex
batches []*batch
cfg
}
@ -31,6 +37,12 @@ var (
logBucket = []byte{1}
)
// ErrDegradedMode is returned when pilorama is in a degraded mode.
var ErrDegradedMode = logicerr.New("pilorama is in a degraded mode")
// ErrReadOnlyMode is returned when pilorama is in a read-only mode.
var ErrReadOnlyMode = logicerr.New("pilorama is in a read-only mode")
// NewBoltForest returns storage wrapper for storing operations on CRDT trees.
//
// Each tree is stored in a separate bucket by `CID + treeID` key.
@ -71,12 +83,9 @@ func (t *boltForest) SetMode(m mode.Mode) error {
if t.mode == m {
return nil
}
if t.mode.ReadOnly() == m.ReadOnly() {
return nil
}
err := t.Close()
if err == nil {
if err == nil && !m.NoMetabase() {
if err = t.Open(m.ReadOnly()); err == nil {
err = t.Init()
}
@ -110,7 +119,7 @@ func (t *boltForest) Open(readOnly bool) error {
return nil
}
func (t *boltForest) Init() error {
if t.db.IsReadOnly() {
if t.mode.NoMetabase() || t.db.IsReadOnly() {
return nil
}
return t.db.Update(func(tx *bbolt.Tx) error {
@ -133,29 +142,45 @@ func (t *boltForest) Close() error {
}
// TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) {
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, error) {
if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor
}
var lm LogMove
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
} else if t.mode.ReadOnly() {
return nil, ErrReadOnlyMode
}
lm := *m
fullID := bucketName(d.CID, treeID)
return &lm, t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
m.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
if m.Child == RootID {
m.Child = t.findSpareID(bTree)
lm.Time = t.getLatestTimestamp(bLog, d.Position, d.Size)
if lm.Child == RootID {
lm.Child = t.findSpareID(bTree)
}
lm.Move = *m
return t.applyOperation(bLog, bTree, &lm)
return t.do(bLog, bTree, make([]byte, 17), &lm)
})
}
// TreeExists implements the Forest interface.
func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return false, ErrDegradedMode
}
var exists bool
err := t.db.View(func(tx *bbolt.Tx) error {
@ -168,7 +193,7 @@ func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
}
// TreeAddByPath implements the Forest interface.
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) {
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor
}
@ -176,11 +201,21 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
return nil, ErrNotPathAttribute
}
var lm []LogMove
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
} else if t.mode.ReadOnly() {
return nil, ErrReadOnlyMode
}
var lm []Move
var key [17]byte
fullID := bucketName(d.CID, treeID)
err := t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
@ -191,9 +226,9 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
}
ts := t.getLatestTimestamp(bLog, d.Position, d.Size)
lm = make([]LogMove, len(path)-i+1)
lm = make([]Move, len(path)-i+1)
for j := i; j < len(path); j++ {
lm[j-i].Move = Move{
lm[j-i] = Move{
Parent: node,
Meta: Meta{
Time: ts,
@ -211,7 +246,7 @@ func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string,
node = lm[j-i].Child
}
lm[len(lm)-1].Move = Move{
lm[len(lm)-1] = Move{
Parent: node,
Meta: Meta{
Time: ts,
@ -240,17 +275,14 @@ func (t *boltForest) getLatestTimestamp(bLog *bbolt.Bucket, pos, size int) uint6
// findSpareID returns random unused ID.
func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
id := uint64(rand.Int63())
var key [9]byte
key[0] = 't'
binary.LittleEndian.PutUint64(key[1:], id)
key := make([]byte, 9)
for {
if bTree.Get(key[:]) == nil {
_, _, _, ok := t.getState(bTree, stateKey(key, id))
if !ok {
return id
}
id = uint64(rand.Int63())
binary.LittleEndian.PutUint64(key[1:], id)
}
}
@ -260,6 +292,15 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
return ErrInvalidCIDDescriptor
}
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return ErrDegradedMode
} else if t.mode.ReadOnly() {
return ErrReadOnlyMode
}
if backgroundSync {
var seen bool
err := t.db.View(func(tx *bbolt.Tx) error {
@ -280,19 +321,68 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
}
}
return t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
if t.db.MaxBatchSize == 1 {
fullID := bucketName(d.CID, treeID)
return t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
lm := &LogMove{Move: *m}
return t.applyOperation(bLog, bTree, lm)
var lm Move
return t.applyOperation(bLog, bTree, []*Move{m}, &lm)
})
}
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
treeRoot := bucketName(cid, treeID)
ch := make(chan error, 1)
t.addBatch(d, treeID, m, ch)
return <-ch
}
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
t.mtx.Lock()
for i := 0; i < len(t.batches); i++ {
t.batches[i].mtx.Lock()
if t.batches[i].timer == nil {
t.batches[i].mtx.Unlock()
copy(t.batches[i:], t.batches[i+1:])
t.batches = t.batches[:len(t.batches)-1]
i--
continue
}
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
if found {
t.batches[i].results = append(t.batches[i].results, ch)
t.batches[i].operations = append(t.batches[i].operations, m)
if len(t.batches[i].operations) == t.db.MaxBatchSize {
t.batches[i].timer.Stop()
t.batches[i].timer = nil
t.batches[i].mtx.Unlock()
b := t.batches[i]
t.mtx.Unlock()
b.trigger()
return
}
t.batches[i].mtx.Unlock()
t.mtx.Unlock()
return
}
t.batches[i].mtx.Unlock()
}
b := &batch{
forest: t,
cid: d.CID,
treeID: treeID,
results: []chan<- error{ch},
operations: []*Move{m},
}
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
t.batches = append(t.batches, b)
t.mtx.Unlock()
}
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, treeRoot []byte) (*bbolt.Bucket, *bbolt.Bucket, error) {
child := tx.Bucket(treeRoot)
if child != nil {
return child.Bucket(logBucket), child.Bucket(dataBucket), nil
@ -313,16 +403,11 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string)
return bLog, bData, nil
}
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error {
var tmp LogMove
// applyOperations applies log operations. Assumes lm are sorted by timestamp.
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *Move) error {
var tmp Move
var cKey [17]byte
var logKey [8]byte
binary.BigEndian.PutUint64(logKey[:], lm.Time)
if logBucket.Get(logKey[:]) != nil {
return nil
}
c := logBucket.Cursor()
key, value := c.Last()
@ -331,82 +416,87 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
r := io.NewBinReaderFromIO(b)
// 1. Undo up until the desired timestamp is here.
for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time {
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err
tmp.Child = r.ReadU64LE()
tmp.Parent = r.ReadU64LE()
tmp.Time = r.ReadVarUint()
if r.Err != nil {
return r.Err
}
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil {
if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
return err
}
key, value = c.Prev()
}
key, _ = c.Next()
for i := 0; i < len(ms); i++ {
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
// 2. Insert the operation.
if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time {
*lm = *ms[i]
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
return err
}
}
if key == nil {
// The operation is inserted in the beginning, reposition the cursor.
// Otherwise, `Next` call will return currently inserted operation.
c.First()
}
key, value = c.Seek(key)
// Cursor can be invalid, seek again.
binary.BigEndian.PutUint64(cKey[:], lm.Time)
_, _ = c.Seek(cKey[:8])
key, value = c.Next()
// 3. Re-apply all other operations.
for len(key) == 8 {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
if err := t.logFromBytes(&tmp, value); err != nil {
return err
}
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
if err := t.redo(treeBucket, cKey[:], &tmp, value[16:]); err != nil {
return err
}
key, value = c.Next()
}
}
return nil
}
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {
currParent := b.Get(parentKey(key, op.Child))
op.HasOld = currParent != nil
if currParent != nil { // node is already in tree
op.Old.Parent = binary.LittleEndian.Uint64(currParent)
if err := op.Old.Meta.FromBytes(b.Get(metaKey(key, op.Child))); err != nil {
return err
}
} else {
op.HasOld = false
op.Old = nodeInfo{}
}
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *Move) error {
binary.BigEndian.PutUint64(key, op.Time)
if err := lb.Put(key[:8], t.logToBytes(op)); err != nil {
rawLog := t.logToBytes(op)
if err := lb.Put(key[:8], rawLog); err != nil {
return err
}
if op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
return nil
return t.redo(b, key, op, rawLog[16:])
}
if currParent == nil {
if err := b.Put(timestampKey(key, op.Child), toUint64(op.Time)); err != nil {
return err
}
func (t *boltForest) redo(b *bbolt.Bucket, key []byte, op *Move, rawMeta []byte) error {
var err error
parent, ts, currMeta, inTree := t.getState(b, stateKey(key, op.Child))
if inTree {
err = t.putState(b, oldKey(key, op.Time), parent, ts, currMeta)
} else {
parent := binary.LittleEndian.Uint64(currParent)
ts = op.Time
err = b.Delete(oldKey(key, op.Time))
}
if err != nil || op.Child == op.Parent || t.isAncestor(b, op.Child, op.Parent) {
return err
}
if inTree {
if err := b.Delete(childrenKey(key, op.Child, parent)); err != nil {
return err
}
for i := range op.Old.Meta.Items {
if isAttributeInternal(op.Old.Meta.Items[i].Key) {
key = internalKey(key, op.Old.Meta.Items[i].Key, string(op.Old.Meta.Items[i].Value), parent, op.Child)
var meta Meta
if err := meta.FromBytes(currMeta); err != nil {
return err
}
for i := range meta.Items {
if isAttributeInternal(meta.Items[i].Key) {
key = internalKey(key, meta.Items[i].Key, string(meta.Items[i].Value), parent, op.Child)
err := b.Delete(key)
if err != nil {
return err
@ -414,17 +504,16 @@ func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMo
}
}
}
return t.addNode(b, key, op.Child, op.Parent, op.Meta)
return t.addNode(b, key, op.Child, op.Parent, ts, op.Meta, rawMeta)
}
// removeNode removes node keys from the tree except the children key or its parent.
func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node) error {
if err := b.Delete(parentKey(key, node)); err != nil {
return err
}
k := stateKey(key, node)
_, _, rawMeta, _ := t.getState(b, k)
var meta Meta
var k = metaKey(key, node)
if err := meta.FromBytes(b.Get(k)); err == nil {
if err := meta.FromBytes(rawMeta); err == nil {
for i := range meta.Items {
if isAttributeInternal(meta.Items[i].Key) {
err := b.Delete(internalKey(nil, meta.Items[i].Key, string(meta.Items[i].Value), parent, node))
@ -434,23 +523,16 @@ func (t *boltForest) removeNode(b *bbolt.Bucket, key []byte, node, parent Node)
}
}
}
if err := b.Delete(metaKey(key, node)); err != nil {
return err
}
return b.Delete(timestampKey(key, node))
return b.Delete(k)
}
// addNode adds node keys to the tree except the timestamp key.
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, meta Meta) error {
err := b.Put(parentKey(key, child), toUint64(parent))
if err != nil {
func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, time Timestamp, meta Meta, rawMeta []byte) error {
if err := t.putState(b, stateKey(key, child), parent, time, rawMeta); err != nil {
return err
}
err = b.Put(childrenKey(key, child, parent), []byte{1})
if err != nil {
return err
}
err = b.Put(metaKey(key, child), meta.Bytes())
err := b.Put(childrenKey(key, child, parent), []byte{1})
if err != nil {
return err
}
@ -473,27 +555,33 @@ func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, me
return nil
}
func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error {
func (t *boltForest) undo(m *Move, b *bbolt.Bucket, key []byte) error {
if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
return err
}
if !lm.HasOld {
parent, ts, rawMeta, ok := t.getState(b, oldKey(key, m.Time))
if !ok {
return t.removeNode(b, key, m.Child, m.Parent)
}
return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta)
var meta Meta
if err := meta.FromBytes(rawMeta); err != nil {
return err
}
return t.addNode(b, key, m.Child, parent, ts, meta, rawMeta)
}
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
key := make([]byte, 9)
key[0] = 'p'
key[0] = 's'
for node := child; node != parent; {
binary.LittleEndian.PutUint64(key[1:], node)
rawParent := b.Get(key)
if len(rawParent) != 8 {
parent, _, _, ok := t.getState(b, key)
if !ok {
return false
}
node = binary.LittleEndian.Uint64(rawParent)
node = parent
}
return true
}
@ -508,6 +596,13 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
return nil, nil
}
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
var nodes []Node
return nodes, t.db.View(func(tx *bbolt.Tx) error {
@ -526,10 +621,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
return nil
}
var (
childID [9]byte
maxTimestamp uint64
)
var maxTimestamp uint64
c := b.Cursor()
@ -539,7 +631,7 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
for len(childKey) == len(attrKey)+8 && bytes.Equal(attrKey, childKey[:len(childKey)-8]) {
child := binary.LittleEndian.Uint64(childKey[len(childKey)-8:])
if latest {
ts := binary.LittleEndian.Uint64(b.Get(timestampKey(childID[:], child)))
_, ts, _, _ := t.getState(b, stateKey(make([]byte, 9), child))
if ts >= maxTimestamp {
nodes = append(nodes[:0], child)
maxTimestamp = ts
@ -555,7 +647,14 @@ func (t *boltForest) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, pa
// TreeGetMeta implements the forest interface.
func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Meta, Node, error) {
key := parentKey(make([]byte, 9), nodeID)
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return Meta{}, 0, ErrDegradedMode
}
key := stateKey(make([]byte, 9), nodeID)
var m Meta
var parentID uint64
@ -567,10 +666,11 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
}
b := treeRoot.Bucket(dataBucket)
if data := b.Get(key); len(data) == 8 {
if data := b.Get(key); len(data) != 0 {
parentID = binary.LittleEndian.Uint64(data)
}
return m.FromBytes(b.Get(metaKey(key, nodeID)))
_, _, meta, _ := t.getState(b, stateKey(key, nodeID))
return m.FromBytes(meta)
})
return m, parentID, err
@ -578,6 +678,13 @@ func (t *boltForest) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID Node) (Met
// TreeGetChildren implements the Forest interface.
func (t *boltForest) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID Node) ([]uint64, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
key := make([]byte, 9)
key[0] = 'c'
binary.LittleEndian.PutUint64(key[1:], nodeID)
@ -603,8 +710,17 @@ func (t *boltForest) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID Node)
// TreeList implements the Forest interface.
func (t *boltForest) TreeList(cid cidSDK.ID) ([]string, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
var ids []string
cidRaw := []byte(cid.EncodeToString())
cidRaw := make([]byte, 32)
cid.Encode(cidRaw)
cidLen := len(cidRaw)
err := t.db.View(func(tx *bbolt.Tx) error {
@ -628,6 +744,13 @@ func (t *boltForest) TreeList(cid cidSDK.ID) ([]string, error) {
// TreeGetOpLog implements the pilorama.Forest interface.
func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (Move, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return Move{}, ErrDegradedMode
}
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, height)
@ -651,10 +774,20 @@ func (t *boltForest) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (
// TreeDrop implements the pilorama.Forest interface.
func (t *boltForest) TreeDrop(cid cidSDK.ID, treeID string) error {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return ErrDegradedMode
} else if t.mode.ReadOnly() {
return ErrReadOnlyMode
}
return t.db.Batch(func(tx *bbolt.Tx) error {
if treeID == "" {
c := tx.Cursor()
prefix := []byte(cid.EncodeToString())
prefix := make([]byte, 32)
cid.Encode(prefix)
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
err := tx.DeleteBucket(k)
if err != nil {
@ -698,67 +831,72 @@ loop:
}
func (t *boltForest) moveFromBytes(m *Move, data []byte) error {
r := io.NewBinReaderFromBuf(data)
m.Child = r.ReadU64LE()
m.Parent = r.ReadU64LE()
m.Meta.DecodeBinary(r)
return r.Err
return t.logFromBytes(m, data)
}
func (t *boltForest) logFromBytes(lm *LogMove, r *io.BinReader) error {
lm.Child = r.ReadU64LE()
lm.Parent = r.ReadU64LE()
lm.Meta.DecodeBinary(r)
lm.HasOld = r.ReadBool()
if lm.HasOld {
lm.Old.Parent = r.ReadU64LE()
lm.Old.Meta.DecodeBinary(r)
}
return r.Err
func (t *boltForest) logFromBytes(lm *Move, data []byte) error {
lm.Child = binary.LittleEndian.Uint64(data)
lm.Parent = binary.LittleEndian.Uint64(data[8:])
return lm.Meta.FromBytes(data[16:])
}
func (t *boltForest) logToBytes(lm *LogMove) []byte {
func (t *boltForest) logToBytes(lm *Move) []byte {
w := io.NewBufBinWriter()
size := 8 + 8 + lm.Meta.Size() + 1
if lm.HasOld {
size += 8 + lm.Old.Meta.Size()
}
//if lm.HasOld {
// size += 8 + lm.Old.Meta.Size()
//}
w.Grow(size)
w.WriteU64LE(lm.Child)
w.WriteU64LE(lm.Parent)
lm.Meta.EncodeBinary(w.BinWriter)
w.WriteBool(lm.HasOld)
if lm.HasOld {
w.WriteU64LE(lm.Old.Parent)
lm.Old.Meta.EncodeBinary(w.BinWriter)
}
//w.WriteBool(lm.HasOld)
//if lm.HasOld {
// w.WriteU64LE(lm.Old.Parent)
// lm.Old.Meta.EncodeBinary(w.BinWriter)
//}
return w.Bytes()
}
func bucketName(cid cidSDK.ID, treeID string) []byte {
return []byte(cid.String() + treeID)
treeRoot := make([]byte, 32+len(treeID))
cid.Encode(treeRoot)
copy(treeRoot[32:], treeID)
return treeRoot
}
// 't' + node (id) -> timestamp when the node first appeared.
func timestampKey(key []byte, child Node) []byte {
key[0] = 't'
// 'o' + time -> old meta.
func oldKey(key []byte, ts Timestamp) []byte {
key[0] = 'o'
binary.LittleEndian.PutUint64(key[1:], ts)
return key[:9]
}
// 's' + child ID -> parent + timestamp of the first appearance + meta.
func stateKey(key []byte, child Node) []byte {
key[0] = 's'
binary.LittleEndian.PutUint64(key[1:], child)
return key[:9]
}
// 'p' + node (id) -> parent (id).
func parentKey(key []byte, child Node) []byte {
key[0] = 'p'
binary.LittleEndian.PutUint64(key[1:], child)
return key[:9]
func (t *boltForest) putState(b *bbolt.Bucket, key []byte, parent Node, timestamp Timestamp, meta []byte) error {
data := make([]byte, len(meta)+8+8)
binary.LittleEndian.PutUint64(data, parent)
binary.LittleEndian.PutUint64(data[8:], timestamp)
copy(data[16:], meta)
return b.Put(key, data)
}
// 'm' + node (id) -> serialized meta.
func metaKey(key []byte, child Node) []byte {
key[0] = 'm'
binary.LittleEndian.PutUint64(key[1:], child)
return key[:9]
func (t *boltForest) getState(b *bbolt.Bucket, key []byte) (Node, Timestamp, []byte, bool) {
data := b.Get(key)
if data == nil {
return 0, 0, nil, false
}
parent := binary.LittleEndian.Uint64(data)
timestamp := binary.LittleEndian.Uint64(data[8:])
return parent, timestamp, data[16:], true
}
// 'c' + parent (id) + child (id) -> 0/1.
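The net effect of the hunks above: instead of keeping a node's parent ('p'), meta ('m') and first-appearance timestamp ('t') under separate keys, the data bucket now stores one 's' state record per node with parent, timestamp and meta packed together, plus 'o' records that preserve old meta by time; bucket names and prefixes also switch from string-encoded container IDs to the fixed 32-byte binary encoding, which keeps the prefix scan in TreeDrop valid. Below is a minimal standalone sketch of that record layout, assuming the encoding shown in putState/getState; the helper names are illustrative, not part of the package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// stateKey mirrors the key layout above: 's' + little-endian child ID.
func stateKey(child uint64) []byte {
	key := make([]byte, 9)
	key[0] = 's'
	binary.LittleEndian.PutUint64(key[1:], child)
	return key
}

// encodeState packs parent, timestamp and meta into a single value, following putState.
func encodeState(parent, timestamp uint64, meta []byte) []byte {
	data := make([]byte, 16+len(meta))
	binary.LittleEndian.PutUint64(data, parent)
	binary.LittleEndian.PutUint64(data[8:], timestamp)
	copy(data[16:], meta)
	return data
}

// decodeState is the inverse, following getState.
func decodeState(data []byte) (parent, timestamp uint64, meta []byte, ok bool) {
	if len(data) < 16 {
		return 0, 0, nil, false
	}
	return binary.LittleEndian.Uint64(data), binary.LittleEndian.Uint64(data[8:]), data[16:], true
}

func main() {
	value := encodeState(7, 100, []byte("FileName=obj"))
	parent, ts, meta, ok := decodeState(value)
	fmt.Printf("%c %d %d %s %v\n", stateKey(42)[0], parent, ts, meta, ok) // s 7 100 FileName=obj true
}
```

TreeGetMeta, for example, now recovers both the parent and the serialized meta from that single record.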

View file

@ -25,7 +25,7 @@ func NewMemoryForest() ForestStorage {
}
// TreeMove implements the Forest interface.
func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogMove, error) {
func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*Move, error) {
if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor
}
@ -44,11 +44,11 @@ func (f *memoryForest) TreeMove(d CIDDescriptor, treeID string, op *Move) (*LogM
lm := s.do(op)
s.operations = append(s.operations, lm)
return &lm, nil
return &lm.Move, nil
}
// TreeAddByPath implements the Forest interface.
func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]LogMove, error) {
func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, m []KeyValue) ([]Move, error) {
if !d.checkValid() {
return nil, ErrInvalidCIDDescriptor
}
@ -64,22 +64,23 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
}
i, node := s.getPathPrefix(attr, path)
lm := make([]LogMove, len(path)-i+1)
lm := make([]Move, len(path)-i+1)
for j := i; j < len(path); j++ {
lm[j-i] = s.do(&Move{
op := s.do(&Move{
Parent: node,
Meta: Meta{
Time: s.timestamp(d.Position, d.Size),
Items: []KeyValue{{Key: attr, Value: []byte(path[j])}}},
Child: s.findSpareID(),
})
node = lm[j-i].Child
s.operations = append(s.operations, lm[j-i])
lm[j-i] = op.Move
node = op.Child
s.operations = append(s.operations, op)
}
mCopy := make([]KeyValue, len(m))
copy(mCopy, m)
lm[len(lm)-1] = s.do(&Move{
op := s.do(&Move{
Parent: node,
Meta: Meta{
Time: s.timestamp(d.Position, d.Size),
@ -87,6 +88,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
},
Child: s.findSpareID(),
})
lm[len(lm)-1] = op.Move
return lm, nil
}

View file

@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strconv"
"sync"
"testing"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
@ -16,9 +17,9 @@ import (
var providers = []struct {
name string
construct func(t testing.TB) Forest
construct func(t testing.TB, opts ...Option) Forest
}{
{"inmemory", func(t testing.TB) Forest {
{"inmemory", func(t testing.TB, _ ...Option) Forest {
f := NewMemoryForest()
require.NoError(t, f.Open(false))
require.NoError(t, f.Init())
@ -28,14 +29,15 @@ var providers = []struct {
return f
}},
{"bbolt", func(t testing.TB) Forest {
{"bbolt", func(t testing.TB, opts ...Option) Forest {
// Use `os.TempDir` because we construct multiple times in the same test.
tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
require.NoError(t, err)
f := NewBoltForest(
append([]Option{
WithPath(filepath.Join(tmpDir, "test.db")),
WithMaxBatchSize(1))
WithMaxBatchSize(1)}, opts...)...)
require.NoError(t, f.Open(false))
require.NoError(t, f.Init())
t.Cleanup(func() {
@ -407,7 +409,7 @@ func TestForest_Apply(t *testing.T) {
}
}
func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
@ -461,7 +463,7 @@ func TestForest_GetOpLog(t *testing.T) {
}
}
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) {
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
@ -520,7 +522,7 @@ func TestForest_TreeExists(t *testing.T) {
}
}
func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) {
func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...Option) Forest) {
s := constructor(t)
checkExists := func(t *testing.T, expected bool, cid cidSDK.ID, treeID string) {
@ -654,20 +656,20 @@ func TestForest_ApplyRandom(t *testing.T) {
}
}
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Forest) {
rand.Seed(42)
const (
nodeCount = 5
opCount = 20
iterCount = 200
)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
func TestForest_ParallelApply(t *testing.T) {
for i := range providers {
if providers[i].name == "inmemory" {
continue
}
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeParallelApply(t, providers[i].construct, 8, 128, 10)
})
}
}
// prepareRandomTree creates a random sequence of operations and applies them to a tree.
// The operations are guaranteed to be applied and returned sorted by `Time`.
func prepareRandomTree(nodeCount, opCount int) []Move {
ops := make([]Move, nodeCount+opCount)
for i := 0; i < nodeCount; i++ {
ops[i] = Move{
@ -686,7 +688,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
for i := nodeCount; i < len(ops); i++ {
ops[i] = Move{
Parent: rand.Uint64() % (nodeCount + 12),
Parent: rand.Uint64() % uint64(nodeCount+12),
Meta: Meta{
Time: Timestamp(i + nodeCount),
Items: []KeyValue{
@ -694,26 +696,19 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
{Value: make([]byte, 10)},
},
},
Child: rand.Uint64() % (nodeCount + 10),
Child: rand.Uint64() % uint64(nodeCount+10),
}
if rand.Uint32()%5 == 0 {
ops[i].Parent = TrashID
}
rand.Read(ops[i].Meta.Items[1].Value)
}
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
return ops
}
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
}
for i := uint64(0); i < nodeCount; i++ {
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
for i := uint64(0); i < uint64(nodeCount); i++ {
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i)
@ -742,17 +737,96 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
}
}
}
func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, batchSize, opCount, iterCount int) {
rand.Seed(42)
const nodeCount = 5
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
}
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
actual := constructor(t, WithMaxBatchSize(batchSize))
wg := new(sync.WaitGroup)
ch := make(chan *Move, 0)
for i := 0; i < batchSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for op := range ch {
require.NoError(t, actual.TreeApply(d, treeID, op, false))
}
}()
}
for i := range ops {
ch <- &ops[i]
}
close(ch)
wg.Wait()
compareForests(t, expected, actual, cid, treeID, nodeCount)
}
}
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
rand.Seed(42)
const (
nodeCount = 5
opCount = 20
)
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
}
const iterCount = 200
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
}
compareForests(t, expected, actual, cid, treeID, nodeCount)
}
}
const benchNodeCount = 1000
var batchSizes = []int{1, 2, 4, 8, 16, 32}
func BenchmarkApplySequential(b *testing.B) {
for i := range providers {
if providers[i].name == "inmemory" { // memory backend is not thread-safe
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
for _, bs := range batchSizes {
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
s := providers[i].construct(b, WithMaxBatchSize(bs))
benchmarkApply(b, s, func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
@ -768,6 +842,8 @@ func BenchmarkApplySequential(b *testing.B) {
})
})
}
})
}
}
func BenchmarkApplyReorderLast(b *testing.B) {
@ -780,7 +856,10 @@ func BenchmarkApplyReorderLast(b *testing.B) {
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
for _, bs := range batchSizes {
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
s := providers[i].construct(b, WithMaxBatchSize(bs))
benchmarkApply(b, s, func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
@ -801,6 +880,8 @@ func BenchmarkApplyReorderLast(b *testing.B) {
})
})
}
})
}
}
func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
@ -810,18 +891,17 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
ch := make(chan *Move, b.N)
for i := range ops {
ch <- &ops[i]
ch := make(chan int, b.N)
for i := 0; i < b.N; i++ {
ch <- i
}
b.ResetTimer()
b.ReportAllocs()
b.SetParallelism(50)
b.SetParallelism(10)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
op := <-ch
if err := s.TreeApply(d, treeID, op, false); err != nil {
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}
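The batch-size matrix added to these benchmarks, together with `WithMaxBatchSize(batchSize)` in the parallel test, exercises write batching in the underlying bbolt store: concurrent TreeApply calls that go through `db.Batch` can be coalesced into a single write transaction. A self-contained sketch of that mechanism, assuming the pilorama option ultimately tunes bbolt's `DB.MaxBatchSize`/`MaxBatchDelay`; the bucket and key names are made up.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"sync"
	"time"

	bolt "go.etcd.io/bbolt"
)

func main() {
	db, err := bolt.Open("batch-demo.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Assumed to be what WithMaxBatchSize tunes under the hood.
	db.MaxBatchSize = 16                     // merge up to 16 waiting writers
	db.MaxBatchDelay = 10 * time.Millisecond // or flush after 10ms

	var wg sync.WaitGroup
	for i := 0; i < 64; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Concurrent Batch calls may share one write transaction.
			err := db.Batch(func(tx *bolt.Tx) error {
				b, err := tx.CreateBucketIfNotExists([]byte("ops"))
				if err != nil {
					return err
				}
				key := make([]byte, 8)
				binary.LittleEndian.PutUint64(key, uint64(i))
				return b.Put(key, []byte("applied"))
			})
			if err != nil {
				log.Println("batch:", err)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("64 writes issued, far fewer transactions committed")
}
```

This is also why the parallel test skips the in-memory backend: it has no batching and, as noted in the benchmark, is not thread-safe.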

View file

@ -6,9 +6,15 @@ type nodeInfo struct {
Meta Meta
}
type move struct {
Move
HasOld bool
Old nodeInfo
}
// state represents state being replicated.
type state struct {
operations []LogMove
operations []move
tree
}
@ -20,7 +26,7 @@ func newState() *state {
}
// undo un-does op and changes s in-place.
func (s *state) undo(op *LogMove) {
func (s *state) undo(op *move) {
children := s.tree.childMap[op.Parent]
for i := range children {
if children[i] == op.Child {
@ -76,8 +82,8 @@ func (s *state) Apply(op *Move) error {
}
// do performs a single move operation on a tree.
func (s *state) do(op *Move) LogMove {
lm := LogMove{
func (s *state) do(op *Move) move {
lm := move{
Move: Move{
Parent: op.Parent,
Meta: op.Meta,

View file

@ -11,11 +11,11 @@ type Forest interface {
// TreeMove moves node in the tree.
// If the parent of the move operation is TrashID, the node is removed.
// If the child of the move operation is RootID, new ID is generated and added to a tree.
TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error)
TreeMove(d CIDDescriptor, treeID string, m *Move) (*Move, error)
// TreeAddByPath adds new node in the tree using provided path.
// The path is constructed by descending from the root using the values of the attr in meta.
// Internal nodes in path should have exactly one attribute, otherwise a new node is created.
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error)
// TreeApply applies replicated operation from another node.
// If background is true, TreeApply will first check whether an operation exists.
TreeApply(d CIDDescriptor, treeID string, m *Move, backgroundSync bool) error

View file

@ -35,13 +35,6 @@ type Move struct {
Child Node
}
// LogMove represents log record for a single move operation.
type LogMove struct {
Move
HasOld bool
Old nodeInfo
}
const (
// RootID represents the ID of a root node.
RootID = 0
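Together with the interface change above, the exported LogMove type is gone: forests exchange plain Move values, and the rollback bookkeeping (HasOld plus the old parent/meta) becomes an implementation detail of the unexported move type in the in-memory state. A toy sketch of why that bookkeeping exists, with simplified stand-ins for Move and the tree: applying an operation that is older than already-applied ones requires undoing the newer ones first, and undo needs each node's previous state.

```go
package main

import "fmt"

// move is a simplified stand-in for the unexported move type: the operation
// plus enough information to roll it back.
type move struct {
	parent, child uint64
	hasOld        bool
	oldParent     uint64
}

// tree maps child -> parent; a stand-in for the real tree state.
type tree map[uint64]uint64

// do applies a move and records the previous parent, if any.
func (t tree) do(parent, child uint64) move {
	op := move{parent: parent, child: child}
	if old, ok := t[child]; ok {
		op.hasOld, op.oldParent = true, old
	}
	t[child] = parent
	return op
}

// undo restores the state captured by do.
func (t tree) undo(op move) {
	if op.hasOld {
		t[op.child] = op.oldParent
		return
	}
	delete(t, op.child)
}

func main() {
	tr := tree{}
	first := tr.do(1, 10)  // attach node 10 under 1
	second := tr.do(2, 10) // move node 10 under 2, remembering parent 1
	tr.undo(second)
	tr.undo(first)
	fmt.Println(len(tr), first.hasOld, second.hasOld) // 0 false true
}
```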

View file

@ -261,7 +261,10 @@ func (s *Shard) Close() error {
}
}
// If Init/Open was unsuccessful gc can be nil.
if s.gc != nil {
s.gc.stop()
}
return nil
}

View file

@ -91,11 +91,15 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
}, err
}
// emptyStorageID is an empty storageID that indicates that
// an object is big (and is stored in an FSTree, not in a blobovnicza).
var emptyStorageID = make([]byte, 0)
// fetchObjectData looks through writeCache and blobStor to find object.
func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
var (
err error
res *objectSDK.Object
mErr error
mRes meta.ExistsRes
)
var exists bool
@ -103,15 +107,15 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
var mPrm meta.ExistsPrm
mPrm.SetAddress(addr)
mRes, err := s.metaBase.Exists(mPrm)
if err != nil && !s.info.Mode.NoMetabase() {
return res, false, err
mRes, mErr = s.metaBase.Exists(mPrm)
if mErr != nil && !s.info.Mode.NoMetabase() {
return nil, false, mErr
}
exists = mRes.Exists()
}
if s.hasWriteCache() {
res, err = wc(s.writeCache)
res, err := wc(s.writeCache)
if err == nil || IsErrOutOfRange(err) {
return res, false, err
}
@ -123,8 +127,8 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
}
}
if skipMeta || err != nil {
res, err = cb(s.blobStor, nil)
if skipMeta || mErr != nil {
res, err := cb(s.blobStor, nil)
return res, false, err
}
@ -135,12 +139,20 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher,
var mPrm meta.StorageIDPrm
mPrm.SetAddress(addr)
mRes, err := s.metaBase.StorageID(mPrm)
mExRes, err := s.metaBase.StorageID(mPrm)
if err != nil {
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
}
res, err = cb(s.blobStor, mRes.StorageID())
storageID := mExRes.StorageID()
if storageID == nil {
// `nil` storageID returned without any error
// means that object is big, `cb` expects an
// empty (but non-nil) storageID in such cases
storageID = emptyStorageID
}
res, err := cb(s.blobStor, storageID)
return res, true, err
}
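The hunks above separate two kinds of "empty" storage ID: the metabase returning a nil ID without an error means the object is big and lives in the FSTree, while the blobstor callback expects an empty but non-nil ID in that case, hence the emptyStorageID substitution. A small illustration of the distinction; the resolve helper is invented for the example and only mimics how the callback is assumed to branch.

```go
package main

import "fmt"

var emptyStorageID = make([]byte, 0)

// resolve is illustrative only: it branches the way the blobstor callback is
// assumed to, distinguishing a nil ID from an empty one.
func resolve(storageID []byte) string {
	switch {
	case storageID == nil:
		return "no hint: search write-cache, blobovniczas and FSTree"
	case len(storageID) == 0:
		return "big object: read from FSTree"
	default:
		return "small object: read from blobovnicza " + string(storageID)
	}
}

func main() {
	fmt.Println(resolve(nil))
	fmt.Println(resolve(emptyStorageID))
	fmt.Println(resolve([]byte("blobovnicza/0/1")))
}
```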

View file

@ -3,6 +3,7 @@ package shard
import (
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"go.uber.org/zap"
)
// ErrReadOnlyMode is returned when it is impossible to apply operation
@ -24,6 +25,10 @@ func (s *Shard) SetMode(m mode.Mode) error {
}
func (s *Shard) setMode(m mode.Mode) error {
s.log.Info("setting shard mode",
zap.Stringer("old_mode", s.info.Mode),
zap.Stringer("new_mode", m))
components := []interface{ SetMode(mode.Mode) error }{
s.metaBase, s.blobStor,
}
@ -61,6 +66,8 @@ func (s *Shard) setMode(m mode.Mode) error {
s.metricsWriter.SetReadonly(s.info.Mode != mode.ReadWrite)
}
s.log.Info("shard mode set successfully",
zap.Stringer("mode", s.info.Mode))
return nil
}

View file

@ -12,7 +12,7 @@ var _ pilorama.Forest = (*Shard)(nil)
var ErrPiloramaDisabled = logicerr.New("pilorama is disabled")
// TreeMove implements the pilorama.Forest interface.
func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.LogMove, error) {
func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) (*pilorama.Move, error) {
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
@ -27,7 +27,7 @@ func (s *Shard) TreeMove(d pilorama.CIDDescriptor, treeID string, m *pilorama.Mo
}
// TreeAddByPath implements the pilorama.Forest interface.
func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.LogMove, error) {
func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr string, path []string, meta []pilorama.KeyValue) ([]pilorama.Move, error) {
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}

View file

@ -11,6 +11,7 @@ import (
clientcore "github.com/TrueCloudLab/frostfs-node/pkg/core/client"
"github.com/TrueCloudLab/frostfs-node/pkg/network"
"github.com/TrueCloudLab/frostfs-sdk-go/client"
"github.com/TrueCloudLab/frostfs-sdk-go/object"
)
type singleClient struct {
@ -29,8 +30,6 @@ type multiClient struct {
addr network.AddressGroup
opts ClientCacheOpts
reconnectInterval time.Duration
}
const defaultReconnectInterval = time.Second * 30
@ -43,7 +42,6 @@ func newMultiClient(addr network.AddressGroup, opts ClientCacheOpts) *multiClien
clients: make(map[string]*singleClient),
addr: addr,
opts: opts,
reconnectInterval: defaultReconnectInterval,
}
}
@ -149,8 +147,12 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
err = f(c)
}
success := err == nil || errors.Is(err, context.Canceled)
// non-status logic error that could be returned
// from the SDK client; should not be considered
// as a connection error
var siErr *object.SplitInfoError
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
firstErr = err
}
@ -170,6 +172,14 @@ func (x *multiClient) ReportError(err error) {
return
}
// non-status logic error that could be returned
// from the SDK client; should not be considered
// as a connection error
var siErr *object.SplitInfoError
if errors.As(err, &siErr) {
return
}
// Dropping all clients here is not necessary, we do this
// because `multiClient` doesn't yet provide convenient interface
// for reporting individual errors for streaming operations.
@ -327,7 +337,7 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
c.RUnlock()
return cl, nil
}
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
if x.opts.ReconnectTimeout != 0 && time.Since(c.lastAttempt) < x.opts.ReconnectTimeout {
c.RUnlock()
return nil, errRecentlyFailed
}
@ -350,7 +360,7 @@ func (x *multiClient) client(addr network.Address) (clientcore.Client, error) {
return c.client, nil
}
if x.reconnectInterval != 0 && time.Since(c.lastAttempt) < x.reconnectInterval {
if x.opts.ReconnectTimeout != 0 && time.Since(c.lastAttempt) < x.opts.ReconnectTimeout {
return nil, errRecentlyFailed
}
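iterateClients and ReportError now single out object.SplitInfoError: it is a logic-level reply (the object is stored as a split chain), not a transport failure, so it should neither mark the endpoint as failed nor drop its connections. A sketch of the classification; splitInfoError is a stand-in for the SDK type, and isConnectionFailure only mirrors the success condition from the diff.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// splitInfoError stands in for the SDK's object.SplitInfoError.
type splitInfoError struct{ lastPart string }

func (e *splitInfoError) Error() string { return "object is split, last part: " + e.lastPart }

// isConnectionFailure mirrors the success condition: nil errors, context
// cancellation and split-info replies are not connection problems.
func isConnectionFailure(err error) bool {
	if err == nil || errors.Is(err, context.Canceled) {
		return false
	}
	var siErr *splitInfoError
	return !errors.As(err, &siErr)
}

func main() {
	wrapped := fmt.Errorf("get object: %w", &splitInfoError{lastPart: "last-part-id"})
	fmt.Println(isConnectionFailure(wrapped))                          // false
	fmt.Println(isConnectionFailure(errors.New("connection refused"))) // true
}
```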

View file

@ -10,6 +10,7 @@ import (
"github.com/TrueCloudLab/frostfs-node/pkg/core/netmap"
"github.com/TrueCloudLab/frostfs-node/pkg/services/object"
"github.com/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
@ -573,6 +574,9 @@ func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (in
if err != nil {
return info, errors.New("can't fetch current epoch")
}
if req.token.ExpiredAt(currentEpoch) {
return info, apistatus.SessionTokenExpired{}
}
if req.token.InvalidAt(currentEpoch) {
return info, fmt.Errorf("%s: token is invalid at %d epoch)",
invalidRequestMessage, currentEpoch)
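findRequestInfo now tests expiry before the generic validity check: an expired token would presumably also trip InvalidAt and surface as an opaque internal error, while checking ExpiredAt first lets the node answer with the dedicated apistatus.SessionTokenExpired status. A simplified sketch of that ordering, with stand-in types for the session token and the status error.

```go
package main

import "fmt"

// sessionTokenExpired stands in for apistatus.SessionTokenExpired.
type sessionTokenExpired struct{}

func (sessionTokenExpired) Error() string { return "status: expired session token" }

// token is a simplified stand-in for the SDK session token lifetime.
type token struct{ notValidBefore, expiresAt uint64 }

func (t token) ExpiredAt(epoch uint64) bool { return t.expiresAt < epoch }
func (t token) InvalidAt(epoch uint64) bool {
	return t.ExpiredAt(epoch) || t.notValidBefore > epoch
}

// validate mirrors the check order from the diff: expiry first, so the caller
// gets the specific status error instead of the generic one.
func validate(t token, currentEpoch uint64) error {
	if t.ExpiredAt(currentEpoch) {
		return sessionTokenExpired{}
	}
	if t.InvalidAt(currentEpoch) {
		return fmt.Errorf("token is invalid at %d epoch", currentEpoch)
	}
	return nil
}

func main() {
	fmt.Println(validate(token{notValidBefore: 1, expiresAt: 5}, 10))   // expired status
	fmt.Println(validate(token{notValidBefore: 20, expiresAt: 30}, 10)) // generic invalid
	fmt.Println(validate(token{notValidBefore: 1, expiresAt: 30}, 10))  // <nil>
}
```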

View file

@ -6,9 +6,9 @@ import (
"go.uber.org/zap"
)
// LogServiceError writes debug error message of object service to provided logger.
// LogServiceError writes error message of object service to provided logger.
func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, err error) {
l.Debug("object service error",
l.Error("object service error",
zap.String("node", network.StringifyGroup(node)),
zap.String("request", req),
zap.String("error", err.Error()),
@ -17,7 +17,7 @@ func LogServiceError(l *logger.Logger, req string, node network.AddressGroup, er
// LogWorkerPoolError writes debug error message of object worker pool to provided logger.
func LogWorkerPoolError(l *logger.Logger, req string, err error) {
l.Debug("could not push task to worker pool",
l.Error("could not push task to worker pool",
zap.String("request", req),
zap.String("error", err.Error()),
)

View file

@ -17,7 +17,7 @@ import (
type movePair struct {
cid cidSDK.ID
treeID string
op *pilorama.LogMove
op *pilorama.Move
}
type replicationTask struct {
@ -25,12 +25,33 @@ type replicationTask struct {
req *ApplyRequest
}
type applyOp struct {
treeID string
pilorama.CIDDescriptor
pilorama.Move
}
const (
defaultReplicatorCapacity = 64
defaultReplicatorWorkerCount = 64
defaultReplicatorSendTimeout = time.Second * 5
)
func (s *Service) localReplicationWorker() {
for {
select {
case <-s.closeCh:
return
case op := <-s.replicateLocalCh:
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
if err != nil {
s.log.Error("failed to apply replicated operation",
zap.String("err", err.Error()))
}
}
}
}
func (s *Service) replicationWorker() {
for {
select {
@ -74,6 +95,7 @@ func (s *Service) replicationWorker() {
func (s *Service) replicateLoop(ctx context.Context) {
for i := 0; i < s.replicatorWorkerCount; i++ {
go s.replicationWorker()
go s.localReplicationWorker()
}
defer func() {
for len(s.replicationTasks) != 0 {
@ -119,14 +141,14 @@ func (s *Service) replicate(op movePair) error {
return nil
}
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.LogMove) {
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
select {
case s.replicateCh <- movePair{
cid: cid,
treeID: treeID,
op: op,
}:
case <-s.closeCh:
default:
}
}
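Both pushToQueue above and the Apply handler shown further below hand operations to their channels through a non-blocking select: when no worker is ready (or the buffer is full), the operation is dropped rather than stalling the caller, on the assumption that the periodic tree synchronization catches up later. A minimal sketch of that hand-off pattern; the applyOp type here is just a stand-in.

```go
package main

import "fmt"

type applyOp struct{ id int }

// tryEnqueue reports whether the operation was handed off; when the channel
// would block it gives up immediately, like the `default:` branches above.
func tryEnqueue(ch chan<- applyOp, op applyOp) bool {
	select {
	case ch <- op:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan applyOp, 1)
	fmt.Println(tryEnqueue(ch, applyOp{1})) // true: buffer has room
	fmt.Println(tryEnqueue(ch, applyOp{2})) // false: buffer full, no receiver ready
}
```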

View file

@ -23,6 +23,7 @@ type Service struct {
cache clientCache
replicateCh chan movePair
replicateLocalCh chan applyOp
replicationTasks chan replicationTask
closeCh chan struct{}
containerCache containerCache
@ -59,6 +60,7 @@ func New(opts ...Option) *Service {
s.cache.init()
s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp)
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
s.containerCache.init(s.containerCacheSize)
s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
@ -483,13 +485,19 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
return nil, fmt.Errorf("can't parse meta-information: %w", err)
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
select {
case s.replicateLocalCh <- applyOp{
treeID: req.GetBody().GetTreeId(),
CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
Move: pilorama.Move{
Parent: op.GetParentId(),
Child: op.GetChildId(),
Meta: meta,
}, false)
},
}:
default:
}
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {

View file

@ -360,7 +360,7 @@ func (s *Service) syncLoop(ctx context.Context) {
}
// randomizeNodeOrder shuffles nodes and removes the node at index `pos`.
// It is assumed that 0 <= pos < len(nodes)
// It is assumed that 0 <= pos < len(nodes).
func randomizeNodeOrder(cnrNodes []netmap.NodeInfo, pos int) []netmap.NodeInfo {
if len(cnrNodes) == 1 {
return nil