forked from TrueCloudLab/frostfs-node
Compare commits
22 commits
ff1e7b531f
...
6e283295a2
Author | SHA1 | Date | |
---|---|---|---|
6e283295a2 | |||
c2f4390f4d | |||
c75f845f4b | |||
cdb3cea4e7 | |||
22b26ea14e | |||
ea190dd425 | |||
08182bd6f1 | |||
2cf30e28b8 | |||
f1556e3c42 | |||
e122ff6013 | |||
e2658c7519 | |||
c00f4bab18 | |||
46fef276b4 | |||
9bd05e94c8 | |||
16830033f8 | |||
1cf51a8079 | |||
3324c26fd8 | |||
a692298533 | |||
be2753de00 | |||
b543569c3f | |||
80f8a8fd3a | |||
2f3bc6eb84 |
39 changed files with 390 additions and 240 deletions
3
Makefile
3
Makefile
|
@ -282,7 +282,6 @@ env-up: all
|
|||
|
||||
# Shutdown dev environment
|
||||
env-down:
|
||||
docker compose -f dev/docker-compose.yml down
|
||||
docker volume rm -f frostfs-node_neo-go
|
||||
docker compose -f dev/docker-compose.yml down -v
|
||||
rm -rf ./$(TMP_DIR)/state
|
||||
rm -rf ./$(TMP_DIR)/storage
|
||||
|
|
|
@ -98,7 +98,7 @@ See `frostfs-contract`'s README.md for build instructions.
|
|||
4. To create container and put object into it run (container and object IDs will be different):
|
||||
|
||||
```
|
||||
./bin/frostfs-cli container create -r 127.0.0.1:8080 --wallet ./dev/wallet.json --policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" --basic-acl public-read-write --await
|
||||
./bin/frostfs-cli container create -r 127.0.0.1:8080 --wallet ./dev/wallet.json --policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" --await
|
||||
Enter password > <- press ENTER, the is no password for wallet
|
||||
CID: CfPhEuHQ2PRvM4gfBQDC4dWZY3NccovyfcnEdiq2ixju
|
||||
|
||||
|
|
|
@ -72,4 +72,3 @@ All other `object` sub-commands support only static sessions (2).
|
|||
List of commands supporting sessions (static only):
|
||||
- `create`
|
||||
- `delete`
|
||||
- `set-eacl`
|
||||
|
|
|
@ -15,14 +15,12 @@ import (
|
|||
containerApi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var (
|
||||
containerACL string
|
||||
containerPolicy string
|
||||
containerAttributes []string
|
||||
containerAwait bool
|
||||
|
@ -89,9 +87,6 @@ It will be stored in sidechain when inner ring will accepts it.`,
|
|||
err = parseAttributes(&cnr, containerAttributes)
|
||||
commonCmd.ExitOnErr(cmd, "", err)
|
||||
|
||||
var basicACL acl.Basic
|
||||
commonCmd.ExitOnErr(cmd, "decode basic ACL string: %w", basicACL.DecodeString(containerACL))
|
||||
|
||||
tok := getSession(cmd)
|
||||
|
||||
if tok != nil {
|
||||
|
@ -105,7 +100,6 @@ It will be stored in sidechain when inner ring will accepts it.`,
|
|||
}
|
||||
|
||||
cnr.SetPlacementPolicy(*placementPolicy)
|
||||
cnr.SetBasicACL(basicACL)
|
||||
|
||||
var syncContainerPrm internalclient.SyncContainerPrm
|
||||
syncContainerPrm.SetClient(cli)
|
||||
|
@ -163,10 +157,6 @@ func initContainerCreateCmd() {
|
|||
flags.DurationP(commonflags.Timeout, commonflags.TimeoutShorthand, commonflags.TimeoutDefault, commonflags.TimeoutUsage)
|
||||
flags.StringP(commonflags.WalletPath, commonflags.WalletPathShorthand, commonflags.WalletPathDefault, commonflags.WalletPathUsage)
|
||||
flags.StringP(commonflags.Account, commonflags.AccountShorthand, commonflags.AccountDefault, commonflags.AccountUsage)
|
||||
|
||||
flags.StringVar(&containerACL, "basic-acl", acl.NamePrivate, fmt.Sprintf("HEX encoded basic ACL value or keywords like '%s', '%s', '%s'",
|
||||
acl.NamePublicRW, acl.NamePrivate, acl.NamePublicROExtended,
|
||||
))
|
||||
flags.StringVarP(&containerPolicy, "policy", "p", "", "QL-encoded or JSON-encoded placement policy or path to file with it")
|
||||
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
|
||||
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")
|
||||
|
|
|
@ -1,11 +1,10 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
|
@ -14,6 +13,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/chzyer/readline"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
@ -163,6 +163,16 @@ func (repl *policyPlaygroundREPL) netMap() netmap.NetMap {
|
|||
return nm
|
||||
}
|
||||
|
||||
var policyPlaygroundCompleter = readline.NewPrefixCompleter(
|
||||
readline.PcItem("list"),
|
||||
readline.PcItem("ls"),
|
||||
readline.PcItem("add"),
|
||||
readline.PcItem("load"),
|
||||
readline.PcItem("remove"),
|
||||
readline.PcItem("rm"),
|
||||
readline.PcItem("eval"),
|
||||
)
|
||||
|
||||
func (repl *policyPlaygroundREPL) run() error {
|
||||
if len(viper.GetString(commonflags.RPC)) > 0 {
|
||||
key := key.GetOrGenerate(repl.cmd)
|
||||
|
@ -189,22 +199,38 @@ func (repl *policyPlaygroundREPL) run() error {
|
|||
"rm": repl.handleRemove,
|
||||
"eval": repl.handleEval,
|
||||
}
|
||||
for reader := bufio.NewReader(os.Stdin); ; {
|
||||
fmt.Print("> ")
|
||||
line, err := reader.ReadString('\n')
|
||||
|
||||
rl, err := readline.NewEx(&readline.Config{
|
||||
Prompt: "> ",
|
||||
InterruptPrompt: "^C",
|
||||
AutoComplete: policyPlaygroundCompleter,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error initializing readline: %w", err)
|
||||
}
|
||||
defer rl.Close()
|
||||
|
||||
var exit bool
|
||||
for {
|
||||
line, err := rl.Readline()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
if errors.Is(err, readline.ErrInterrupt) {
|
||||
if exit {
|
||||
return nil
|
||||
}
|
||||
exit = true
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("reading line: %v", err)
|
||||
return fmt.Errorf("reading line: %w", err)
|
||||
}
|
||||
exit = false
|
||||
|
||||
parts := strings.Fields(line)
|
||||
if len(parts) == 0 {
|
||||
continue
|
||||
}
|
||||
cmd := parts[0]
|
||||
handler, exists := cmdHandlers[cmd]
|
||||
if exists {
|
||||
if handler, exists := cmdHandlers[cmd]; exists {
|
||||
if err := handler(parts[1:]); err != nil {
|
||||
fmt.Printf("error: %v\n", err)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||
accountingTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/accounting/grpc"
|
||||
|
@ -30,5 +31,27 @@ func initAccountingService(ctx context.Context, c *cfg) {
|
|||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
accountingGRPC.RegisterAccountingServiceServer(s, server)
|
||||
|
||||
// TODO(@aarifullin): #1487 remove the dual service support.
|
||||
s.RegisterService(frostFSServiceDesc(accountingGRPC.AccountingService_ServiceDesc), server)
|
||||
})
|
||||
}
|
||||
|
||||
// frostFSServiceDesc creates a service descriptor with the new namespace for dual service support.
|
||||
func frostFSServiceDesc(sd grpc.ServiceDesc) *grpc.ServiceDesc {
|
||||
sdLegacy := new(grpc.ServiceDesc)
|
||||
*sdLegacy = sd
|
||||
|
||||
const (
|
||||
legacyNamespace = "neo.fs.v2"
|
||||
apemanagerLegacyNamespace = "frostfs.v2"
|
||||
newNamespace = "frost.fs"
|
||||
)
|
||||
|
||||
if strings.HasPrefix(sd.ServiceName, legacyNamespace) {
|
||||
sdLegacy.ServiceName = strings.ReplaceAll(sd.ServiceName, legacyNamespace, newNamespace)
|
||||
} else if strings.HasPrefix(sd.ServiceName, apemanagerLegacyNamespace) {
|
||||
sdLegacy.ServiceName = strings.ReplaceAll(sd.ServiceName, apemanagerLegacyNamespace, newNamespace)
|
||||
}
|
||||
return sdLegacy
|
||||
}
|
||||
|
|
|
@ -26,5 +26,8 @@ func initAPEManagerService(c *cfg) {
|
|||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
apemanager_grpc.RegisterAPEManagerServiceServer(s, server)
|
||||
|
||||
// TODO(@aarifullin): #1487 remove the dual service support.
|
||||
s.RegisterService(frostFSServiceDesc(apemanager_grpc.APEManagerService_ServiceDesc), server)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -196,31 +196,6 @@ func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error
|
|||
return s.delInfoCache.get(cnr)
|
||||
}
|
||||
|
||||
type ttlEACLStorage struct {
|
||||
*ttlNetCache[cid.ID, *container.EACL]
|
||||
}
|
||||
|
||||
func newCachedEACLStorage(v container.EACLSource, ttl time.Duration) ttlEACLStorage {
|
||||
const eaclCacheSize = 100
|
||||
|
||||
lruCnrCache := newNetworkTTLCache(eaclCacheSize, ttl, func(id cid.ID) (*container.EACL, error) {
|
||||
return v.GetEACL(id)
|
||||
}, metrics.NewCacheMetrics("eacl"))
|
||||
|
||||
return ttlEACLStorage{lruCnrCache}
|
||||
}
|
||||
|
||||
// GetEACL returns eACL value from the cache. If value is missing in the cache
|
||||
// or expired, then it returns value from side chain and updates cache.
|
||||
func (s ttlEACLStorage) GetEACL(cnr cid.ID) (*container.EACL, error) {
|
||||
return s.get(cnr)
|
||||
}
|
||||
|
||||
// InvalidateEACL removes cached eACL value.
|
||||
func (s ttlEACLStorage) InvalidateEACL(cnr cid.ID) {
|
||||
s.remove(cnr)
|
||||
}
|
||||
|
||||
type lruNetmapSource struct {
|
||||
netState netmap.State
|
||||
|
||||
|
|
|
@ -642,8 +642,6 @@ type cfgObject struct {
|
|||
|
||||
cnrSource container.Source
|
||||
|
||||
eaclSource container.EACLSource
|
||||
|
||||
cfgAccessPolicyEngine cfgAccessPolicyEngine
|
||||
|
||||
pool cfgObjectRoutines
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
|
||||
|
@ -24,6 +25,7 @@ func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
|
|||
Service: "frostfs-node",
|
||||
InstanceID: getInstanceIDOrDefault(c),
|
||||
Version: misc.Version,
|
||||
Attributes: make(map[string]string),
|
||||
}
|
||||
|
||||
if trustedCa := config.StringSafe(c.Sub(subsection), "trusted_ca"); trustedCa != "" {
|
||||
|
@ -38,11 +40,30 @@ func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
|
|||
}
|
||||
conf.ServerCaCertPool = certPool
|
||||
}
|
||||
|
||||
i := uint64(0)
|
||||
for ; ; i++ {
|
||||
si := strconv.FormatUint(i, 10)
|
||||
ac := c.Sub(subsection).Sub("attributes").Sub(si)
|
||||
k := config.StringSafe(ac, "key")
|
||||
if k == "" {
|
||||
break
|
||||
}
|
||||
v := config.StringSafe(ac, "value")
|
||||
if v == "" {
|
||||
return nil, fmt.Errorf("empty tracing attribute value for key %s", k)
|
||||
}
|
||||
if _, ok := conf.Attributes[k]; ok {
|
||||
return nil, fmt.Errorf("tracing attribute key %s defined more than once", k)
|
||||
}
|
||||
conf.Attributes[k] = v
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func getInstanceIDOrDefault(c *config.Config) string {
|
||||
s := config.StringSlice(c.Sub("node"), "addresses")
|
||||
s := config.StringSliceSafe(c.Sub("node"), "addresses")
|
||||
if len(s) > 0 {
|
||||
return s[0]
|
||||
}
|
||||
|
|
46
cmd/frostfs-node/config/tracing/config_test.go
Normal file
46
cmd/frostfs-node/config/tracing/config_test.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package tracing
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestTracingSection(t *testing.T) {
|
||||
t.Run("defaults", func(t *testing.T) {
|
||||
tc, err := ToTracingConfig(configtest.EmptyConfig())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, false, tc.Enabled)
|
||||
require.Equal(t, tracing.Exporter(""), tc.Exporter)
|
||||
require.Equal(t, "", tc.Endpoint)
|
||||
require.Equal(t, "frostfs-node", tc.Service)
|
||||
require.Equal(t, "", tc.InstanceID)
|
||||
require.Nil(t, tc.ServerCaCertPool)
|
||||
require.Empty(t, tc.Attributes)
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
||||
fileConfigTest := func(c *config.Config) {
|
||||
tc, err := ToTracingConfig(c)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, tc.Enabled)
|
||||
require.Equal(t, tracing.OTLPgRPCExporter, tc.Exporter)
|
||||
require.Equal(t, "localhost", tc.Endpoint)
|
||||
require.Equal(t, "frostfs-node", tc.Service)
|
||||
require.Nil(t, tc.ServerCaCertPool)
|
||||
require.EqualValues(t, map[string]string{
|
||||
"key0": "value",
|
||||
"key1": "value",
|
||||
}, tc.Attributes)
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
||||
t.Run("ENV", func(t *testing.T) {
|
||||
configtest.ForEnvFileType(t, path, fileConfigTest)
|
||||
})
|
||||
}
|
|
@ -10,6 +10,8 @@ import (
|
|||
|
||||
const (
|
||||
subsection = "tree"
|
||||
|
||||
SyncBatchSizeDefault = 1000
|
||||
)
|
||||
|
||||
// TreeConfig is a wrapper over "tree" config section
|
||||
|
@ -74,6 +76,17 @@ func (c TreeConfig) SyncInterval() time.Duration {
|
|||
return config.DurationSafe(c.cfg, "sync_interval")
|
||||
}
|
||||
|
||||
// SyncBatchSize returns the value of "sync_batch_size"
|
||||
// config parameter from the "tree" section.
|
||||
//
|
||||
// Returns `SyncBatchSizeDefault` if config value is not specified.
|
||||
func (c TreeConfig) SyncBatchSize() int {
|
||||
if v := config.IntSafe(c.cfg, "sync_batch_size"); v > 0 {
|
||||
return int(v)
|
||||
}
|
||||
return SyncBatchSizeDefault
|
||||
}
|
||||
|
||||
// AuthorizedKeys parses and returns an array of "authorized_keys" config
|
||||
// parameter from "tree" section.
|
||||
//
|
||||
|
|
|
@ -44,6 +44,7 @@ func TestTreeSection(t *testing.T) {
|
|||
require.Equal(t, 32, treeSec.ReplicationWorkerCount())
|
||||
require.Equal(t, 5*time.Second, treeSec.ReplicationTimeout())
|
||||
require.Equal(t, time.Hour, treeSec.SyncInterval())
|
||||
require.Equal(t, 2000, treeSec.SyncBatchSize())
|
||||
require.Equal(t, expectedKeys, treeSec.AuthorizedKeys())
|
||||
}
|
||||
|
||||
|
|
|
@ -64,16 +64,15 @@ func initContainerService(_ context.Context, c *cfg) {
|
|||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
containerGRPC.RegisterContainerServiceServer(s, server)
|
||||
|
||||
// TODO(@aarifullin): #1487 remove the dual service support.
|
||||
s.RegisterService(frostFSServiceDesc(containerGRPC.ContainerService_ServiceDesc), server)
|
||||
})
|
||||
|
||||
c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr)
|
||||
}
|
||||
|
||||
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
|
||||
eACLFetcher := &morphEACLFetcher{
|
||||
w: client,
|
||||
}
|
||||
|
||||
cnrRdr := new(morphContainerReader)
|
||||
|
||||
cnrWrt := &morphContainerWriter{
|
||||
|
@ -81,8 +80,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
}
|
||||
|
||||
if c.cfgMorph.cacheTTL <= 0 {
|
||||
c.cfgObject.eaclSource = eACLFetcher
|
||||
cnrRdr.eacl = eACLFetcher
|
||||
c.cfgObject.cnrSource = cnrSrc
|
||||
cnrRdr.src = cnrSrc
|
||||
cnrRdr.lister = client
|
||||
|
@ -126,11 +123,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
|
|||
c.cfgObject.cnrSource = containerCache
|
||||
}
|
||||
|
||||
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
|
||||
c.cfgObject.eaclSource = cachedEACLStorage
|
||||
|
||||
cnrRdr.lister = client
|
||||
cnrRdr.eacl = c.cfgObject.eaclSource
|
||||
cnrRdr.src = c.cfgObject.cnrSource
|
||||
}
|
||||
|
||||
|
@ -221,8 +214,6 @@ func (c *cfg) ExternalAddresses() []string {
|
|||
|
||||
// implements interface required by container service provided by morph executor.
|
||||
type morphContainerReader struct {
|
||||
eacl containerCore.EACLSource
|
||||
|
||||
src containerCore.Source
|
||||
|
||||
lister interface {
|
||||
|
@ -238,10 +229,6 @@ func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo,
|
|||
return x.src.DeletionInfo(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
|
||||
return x.eacl.GetEACL(id)
|
||||
}
|
||||
|
||||
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
|
||||
return x.lister.ContainersOf(id)
|
||||
}
|
||||
|
|
|
@ -166,6 +166,9 @@ func initNetmapService(ctx context.Context, c *cfg) {
|
|||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
netmapGRPC.RegisterNetmapServiceServer(s, server)
|
||||
|
||||
// TODO(@aarifullin): #1487 remove the dual service support.
|
||||
s.RegisterService(frostFSServiceDesc(netmapGRPC.NetmapService_ServiceDesc), server)
|
||||
})
|
||||
|
||||
addNewEpochNotificationHandlers(c)
|
||||
|
|
|
@ -2,7 +2,6 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
|
@ -14,7 +13,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
||||
objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc"
|
||||
|
@ -37,7 +35,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object/grpc"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -218,6 +215,9 @@ func initObjectService(c *cfg) {
|
|||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
objectGRPC.RegisterObjectServiceServer(s, server)
|
||||
|
||||
// TODO(@aarifullin): #1487 remove the dual service support.
|
||||
s.RegisterService(frostFSServiceDesc(objectGRPC.ObjectService_ServiceDesc), server)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -481,29 +481,6 @@ func createAPEService(c *cfg, splitSvc *objectService.TransportSplitter) *object
|
|||
)
|
||||
}
|
||||
|
||||
type morphEACLFetcher struct {
|
||||
w *cntClient.Client
|
||||
}
|
||||
|
||||
func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
|
||||
eaclInfo, err := s.w.GetEACL(cnr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
binTable, err := eaclInfo.Value.Marshal()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal eACL table: %w", err)
|
||||
}
|
||||
|
||||
if !eaclInfo.Signature.Verify(binTable) {
|
||||
// TODO(@cthulhu-rider): #468 use "const" error
|
||||
return nil, errors.New("invalid signature of the eACL table")
|
||||
}
|
||||
|
||||
return eaclInfo, nil
|
||||
}
|
||||
|
||||
type engineWithoutNotifications struct {
|
||||
engine *engine.StorageEngine
|
||||
}
|
||||
|
|
|
@ -61,5 +61,8 @@ func initSessionService(c *cfg) {
|
|||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
sessionGRPC.RegisterSessionServiceServer(s, server)
|
||||
|
||||
// TODO(@aarifullin): #1487 remove the dual service support.
|
||||
s.RegisterService(frostFSServiceDesc(sessionGRPC.SessionService_ServiceDesc), server)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ func initTreeService(c *cfg) {
|
|||
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
|
||||
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
|
||||
tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
|
||||
tree.WithSyncBatchSize(treeConfig.SyncBatchSize()),
|
||||
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
|
||||
tree.WithMetrics(c.metricsCollector.TreeService()),
|
||||
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
|
||||
|
|
|
@ -31,6 +31,7 @@ FROSTFS_TREE_REPLICATION_CHANNEL_CAPACITY=32
|
|||
FROSTFS_TREE_REPLICATION_WORKER_COUNT=32
|
||||
FROSTFS_TREE_REPLICATION_TIMEOUT=5s
|
||||
FROSTFS_TREE_SYNC_INTERVAL=1h
|
||||
FROSTFS_TREE_SYNC_BATCH_SIZE=2000
|
||||
FROSTFS_TREE_AUTHORIZED_KEYS="0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
||||
|
||||
# gRPC section
|
||||
|
@ -202,6 +203,10 @@ FROSTFS_TRACING_ENABLED=true
|
|||
FROSTFS_TRACING_ENDPOINT="localhost"
|
||||
FROSTFS_TRACING_EXPORTER="otlp_grpc"
|
||||
FROSTFS_TRACING_TRUSTED_CA=""
|
||||
FROSTFS_TRACING_ATTRIBUTES_0_KEY=key0
|
||||
FROSTFS_TRACING_ATTRIBUTES_0_VALUE=value
|
||||
FROSTFS_TRACING_ATTRIBUTES_1_KEY=key1
|
||||
FROSTFS_TRACING_ATTRIBUTES_1_VALUE=value
|
||||
|
||||
FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
||||
|
||||
|
|
|
@ -69,6 +69,7 @@
|
|||
"replication_worker_count": 32,
|
||||
"replication_timeout": "5s",
|
||||
"sync_interval": "1h",
|
||||
"sync_batch_size": 2000,
|
||||
"authorized_keys": [
|
||||
"0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0",
|
||||
"02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
|
||||
|
@ -258,9 +259,19 @@
|
|||
},
|
||||
"tracing": {
|
||||
"enabled": true,
|
||||
"endpoint": "localhost:9090",
|
||||
"endpoint": "localhost",
|
||||
"exporter": "otlp_grpc",
|
||||
"trusted_ca": "/etc/ssl/tracing.pem"
|
||||
"trusted_ca": "",
|
||||
"attributes":[
|
||||
{
|
||||
"key": "key0",
|
||||
"value": "value"
|
||||
},
|
||||
{
|
||||
"key": "key1",
|
||||
"value": "value"
|
||||
}
|
||||
]
|
||||
},
|
||||
"runtime": {
|
||||
"soft_memory_limit": 1073741824
|
||||
|
|
|
@ -59,6 +59,7 @@ tree:
|
|||
replication_channel_capacity: 32
|
||||
replication_timeout: 5s
|
||||
sync_interval: 1h
|
||||
sync_batch_size: 2000
|
||||
authorized_keys: # list of hex-encoded public keys that have rights to use the Tree Service with frostfs-cli
|
||||
- 0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0
|
||||
- 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56
|
||||
|
@ -238,6 +239,11 @@ tracing:
|
|||
exporter: "otlp_grpc"
|
||||
endpoint: "localhost"
|
||||
trusted_ca: ""
|
||||
attributes:
|
||||
- key: key0
|
||||
value: value
|
||||
- key: key1
|
||||
value: value
|
||||
|
||||
runtime:
|
||||
soft_memory_limit: 1gb
|
||||
|
|
|
@ -78,7 +78,12 @@
|
|||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama1",
|
||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9090",
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||
"FROSTFS_TRACING_ENABLED":"true",
|
||||
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8080"
|
||||
},
|
||||
"postDebugTask": "env-down"
|
||||
},
|
||||
|
@ -129,7 +134,12 @@
|
|||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama1",
|
||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9091",
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||
"FROSTFS_TRACING_ENABLED":"true",
|
||||
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8082"
|
||||
},
|
||||
"postDebugTask": "env-down"
|
||||
},
|
||||
|
@ -180,7 +190,12 @@
|
|||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama1",
|
||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9092",
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||
"FROSTFS_TRACING_ENABLED":"true",
|
||||
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8084"
|
||||
},
|
||||
"postDebugTask": "env-down"
|
||||
},
|
||||
|
@ -231,7 +246,12 @@
|
|||
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama1",
|
||||
"FROSTFS_PROMETHEUS_ENABLED":"true",
|
||||
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9093",
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
|
||||
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
|
||||
"FROSTFS_TRACING_ENABLED":"true",
|
||||
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
|
||||
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
|
||||
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8086"
|
||||
},
|
||||
"postDebugTask": "env-down"
|
||||
}
|
||||
|
|
|
@ -14,3 +14,15 @@ services:
|
|||
- ./neo-go/node-wallet.json:/wallets/node-wallet.json
|
||||
- ./neo-go/config.yml:/wallets/config.yml
|
||||
- ./neo-go/wallet.json:/wallets/wallet.json
|
||||
jaeger:
|
||||
image: jaegertracing/all-in-one:latest
|
||||
container_name: jaeger
|
||||
ports:
|
||||
- '4317:4317' #OTLP over gRPC
|
||||
- '4318:4318' #OTLP over HTTP
|
||||
- '16686:16686' #frontend
|
||||
stop_signal: SIGKILL
|
||||
environment:
|
||||
- COLLECTOR_OTLP_ENABLED=true
|
||||
- SPAN_STORAGE_TYPE=badger
|
||||
- BADGER_EPHEMERAL=true
|
||||
|
|
|
@ -412,12 +412,12 @@ object:
|
|||
- $attribute:ClusterName
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
|
||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------|
|
||||
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
|
||||
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
|
||||
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. |
|
||||
|
||||
# `runtime` section
|
||||
Contains runtime parameters.
|
||||
|
|
2
go.mod
2
go.mod
|
@ -7,7 +7,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.20.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -58,16 +58,3 @@ type EACL struct {
|
|||
// Session within which Value was set. Nil means session absence.
|
||||
Session *session.Container
|
||||
}
|
||||
|
||||
// EACLSource is the interface that wraps
|
||||
// basic methods of extended ACL table source.
|
||||
type EACLSource interface {
|
||||
// GetEACL reads the table from the source by identifier.
|
||||
// It returns any error encountered.
|
||||
//
|
||||
// GetEACL must return exactly one non-nil value.
|
||||
//
|
||||
// Must return apistatus.ErrEACLNotFound if requested
|
||||
// eACL table is not in source.
|
||||
GetEACL(cid.ID) (*EACL, error)
|
||||
}
|
||||
|
|
|
@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
|
|||
return nil
|
||||
}
|
||||
|
||||
// TreeApplyBatch implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch",
|
||||
trace.WithAttributes(
|
||||
attribute.String("container_id", cnr.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
index, lst, err := e.getTreeShard(ctx, cnr, treeID)
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m)
|
||||
if err != nil {
|
||||
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||
e.reportShardError(ctx, lst[index], "can't perform `TreeApplyBatch`", err,
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.String("tree", treeID),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
|
||||
|
|
|
@ -558,6 +558,80 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
|
|||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
|
||||
trace.WithAttributes(
|
||||
attribute.String("container_id", cnr.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
m, err := t.filterSeen(cnr, treeID, m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(m) == 0 {
|
||||
success = true
|
||||
return nil
|
||||
}
|
||||
|
||||
ch := make(chan error)
|
||||
b := &batch{
|
||||
forest: t,
|
||||
cid: cnr,
|
||||
treeID: treeID,
|
||||
results: []chan<- error{ch},
|
||||
operations: m,
|
||||
}
|
||||
go func() {
|
||||
b.run()
|
||||
}()
|
||||
err = <-ch
|
||||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) {
|
||||
t.modeMtx.RLock()
|
||||
defer t.modeMtx.RUnlock()
|
||||
|
||||
if t.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
ops := make([]*Move, 0, len(m))
|
||||
err := t.db.View(func(tx *bbolt.Tx) error {
|
||||
treeRoot := tx.Bucket(bucketName(cnr, treeID))
|
||||
if treeRoot == nil {
|
||||
ops = m
|
||||
return nil
|
||||
}
|
||||
b := treeRoot.Bucket(logBucket)
|
||||
for _, op := range m {
|
||||
var logKey [8]byte
|
||||
binary.BigEndian.PutUint64(logKey[:], op.Time)
|
||||
seen := b.Get(logKey[:]) != nil
|
||||
if !seen {
|
||||
ops = append(ops, op)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, metaerr.Wrap(err)
|
||||
}
|
||||
return ops, nil
|
||||
}
|
||||
|
||||
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
|
||||
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
|
||||
var (
|
||||
|
|
|
@ -111,6 +111,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o
|
|||
return s.Apply(op)
|
||||
}
|
||||
|
||||
func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error {
|
||||
for _, op := range ops {
|
||||
if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) Init(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ type Forest interface {
|
|||
// TreeApply applies replicated operation from another node.
|
||||
// If background is true, TreeApply will first check whether an operation exists.
|
||||
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
||||
// TreeApplyBatch applies replicated operations from another node.
|
||||
TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error
|
||||
// TreeGetByPath returns all nodes corresponding to the path.
|
||||
// The path is constructed by descending from the root using the values of the
|
||||
// AttributeFilename in meta.
|
||||
|
|
|
@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
|
|||
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
|
||||
}
|
||||
|
||||
// TreeApplyBatch implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.String("container_id", cnr.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
if s.pilorama == nil {
|
||||
return ErrPiloramaDisabled
|
||||
}
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
|
||||
|
|
|
@ -27,7 +27,6 @@ const (
|
|||
getMethod = "get"
|
||||
listMethod = "list"
|
||||
containersOfMethod = "containersOf"
|
||||
eaclMethod = "eACL"
|
||||
deletionInfoMethod = "deletionInfo"
|
||||
|
||||
// putNamedMethod is method name for container put with an alias. It is exported to provide custom fee.
|
||||
|
|
|
@ -1,95 +0,0 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
)
|
||||
|
||||
// GetEACL reads the extended ACL table from FrostFS system
|
||||
// through Container contract call.
|
||||
//
|
||||
// Returns apistatus.EACLNotFound if eACL table is missing in the contract.
|
||||
func (c *Client) GetEACL(cnr cid.ID) (*container.EACL, error) {
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
cnr.Encode(binCnr)
|
||||
|
||||
prm := client.TestInvokePrm{}
|
||||
prm.SetMethod(eaclMethod)
|
||||
prm.SetArgs(binCnr)
|
||||
|
||||
prms, err := c.client.TestInvoke(prm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not perform test invocation (%s): %w", eaclMethod, err)
|
||||
} else if ln := len(prms); ln != 1 {
|
||||
return nil, fmt.Errorf("unexpected stack item count (%s): %d", eaclMethod, ln)
|
||||
}
|
||||
|
||||
arr, err := client.ArrayFromStackItem(prms[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get item array of eACL (%s): %w", eaclMethod, err)
|
||||
}
|
||||
|
||||
if len(arr) != 4 {
|
||||
return nil, fmt.Errorf("unexpected eacl stack item count (%s): %d", eaclMethod, len(arr))
|
||||
}
|
||||
|
||||
rawEACL, err := client.BytesFromStackItem(arr[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get byte array of eACL (%s): %w", eaclMethod, err)
|
||||
}
|
||||
|
||||
sig, err := client.BytesFromStackItem(arr[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get byte array of eACL signature (%s): %w", eaclMethod, err)
|
||||
}
|
||||
|
||||
// Client may not return errors if the table is missing, so check this case additionally.
|
||||
// The absence of a signature in the response can be taken as an eACL absence criterion,
|
||||
// since unsigned table cannot be approved in the storage by design.
|
||||
if len(sig) == 0 {
|
||||
return nil, new(apistatus.EACLNotFound)
|
||||
}
|
||||
|
||||
pub, err := client.BytesFromStackItem(arr[2])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get byte array of eACL public key (%s): %w", eaclMethod, err)
|
||||
}
|
||||
|
||||
binToken, err := client.BytesFromStackItem(arr[3])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get byte array of eACL session token (%s): %w", eaclMethod, err)
|
||||
}
|
||||
|
||||
var res container.EACL
|
||||
|
||||
res.Value = eacl.NewTable()
|
||||
if err = res.Value.Unmarshal(rawEACL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(binToken) > 0 {
|
||||
res.Session = new(session.Container)
|
||||
|
||||
err = res.Session.Unmarshal(binToken)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal session token: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(@cthulhu-rider): #468 implement and use another approach to avoid conversion
|
||||
var sigV2 refs.Signature
|
||||
sigV2.SetKey(pub)
|
||||
sigV2.SetSign(sig)
|
||||
sigV2.SetScheme(refs.ECDSA_RFC6979_SHA256)
|
||||
|
||||
err = res.Signature.ReadFromV2(sigV2)
|
||||
return &res, err
|
||||
}
|
|
@ -25,7 +25,6 @@ type morphExecutor struct {
|
|||
// Reader is an interface of read-only container storage.
|
||||
type Reader interface {
|
||||
containercore.Source
|
||||
containercore.EACLSource
|
||||
|
||||
// ContainersOf returns a list of container identifiers belonging
|
||||
// to the specified user of FrostFS system. Returns the identifiers
|
||||
|
|
|
@ -41,6 +41,7 @@ type cfg struct {
|
|||
replicatorTimeout time.Duration
|
||||
containerCacheSize int
|
||||
authorizedKeys [][]byte
|
||||
syncBatchSize int
|
||||
|
||||
localOverrideStorage policyengine.LocalOverrideStorage
|
||||
morphChainStorage policyengine.MorphRuleChainStorageReader
|
||||
|
@ -113,6 +114,12 @@ func WithReplicationWorkerCount(n int) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithSyncBatchSize(n int) Option {
|
||||
return func(c *cfg) {
|
||||
c.syncBatchSize = n
|
||||
}
|
||||
}
|
||||
|
||||
func WithContainerCacheSize(n int) Option {
|
||||
return func(c *cfg) {
|
||||
if n > 0 {
|
||||
|
|
|
@ -40,6 +40,7 @@ const (
|
|||
defaultReplicatorCapacity = 64
|
||||
defaultReplicatorWorkerCount = 64
|
||||
defaultReplicatorSendTimeout = time.Second * 5
|
||||
defaultSyncBatchSize = 1000
|
||||
)
|
||||
|
||||
func (s *Service) localReplicationWorker(ctx context.Context) {
|
||||
|
|
|
@ -55,6 +55,7 @@ func New(opts ...Option) *Service {
|
|||
s.replicatorChannelCapacity = defaultReplicatorCapacity
|
||||
s.replicatorWorkerCount = defaultReplicatorWorkerCount
|
||||
s.replicatorTimeout = defaultReplicatorSendTimeout
|
||||
s.syncBatchSize = defaultSyncBatchSize
|
||||
s.metrics = defaultMetricsRegister{}
|
||||
|
||||
for i := range opts {
|
||||
|
|
|
@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
|
|||
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
||||
operationStream <-chan *pilorama.Move,
|
||||
) uint64 {
|
||||
errGroup, _ := errgroup.WithContext(ctx)
|
||||
const workersCount = 1024
|
||||
errGroup.SetLimit(workersCount)
|
||||
|
||||
// We run TreeApply concurrently for the operation batch. Let's consider two operations
|
||||
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
|
||||
// on m1. That means the service must start sync from m1.Time in the next iteration and
|
||||
// this height is stored in unappliedOperationHeight.
|
||||
var unappliedOperationHeight uint64 = math.MaxUint64
|
||||
var heightMtx sync.Mutex
|
||||
|
||||
var prev *pilorama.Move
|
||||
var batch []*pilorama.Move
|
||||
for m := range operationStream {
|
||||
// skip already applied op
|
||||
if prev != nil && prev.Time == m.Time {
|
||||
continue
|
||||
}
|
||||
prev = m
|
||||
batch = append(batch, m)
|
||||
|
||||
errGroup.Go(func() error {
|
||||
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
|
||||
heightMtx.Lock()
|
||||
unappliedOperationHeight = min(unappliedOperationHeight, m.Time)
|
||||
heightMtx.Unlock()
|
||||
return err
|
||||
if len(batch) == s.syncBatchSize {
|
||||
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||
return batch[0].Time
|
||||
}
|
||||
return nil
|
||||
})
|
||||
batch = batch[:0]
|
||||
}
|
||||
}
|
||||
_ = errGroup.Wait()
|
||||
return unappliedOperationHeight
|
||||
if len(batch) > 0 {
|
||||
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||
return batch[0].Time
|
||||
}
|
||||
}
|
||||
return math.MaxUint64
|
||||
}
|
||||
|
||||
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||
|
@ -384,7 +376,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
|||
return
|
||||
case <-s.syncChan:
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync")
|
||||
s.log.Debug(ctx, logs.TreeSyncingTrees)
|
||||
s.log.Info(ctx, logs.TreeSyncingTrees)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
|
@ -402,7 +394,7 @@ func (s *Service) syncLoop(ctx context.Context) {
|
|||
|
||||
s.removeContainers(ctx, newMap)
|
||||
|
||||
s.log.Debug(ctx, logs.TreeTreesHaveBeenSynchronized)
|
||||
s.log.Info(ctx, logs.TreeTreesHaveBeenSynchronized)
|
||||
|
||||
s.metrics.AddSyncDuration(time.Since(start), true)
|
||||
span.End()
|
||||
|
|
Loading…
Reference in a new issue