forked from TrueCloudLab/frostfs-node
Compare commits
11 commits
8c0714cd40
...
f1556e3c42
Author | SHA1 | Date | |
---|---|---|---|
f1556e3c42 | |||
e122ff6013 | |||
e2658c7519 | |||
c00f4bab18 | |||
46fef276b4 | |||
9bd05e94c8 | |||
16830033f8 | |||
1cf51a8079 | |||
3324c26fd8 | |||
a692298533 | |||
be2753de00 |
34 changed files with 316 additions and 230 deletions
3
Makefile
3
Makefile
|
@ -282,7 +282,6 @@ env-up: all
|
||||||
|
|
||||||
# Shutdown dev environment
|
# Shutdown dev environment
|
||||||
env-down:
|
env-down:
|
||||||
docker compose -f dev/docker-compose.yml down
|
docker compose -f dev/docker-compose.yml down -v
|
||||||
docker volume rm -f frostfs-node_neo-go
|
|
||||||
rm -rf ./$(TMP_DIR)/state
|
rm -rf ./$(TMP_DIR)/state
|
||||||
rm -rf ./$(TMP_DIR)/storage
|
rm -rf ./$(TMP_DIR)/storage
|
||||||
|
|
|
@ -98,7 +98,7 @@ See `frostfs-contract`'s README.md for build instructions.
|
||||||
4. To create container and put object into it run (container and object IDs will be different):
|
4. To create container and put object into it run (container and object IDs will be different):
|
||||||
|
|
||||||
```
|
```
|
||||||
./bin/frostfs-cli container create -r 127.0.0.1:8080 --wallet ./dev/wallet.json --policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" --basic-acl public-read-write --await
|
./bin/frostfs-cli container create -r 127.0.0.1:8080 --wallet ./dev/wallet.json --policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" --await
|
||||||
Enter password > <- press ENTER, the is no password for wallet
|
Enter password > <- press ENTER, the is no password for wallet
|
||||||
CID: CfPhEuHQ2PRvM4gfBQDC4dWZY3NccovyfcnEdiq2ixju
|
CID: CfPhEuHQ2PRvM4gfBQDC4dWZY3NccovyfcnEdiq2ixju
|
||||||
|
|
||||||
|
|
|
@ -72,4 +72,3 @@ All other `object` sub-commands support only static sessions (2).
|
||||||
List of commands supporting sessions (static only):
|
List of commands supporting sessions (static only):
|
||||||
- `create`
|
- `create`
|
||||||
- `delete`
|
- `delete`
|
||||||
- `set-eacl`
|
|
||||||
|
|
|
@ -15,14 +15,12 @@ import (
|
||||||
containerApi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
containerApi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
containerACL string
|
|
||||||
containerPolicy string
|
containerPolicy string
|
||||||
containerAttributes []string
|
containerAttributes []string
|
||||||
containerAwait bool
|
containerAwait bool
|
||||||
|
@ -89,9 +87,6 @@ It will be stored in sidechain when inner ring will accepts it.`,
|
||||||
err = parseAttributes(&cnr, containerAttributes)
|
err = parseAttributes(&cnr, containerAttributes)
|
||||||
commonCmd.ExitOnErr(cmd, "", err)
|
commonCmd.ExitOnErr(cmd, "", err)
|
||||||
|
|
||||||
var basicACL acl.Basic
|
|
||||||
commonCmd.ExitOnErr(cmd, "decode basic ACL string: %w", basicACL.DecodeString(containerACL))
|
|
||||||
|
|
||||||
tok := getSession(cmd)
|
tok := getSession(cmd)
|
||||||
|
|
||||||
if tok != nil {
|
if tok != nil {
|
||||||
|
@ -105,7 +100,6 @@ It will be stored in sidechain when inner ring will accepts it.`,
|
||||||
}
|
}
|
||||||
|
|
||||||
cnr.SetPlacementPolicy(*placementPolicy)
|
cnr.SetPlacementPolicy(*placementPolicy)
|
||||||
cnr.SetBasicACL(basicACL)
|
|
||||||
|
|
||||||
var syncContainerPrm internalclient.SyncContainerPrm
|
var syncContainerPrm internalclient.SyncContainerPrm
|
||||||
syncContainerPrm.SetClient(cli)
|
syncContainerPrm.SetClient(cli)
|
||||||
|
@ -163,10 +157,6 @@ func initContainerCreateCmd() {
|
||||||
flags.DurationP(commonflags.Timeout, commonflags.TimeoutShorthand, commonflags.TimeoutDefault, commonflags.TimeoutUsage)
|
flags.DurationP(commonflags.Timeout, commonflags.TimeoutShorthand, commonflags.TimeoutDefault, commonflags.TimeoutUsage)
|
||||||
flags.StringP(commonflags.WalletPath, commonflags.WalletPathShorthand, commonflags.WalletPathDefault, commonflags.WalletPathUsage)
|
flags.StringP(commonflags.WalletPath, commonflags.WalletPathShorthand, commonflags.WalletPathDefault, commonflags.WalletPathUsage)
|
||||||
flags.StringP(commonflags.Account, commonflags.AccountShorthand, commonflags.AccountDefault, commonflags.AccountUsage)
|
flags.StringP(commonflags.Account, commonflags.AccountShorthand, commonflags.AccountDefault, commonflags.AccountUsage)
|
||||||
|
|
||||||
flags.StringVar(&containerACL, "basic-acl", acl.NamePrivate, fmt.Sprintf("HEX encoded basic ACL value or keywords like '%s', '%s', '%s'",
|
|
||||||
acl.NamePublicRW, acl.NamePrivate, acl.NamePublicROExtended,
|
|
||||||
))
|
|
||||||
flags.StringVarP(&containerPolicy, "policy", "p", "", "QL-encoded or JSON-encoded placement policy or path to file with it")
|
flags.StringVarP(&containerPolicy, "policy", "p", "", "QL-encoded or JSON-encoded placement policy or path to file with it")
|
||||||
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
|
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
|
||||||
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")
|
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")
|
||||||
|
|
|
@ -196,31 +196,6 @@ func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error
|
||||||
return s.delInfoCache.get(cnr)
|
return s.delInfoCache.get(cnr)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ttlEACLStorage struct {
|
|
||||||
*ttlNetCache[cid.ID, *container.EACL]
|
|
||||||
}
|
|
||||||
|
|
||||||
func newCachedEACLStorage(v container.EACLSource, ttl time.Duration) ttlEACLStorage {
|
|
||||||
const eaclCacheSize = 100
|
|
||||||
|
|
||||||
lruCnrCache := newNetworkTTLCache(eaclCacheSize, ttl, func(id cid.ID) (*container.EACL, error) {
|
|
||||||
return v.GetEACL(id)
|
|
||||||
}, metrics.NewCacheMetrics("eacl"))
|
|
||||||
|
|
||||||
return ttlEACLStorage{lruCnrCache}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetEACL returns eACL value from the cache. If value is missing in the cache
|
|
||||||
// or expired, then it returns value from side chain and updates cache.
|
|
||||||
func (s ttlEACLStorage) GetEACL(cnr cid.ID) (*container.EACL, error) {
|
|
||||||
return s.get(cnr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// InvalidateEACL removes cached eACL value.
|
|
||||||
func (s ttlEACLStorage) InvalidateEACL(cnr cid.ID) {
|
|
||||||
s.remove(cnr)
|
|
||||||
}
|
|
||||||
|
|
||||||
type lruNetmapSource struct {
|
type lruNetmapSource struct {
|
||||||
netState netmap.State
|
netState netmap.State
|
||||||
|
|
||||||
|
|
|
@ -642,8 +642,6 @@ type cfgObject struct {
|
||||||
|
|
||||||
cnrSource container.Source
|
cnrSource container.Source
|
||||||
|
|
||||||
eaclSource container.EACLSource
|
|
||||||
|
|
||||||
cfgAccessPolicyEngine cfgAccessPolicyEngine
|
cfgAccessPolicyEngine cfgAccessPolicyEngine
|
||||||
|
|
||||||
pool cfgObjectRoutines
|
pool cfgObjectRoutines
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
||||||
|
@ -24,6 +25,7 @@ func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
|
||||||
Service: "frostfs-node",
|
Service: "frostfs-node",
|
||||||
InstanceID: getInstanceIDOrDefault(c),
|
InstanceID: getInstanceIDOrDefault(c),
|
||||||
Version: misc.Version,
|
Version: misc.Version,
|
||||||
|
Attributes: make(map[string]string),
|
||||||
}
|
}
|
||||||
|
|
||||||
if trustedCa := config.StringSafe(c.Sub(subsection), "trusted_ca"); trustedCa != "" {
|
if trustedCa := config.StringSafe(c.Sub(subsection), "trusted_ca"); trustedCa != "" {
|
||||||
|
@ -38,11 +40,30 @@ func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
|
||||||
}
|
}
|
||||||
conf.ServerCaCertPool = certPool
|
conf.ServerCaCertPool = certPool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
i := uint64(0)
|
||||||
|
for ; ; i++ {
|
||||||
|
si := strconv.FormatUint(i, 10)
|
||||||
|
ac := c.Sub(subsection).Sub("attributes").Sub(si)
|
||||||
|
k := config.StringSafe(ac, "key")
|
||||||
|
if k == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
v := config.StringSafe(ac, "value")
|
||||||
|
if v == "" {
|
||||||
|
return nil, fmt.Errorf("empty tracing attribute value for key %s", k)
|
||||||
|
}
|
||||||
|
if _, ok := conf.Attributes[k]; ok {
|
||||||
|
return nil, fmt.Errorf("tracing attribute key %s defined more than once", k)
|
||||||
|
}
|
||||||
|
conf.Attributes[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
return conf, nil
|
return conf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getInstanceIDOrDefault(c *config.Config) string {
|
func getInstanceIDOrDefault(c *config.Config) string {
|
||||||
s := config.StringSlice(c.Sub("node"), "addresses")
|
s := config.StringSliceSafe(c.Sub("node"), "addresses")
|
||||||
if len(s) > 0 {
|
if len(s) > 0 {
|
||||||
return s[0]
|
return s[0]
|
||||||
}
|
}
|
||||||
|
|
46
cmd/frostfs-node/config/tracing/config_test.go
Normal file
46
cmd/frostfs-node/config/tracing/config_test.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
package tracing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||||
|
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTracingSection(t *testing.T) {
|
||||||
|
t.Run("defaults", func(t *testing.T) {
|
||||||
|
tc, err := ToTracingConfig(configtest.EmptyConfig())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, false, tc.Enabled)
|
||||||
|
require.Equal(t, tracing.Exporter(""), tc.Exporter)
|
||||||
|
require.Equal(t, "", tc.Endpoint)
|
||||||
|
require.Equal(t, "frostfs-node", tc.Service)
|
||||||
|
require.Equal(t, "", tc.InstanceID)
|
||||||
|
require.Nil(t, tc.ServerCaCertPool)
|
||||||
|
require.Empty(t, tc.Attributes)
|
||||||
|
})
|
||||||
|
|
||||||
|
const path = "../../../../config/example/node"
|
||||||
|
|
||||||
|
fileConfigTest := func(c *config.Config) {
|
||||||
|
tc, err := ToTracingConfig(c)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, true, tc.Enabled)
|
||||||
|
require.Equal(t, tracing.OTLPgRPCExporter, tc.Exporter)
|
||||||
|
require.Equal(t, "localhost", tc.Endpoint)
|
||||||
|
require.Equal(t, "frostfs-node", tc.Service)
|
||||||
|
require.Nil(t, tc.ServerCaCertPool)
|
||||||
|
require.EqualValues(t, map[string]string{
|
||||||
|
"key0": "value",
|
||||||
|
"key1": "value",
|
||||||
|
}, tc.Attributes)
|
||||||
|
}
|
||||||
|
|
||||||
|
configtest.ForEachFileType(path, fileConfigTest)
|
||||||
|
|
||||||
|
t.Run("ENV", func(t *testing.T) {
|
||||||
|
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||||
|
})
|
||||||
|
}
|
|
@ -10,6 +10,8 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
subsection = "tree"
|
subsection = "tree"
|
||||||
|
|
||||||
|
SyncBatchSizeDefault = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
// TreeConfig is a wrapper over "tree" config section
|
// TreeConfig is a wrapper over "tree" config section
|
||||||
|
@ -74,6 +76,17 @@ func (c TreeConfig) SyncInterval() time.Duration {
|
||||||
return config.DurationSafe(c.cfg, "sync_interval")
|
return config.DurationSafe(c.cfg, "sync_interval")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SyncBatchSize returns the value of "sync_batch_size"
|
||||||
|
// config parameter from the "tree" section.
|
||||||
|
//
|
||||||
|
// Returns `SyncBatchSizeDefault` if config value is not specified.
|
||||||
|
func (c TreeConfig) SyncBatchSize() int {
|
||||||
|
if v := config.IntSafe(c.cfg, "sync_batch_size"); v > 0 {
|
||||||
|
return int(v)
|
||||||
|
}
|
||||||
|
return SyncBatchSizeDefault
|
||||||
|
}
|
||||||
|
|
||||||
// AuthorizedKeys parses and returns an array of "authorized_keys" config
|
// AuthorizedKeys parses and returns an array of "authorized_keys" config
|
||||||
// parameter from "tree" section.
|
// parameter from "tree" section.
|
||||||
//
|
//
|
||||||
|
|
|
@ -44,6 +44,7 @@ func TestTreeSection(t *testing.T) {
|
||||||
require.Equal(t, 32, treeSec.ReplicationWorkerCount())
|
require.Equal(t, 32, treeSec.ReplicationWorkerCount())
|
||||||
require.Equal(t, 5*time.Second, treeSec.ReplicationTimeout())
|
require.Equal(t, 5*time.Second, treeSec.ReplicationTimeout())
|
||||||
require.Equal(t, time.Hour, treeSec.SyncInterval())
|
require.Equal(t, time.Hour, treeSec.SyncInterval())
|
||||||
|
require.Equal(t, 2000, treeSec.SyncBatchSize())
|
||||||
require.Equal(t, expectedKeys, treeSec.AuthorizedKeys())
|
require.Equal(t, expectedKeys, treeSec.AuthorizedKeys())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,10 +73,6 @@ func initContainerService(_ context.Context, c *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
||||||
eACLFetcher := &morphEACLFetcher{
|
|
||||||
w: client,
|
|
||||||
}
|
|
||||||
|
|
||||||
cnrRdr := new(morphContainerReader)
|
cnrRdr := new(morphContainerReader)
|
||||||
|
|
||||||
cnrWrt := &morphContainerWriter{
|
cnrWrt := &morphContainerWriter{
|
||||||
|
@ -84,8 +80,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.cfgMorph.cacheTTL <= 0 {
|
if c.cfgMorph.cacheTTL <= 0 {
|
||||||
c.cfgObject.eaclSource = eACLFetcher
|
|
||||||
cnrRdr.eacl = eACLFetcher
|
|
||||||
c.cfgObject.cnrSource = cnrSrc
|
c.cfgObject.cnrSource = cnrSrc
|
||||||
cnrRdr.src = cnrSrc
|
cnrRdr.src = cnrSrc
|
||||||
cnrRdr.lister = client
|
cnrRdr.lister = client
|
||||||
|
@ -129,11 +123,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
||||||
c.cfgObject.cnrSource = containerCache
|
c.cfgObject.cnrSource = containerCache
|
||||||
}
|
}
|
||||||
|
|
||||||
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
|
|
||||||
c.cfgObject.eaclSource = cachedEACLStorage
|
|
||||||
|
|
||||||
cnrRdr.lister = client
|
cnrRdr.lister = client
|
||||||
cnrRdr.eacl = c.cfgObject.eaclSource
|
|
||||||
cnrRdr.src = c.cfgObject.cnrSource
|
cnrRdr.src = c.cfgObject.cnrSource
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,8 +214,6 @@ func (c *cfg) ExternalAddresses() []string {
|
||||||
|
|
||||||
// implements interface required by container service provided by morph executor.
|
// implements interface required by container service provided by morph executor.
|
||||||
type morphContainerReader struct {
|
type morphContainerReader struct {
|
||||||
eacl containerCore.EACLSource
|
|
||||||
|
|
||||||
src containerCore.Source
|
src containerCore.Source
|
||||||
|
|
||||||
lister interface {
|
lister interface {
|
||||||
|
@ -241,10 +229,6 @@ func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo,
|
||||||
return x.src.DeletionInfo(id)
|
return x.src.DeletionInfo(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
|
||||||
return x.eacl.GetEACL(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
||||||
return x.lister.ContainersOf(id)
|
return x.lister.ContainersOf(id)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
|
@ -14,7 +13,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
||||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
||||||
objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc"
|
objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc"
|
||||||
|
@ -37,7 +35,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object/grpc"
|
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object/grpc"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -484,29 +481,6 @@ func createAPEService(c *cfg, splitSvc *objectService.TransportSplitter) *object
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
type morphEACLFetcher struct {
|
|
||||||
w *cntClient.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
|
|
||||||
eaclInfo, err := s.w.GetEACL(cnr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
binTable, err := eaclInfo.Value.Marshal()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("marshal eACL table: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !eaclInfo.Signature.Verify(binTable) {
|
|
||||||
// TODO(@cthulhu-rider): #468 use "const" error
|
|
||||||
return nil, errors.New("invalid signature of the eACL table")
|
|
||||||
}
|
|
||||||
|
|
||||||
return eaclInfo, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type engineWithoutNotifications struct {
|
type engineWithoutNotifications struct {
|
||||||
engine *engine.StorageEngine
|
engine *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,6 +62,7 @@ func initTreeService(c *cfg) {
|
||||||
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
|
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
|
||||||
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
|
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
|
||||||
tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
|
tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
|
||||||
|
tree.WithSyncBatchSize(treeConfig.SyncBatchSize()),
|
||||||
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
|
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
|
||||||
tree.WithMetrics(c.metricsCollector.TreeService()),
|
tree.WithMetrics(c.metricsCollector.TreeService()),
|
||||||
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
||||||
|
|
|
@ -31,6 +31,7 @@ FROSTFS_TREE_REPLICATION_CHANNEL_CAPACITY=32
|
||||||
FROSTFS_TREE_REPLICATION_WORKER_COUNT=32
|
FROSTFS_TREE_REPLICATION_WORKER_COUNT=32
|
||||||
FROSTFS_TREE_REPLICATION_TIMEOUT=5s
|
FROSTFS_TREE_REPLICATION_TIMEOUT=5s
|
||||||
FROSTFS_TREE_SYNC_INTERVAL=1h
|
FROSTFS_TREE_SYNC_INTERVAL=1h
|
||||||
|
FROSTFS_TREE_SYNC_BATCH_SIZE=2000
|
||||||
FROSTFS_TREE_AUTHORIZED_KEYS="0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
FROSTFS_TREE_AUTHORIZED_KEYS="0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
||||||
|
|
||||||
# gRPC section
|
# gRPC section
|
||||||
|
@ -202,6 +203,10 @@ FROSTFS_TRACING_ENABLED=true
|
||||||
FROSTFS_TRACING_ENDPOINT="localhost"
|
FROSTFS_TRACING_ENDPOINT="localhost"
|
||||||
FROSTFS_TRACING_EXPORTER="otlp_grpc"
|
FROSTFS_TRACING_EXPORTER="otlp_grpc"
|
||||||
FROSTFS_TRACING_TRUSTED_CA=""
|
FROSTFS_TRACING_TRUSTED_CA=""
|
||||||
|
FROSTFS_TRACING_ATTRIBUTES_0_KEY=key0
|
||||||
|
FROSTFS_TRACING_ATTRIBUTES_0_VALUE=value
|
||||||
|
FROSTFS_TRACING_ATTRIBUTES_1_KEY=key1
|
||||||
|
FROSTFS_TRACING_ATTRIBUTES_1_VALUE=value
|
||||||
|
|
||||||
FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
||||||
|
|
||||||
|
|
|
@ -69,6 +69,7 @@
|
||||||
"replication_worker_count": 32,
|
"replication_worker_count": 32,
|
||||||
"replication_timeout": "5s",
|
"replication_timeout": "5s",
|
||||||
"sync_interval": "1h",
|
"sync_interval": "1h",
|
||||||
|
"sync_batch_size": 2000,
|
||||||
"authorized_keys": [
|
"authorized_keys": [
|
||||||
"0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0",
|
"0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0",
|
||||||
"02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
"02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
||||||
|
@ -258,9 +259,19 @@
|
||||||
},
|
},
|
||||||
"tracing": {
|
"tracing": {
|
||||||
"enabled": true,
|
"enabled": true,
|
||||||
"endpoint": "localhost:9090",
|
"endpoint": "localhost",
|
||||||
"exporter": "otlp_grpc",
|
"exporter": "otlp_grpc",
|
||||||
"trusted_ca": "/etc/ssl/tracing.pem"
|
"trusted_ca": "",
|
||||||
|
"attributes":[
|
||||||
|
{
|
||||||
|
"key": "key0",
|
||||||
|
"value": "value"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "key1",
|
||||||
|
"value": "value"
|
||||||
|
}
|
||||||
|
]
|
||||||
},
|
},
|
||||||
"runtime": {
|
"runtime": {
|
||||||
"soft_memory_limit": 1073741824
|
"soft_memory_limit": 1073741824
|
||||||
|
|
|
@ -59,6 +59,7 @@ tree:
|
||||||
replication_channel_capacity: 32
|
replication_channel_capacity: 32
|
||||||
replication_timeout: 5s
|
replication_timeout: 5s
|
||||||
sync_interval: 1h
|
sync_interval: 1h
|
||||||
|
sync_batch_size: 2000
|
||||||
authorized_keys: # list of hex-encoded public keys that have rights to use the Tree Service with frostfs-cli
|
authorized_keys: # list of hex-encoded public keys that have rights to use the Tree Service with frostfs-cli
|
||||||
- 0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0
|
- 0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0
|
||||||
- 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56
|
- 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56
|
||||||
|
@ -238,6 +239,11 @@ tracing:
|
||||||
exporter: "otlp_grpc"
|
exporter: "otlp_grpc"
|
||||||
endpoint: "localhost"
|
endpoint: "localhost"
|
||||||
trusted_ca: ""
|
trusted_ca: ""
|
||||||
|
attributes:
|
||||||
|
- key: key0
|
||||||
|
value: value
|
||||||
|
- key: key1
|
||||||
|
value: value
|
||||||
|
|
||||||
runtime:
|
runtime:
|
||||||
soft_memory_limit: 1gb
|
soft_memory_limit: 1gb
|
||||||
|
|
|
@ -78,7 +78,12 @@
|
||||||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama1",
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama1",
|
||||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9090",
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9090",
|
||||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||||
|
"FROSTFS_TRACING_ENABLED":"true",
|
||||||
|
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||||
|
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8080"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
},
|
},
|
||||||
|
@ -129,7 +134,12 @@
|
||||||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama1",
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama1",
|
||||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9091",
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9091",
|
||||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||||
|
"FROSTFS_TRACING_ENABLED":"true",
|
||||||
|
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||||
|
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8082"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
},
|
},
|
||||||
|
@ -180,7 +190,12 @@
|
||||||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama1",
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama1",
|
||||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9092",
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9092",
|
||||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||||
|
"FROSTFS_TRACING_ENABLED":"true",
|
||||||
|
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||||
|
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8084"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
},
|
},
|
||||||
|
@ -231,7 +246,12 @@
|
||||||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama1",
|
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama1",
|
||||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9093",
|
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9093",
|
||||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||||
|
"FROSTFS_TRACING_ENABLED":"true",
|
||||||
|
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||||
|
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||||
|
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8086"
|
||||||
},
|
},
|
||||||
"postDebugTask": "env-down"
|
"postDebugTask": "env-down"
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,3 +14,15 @@ services:
|
||||||
- ./neo-go/node-wallet.json:/wallets/node-wallet.json
|
- ./neo-go/node-wallet.json:/wallets/node-wallet.json
|
||||||
- ./neo-go/config.yml:/wallets/config.yml
|
- ./neo-go/config.yml:/wallets/config.yml
|
||||||
- ./neo-go/wallet.json:/wallets/wallet.json
|
- ./neo-go/wallet.json:/wallets/wallet.json
|
||||||
|
jaeger:
|
||||||
|
image: jaegertracing/all-in-one:latest
|
||||||
|
container_name: jaeger
|
||||||
|
ports:
|
||||||
|
- '4317:4317' #OTLP over gRPC
|
||||||
|
- '4318:4318' #OTLP over HTTP
|
||||||
|
- '16686:16686' #frontend
|
||||||
|
stop_signal: SIGKILL
|
||||||
|
environment:
|
||||||
|
- COLLECTOR_OTLP_ENABLED=true
|
||||||
|
- SPAN_STORAGE_TYPE=badger
|
||||||
|
- BADGER_EPHEMERAL=true
|
||||||
|
|
|
@ -412,12 +412,12 @@ object:
|
||||||
- $attribute:ClusterName
|
- $attribute:ClusterName
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
|
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------|
|
||||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||||
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
|
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. |
|
||||||
|
|
||||||
# `runtime` section
|
# `runtime` section
|
||||||
Contains runtime parameters.
|
Contains runtime parameters.
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -7,7 +7,7 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.20.0
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.20.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -58,16 +58,3 @@ type EACL struct {
|
||||||
// Session within which Value was set. Nil means session absence.
|
// Session within which Value was set. Nil means session absence.
|
||||||
Session *session.Container
|
Session *session.Container
|
||||||
}
|
}
|
||||||
|
|
||||||
// EACLSource is the interface that wraps
|
|
||||||
// basic methods of extended ACL table source.
|
|
||||||
type EACLSource interface {
|
|
||||||
// GetEACL reads the table from the source by identifier.
|
|
||||||
// It returns any error encountered.
|
|
||||||
//
|
|
||||||
// GetEACL must return exactly one non-nil value.
|
|
||||||
//
|
|
||||||
// Must return apistatus.ErrEACLNotFound if requested
|
|
||||||
// eACL table is not in source.
|
|
||||||
GetEACL(cid.ID) (*EACL, error)
|
|
||||||
}
|
|
||||||
|
|
|
@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TreeApplyBatch implements the pilorama.Forest interface.
|
||||||
|
func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("container_id", cnr.EncodeToString()),
|
||||||
|
attribute.String("tree_id", treeID),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
index, lst, err := e.getTreeShard(ctx, cnr, treeID)
|
||||||
|
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m)
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||||
|
e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err,
|
||||||
|
zap.Stringer("cid", cnr),
|
||||||
|
zap.String("tree", treeID),
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// TreeGetByPath implements the pilorama.Forest interface.
|
// TreeGetByPath implements the pilorama.Forest interface.
|
||||||
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
|
||||||
|
|
|
@ -558,6 +558,80 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("container_id", cnr.EncodeToString()),
|
||||||
|
attribute.String("tree_id", treeID),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
m, err := t.filterSeen(cnr, treeID, m)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(m) == 0 {
|
||||||
|
success = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan error)
|
||||||
|
b := &batch{
|
||||||
|
forest: t,
|
||||||
|
cid: cnr,
|
||||||
|
treeID: treeID,
|
||||||
|
results: []chan<- error{ch},
|
||||||
|
operations: m,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
b.run()
|
||||||
|
}()
|
||||||
|
err = <-ch
|
||||||
|
success = err == nil
|
||||||
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) {
|
||||||
|
t.modeMtx.RLock()
|
||||||
|
defer t.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if t.mode.NoMetabase() {
|
||||||
|
return nil, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
ops := make([]*Move, 0, len(m))
|
||||||
|
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
treeRoot := tx.Bucket(bucketName(cnr, treeID))
|
||||||
|
if treeRoot == nil {
|
||||||
|
ops = m
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b := treeRoot.Bucket(logBucket)
|
||||||
|
for _, op := range m {
|
||||||
|
var logKey [8]byte
|
||||||
|
binary.BigEndian.PutUint64(logKey[:], op.Time)
|
||||||
|
seen := b.Get(logKey[:]) != nil
|
||||||
|
if !seen {
|
||||||
|
ops = append(ops, op)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
return ops, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
|
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
|
||||||
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
|
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -111,6 +111,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o
|
||||||
return s.Apply(op)
|
return s.Apply(op)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error {
|
||||||
|
for _, op := range ops {
|
||||||
|
if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (f *memoryForest) Init() error {
|
func (f *memoryForest) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,8 @@ type Forest interface {
|
||||||
// TreeApply applies replicated operation from another node.
|
// TreeApply applies replicated operation from another node.
|
||||||
// If background is true, TreeApply will first check whether an operation exists.
|
// If background is true, TreeApply will first check whether an operation exists.
|
||||||
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
||||||
|
// TreeApplyBatch applies replicated operations from another node.
|
||||||
|
TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error
|
||||||
// TreeGetByPath returns all nodes corresponding to the path.
|
// TreeGetByPath returns all nodes corresponding to the path.
|
||||||
// The path is constructed by descending from the root using the values of the
|
// The path is constructed by descending from the root using the values of the
|
||||||
// AttributeFilename in meta.
|
// AttributeFilename in meta.
|
||||||
|
|
|
@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
|
||||||
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
|
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TreeApplyBatch implements the pilorama.Forest interface.
|
||||||
|
func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.String("container_id", cnr.EncodeToString()),
|
||||||
|
attribute.String("tree_id", treeID),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if s.pilorama == nil {
|
||||||
|
return ErrPiloramaDisabled
|
||||||
|
}
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
|
||||||
|
}
|
||||||
|
|
||||||
// TreeGetByPath implements the pilorama.Forest interface.
|
// TreeGetByPath implements the pilorama.Forest interface.
|
||||||
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
|
||||||
|
|
|
@ -27,7 +27,6 @@ const (
|
||||||
getMethod = "get"
|
getMethod = "get"
|
||||||
listMethod = "list"
|
listMethod = "list"
|
||||||
containersOfMethod = "containersOf"
|
containersOfMethod = "containersOf"
|
||||||
eaclMethod = "eACL"
|
|
||||||
deletionInfoMethod = "deletionInfo"
|
deletionInfoMethod = "deletionInfo"
|
||||||
|
|
||||||
// putNamedMethod is method name for container put with an alias. It is exported to provide custom fee.
|
// putNamedMethod is method name for container put with an alias. It is exported to provide custom fee.
|
||||||
|
|
|
@ -1,95 +0,0 @@
|
||||||
package container
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/sha256"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
|
||||||
)
|
|
||||||
|
|
||||||
// GetEACL reads the extended ACL table from FrostFS system
|
|
||||||
// through Container contract call.
|
|
||||||
//
|
|
||||||
// Returns apistatus.EACLNotFound if eACL table is missing in the contract.
|
|
||||||
func (c *Client) GetEACL(cnr cid.ID) (*container.EACL, error) {
|
|
||||||
binCnr := make([]byte, sha256.Size)
|
|
||||||
cnr.Encode(binCnr)
|
|
||||||
|
|
||||||
prm := client.TestInvokePrm{}
|
|
||||||
prm.SetMethod(eaclMethod)
|
|
||||||
prm.SetArgs(binCnr)
|
|
||||||
|
|
||||||
prms, err := c.client.TestInvoke(prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not perform test invocation (%s): %w", eaclMethod, err)
|
|
||||||
} else if ln := len(prms); ln != 1 {
|
|
||||||
return nil, fmt.Errorf("unexpected stack item count (%s): %d", eaclMethod, ln)
|
|
||||||
}
|
|
||||||
|
|
||||||
arr, err := client.ArrayFromStackItem(prms[0])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not get item array of eACL (%s): %w", eaclMethod, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(arr) != 4 {
|
|
||||||
return nil, fmt.Errorf("unexpected eacl stack item count (%s): %d", eaclMethod, len(arr))
|
|
||||||
}
|
|
||||||
|
|
||||||
rawEACL, err := client.BytesFromStackItem(arr[0])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not get byte array of eACL (%s): %w", eaclMethod, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sig, err := client.BytesFromStackItem(arr[1])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not get byte array of eACL signature (%s): %w", eaclMethod, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client may not return errors if the table is missing, so check this case additionally.
|
|
||||||
// The absence of a signature in the response can be taken as an eACL absence criterion,
|
|
||||||
// since unsigned table cannot be approved in the storage by design.
|
|
||||||
if len(sig) == 0 {
|
|
||||||
return nil, new(apistatus.EACLNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub, err := client.BytesFromStackItem(arr[2])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not get byte array of eACL public key (%s): %w", eaclMethod, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
binToken, err := client.BytesFromStackItem(arr[3])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not get byte array of eACL session token (%s): %w", eaclMethod, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var res container.EACL
|
|
||||||
|
|
||||||
res.Value = eacl.NewTable()
|
|
||||||
if err = res.Value.Unmarshal(rawEACL); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(binToken) > 0 {
|
|
||||||
res.Session = new(session.Container)
|
|
||||||
|
|
||||||
err = res.Session.Unmarshal(binToken)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not unmarshal session token: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(@cthulhu-rider): #468 implement and use another approach to avoid conversion
|
|
||||||
var sigV2 refs.Signature
|
|
||||||
sigV2.SetKey(pub)
|
|
||||||
sigV2.SetSign(sig)
|
|
||||||
sigV2.SetScheme(refs.ECDSA_RFC6979_SHA256)
|
|
||||||
|
|
||||||
err = res.Signature.ReadFromV2(sigV2)
|
|
||||||
return &res, err
|
|
||||||
}
|
|
|
@ -25,7 +25,6 @@ type morphExecutor struct {
|
||||||
// Reader is an interface of read-only container storage.
|
// Reader is an interface of read-only container storage.
|
||||||
type Reader interface {
|
type Reader interface {
|
||||||
containercore.Source
|
containercore.Source
|
||||||
containercore.EACLSource
|
|
||||||
|
|
||||||
// ContainersOf returns a list of container identifiers belonging
|
// ContainersOf returns a list of container identifiers belonging
|
||||||
// to the specified user of FrostFS system. Returns the identifiers
|
// to the specified user of FrostFS system. Returns the identifiers
|
||||||
|
|
|
@ -41,6 +41,7 @@ type cfg struct {
|
||||||
replicatorTimeout time.Duration
|
replicatorTimeout time.Duration
|
||||||
containerCacheSize int
|
containerCacheSize int
|
||||||
authorizedKeys [][]byte
|
authorizedKeys [][]byte
|
||||||
|
syncBatchSize int
|
||||||
|
|
||||||
localOverrideStorage policyengine.LocalOverrideStorage
|
localOverrideStorage policyengine.LocalOverrideStorage
|
||||||
morphChainStorage policyengine.MorphRuleChainStorageReader
|
morphChainStorage policyengine.MorphRuleChainStorageReader
|
||||||
|
@ -113,6 +114,12 @@ func WithReplicationWorkerCount(n int) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithSyncBatchSize(n int) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.syncBatchSize = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithContainerCacheSize(n int) Option {
|
func WithContainerCacheSize(n int) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
|
|
|
@ -40,6 +40,7 @@ const (
|
||||||
defaultReplicatorCapacity = 64
|
defaultReplicatorCapacity = 64
|
||||||
defaultReplicatorWorkerCount = 64
|
defaultReplicatorWorkerCount = 64
|
||||||
defaultReplicatorSendTimeout = time.Second * 5
|
defaultReplicatorSendTimeout = time.Second * 5
|
||||||
|
defaultSyncBatchSize = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Service) localReplicationWorker(ctx context.Context) {
|
func (s *Service) localReplicationWorker(ctx context.Context) {
|
||||||
|
|
|
@ -55,6 +55,7 @@ func New(opts ...Option) *Service {
|
||||||
s.replicatorChannelCapacity = defaultReplicatorCapacity
|
s.replicatorChannelCapacity = defaultReplicatorCapacity
|
||||||
s.replicatorWorkerCount = defaultReplicatorWorkerCount
|
s.replicatorWorkerCount = defaultReplicatorWorkerCount
|
||||||
s.replicatorTimeout = defaultReplicatorSendTimeout
|
s.replicatorTimeout = defaultReplicatorSendTimeout
|
||||||
|
s.syncBatchSize = defaultSyncBatchSize
|
||||||
s.metrics = defaultMetricsRegister{}
|
s.metrics = defaultMetricsRegister{}
|
||||||
|
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
|
|
|
@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
|
||||||
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
operationStream <-chan *pilorama.Move,
|
operationStream <-chan *pilorama.Move,
|
||||||
) uint64 {
|
) uint64 {
|
||||||
errGroup, _ := errgroup.WithContext(ctx)
|
|
||||||
const workersCount = 1024
|
|
||||||
errGroup.SetLimit(workersCount)
|
|
||||||
|
|
||||||
// We run TreeApply concurrently for the operation batch. Let's consider two operations
|
|
||||||
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
|
|
||||||
// on m1. That means the service must start sync from m1.Time in the next iteration and
|
|
||||||
// this height is stored in unappliedOperationHeight.
|
|
||||||
var unappliedOperationHeight uint64 = math.MaxUint64
|
|
||||||
var heightMtx sync.Mutex
|
|
||||||
|
|
||||||
var prev *pilorama.Move
|
var prev *pilorama.Move
|
||||||
|
var batch []*pilorama.Move
|
||||||
for m := range operationStream {
|
for m := range operationStream {
|
||||||
// skip already applied op
|
// skip already applied op
|
||||||
if prev != nil && prev.Time == m.Time {
|
if prev != nil && prev.Time == m.Time {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
prev = m
|
prev = m
|
||||||
|
batch = append(batch, m)
|
||||||
|
|
||||||
errGroup.Go(func() error {
|
if len(batch) == s.syncBatchSize {
|
||||||
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
|
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||||
heightMtx.Lock()
|
return batch[0].Time
|
||||||
unappliedOperationHeight = min(unappliedOperationHeight, m.Time)
|
|
||||||
heightMtx.Unlock()
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
return nil
|
batch = batch[:0]
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
_ = errGroup.Wait()
|
if len(batch) > 0 {
|
||||||
return unappliedOperationHeight
|
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||||
|
return batch[0].Time
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return math.MaxUint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
|
@ -384,7 +376,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
return
|
return
|
||||||
case <-s.syncChan:
|
case <-s.syncChan:
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync")
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync")
|
||||||
s.log.Debug(logs.TreeSyncingTrees)
|
s.log.Info(logs.TreeSyncingTrees)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
@ -402,7 +394,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
||||||
|
|
||||||
s.removeContainers(ctx, newMap)
|
s.removeContainers(ctx, newMap)
|
||||||
|
|
||||||
s.log.Debug(logs.TreeTreesHaveBeenSynchronized)
|
s.log.Info(logs.TreeTreesHaveBeenSynchronized)
|
||||||
|
|
||||||
s.metrics.AddSyncDuration(time.Since(start), true)
|
s.metrics.AddSyncDuration(time.Since(start), true)
|
||||||
span.End()
|
span.End()
|
||||||
|
|
Loading…
Reference in a new issue