Compare commits

...

11 commits

Author SHA1 Message Date
f1556e3c42
[#1488] Makefile: Drop all containers created on env-up
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-12 17:24:44 +03:00
e122ff6013
[#1488] dev: Add Jaeger image and enable tracing on debug
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-12 17:24:44 +03:00
e2658c7519
[#1488] tracing: KV attributes for spans from config
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-12 17:24:43 +03:00
c00f4bab18
[#1488] go.mod: Bump observability version
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-12 17:24:43 +03:00
46fef276b4 [#1449] tree: Log tree sync with Info level
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-12 12:11:07 +00:00
9bd05e94c8 [#1449] tree: Add ApplyBatch method
Concurrent Apply can lead to child node applies before parent, so
undo/redo operations will perform. This leads to performance degradation
in case of tree with many sublevels.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-12 12:11:07 +00:00
16830033f8 [#1483] cli: Remove --basic-acl flag
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-12 12:10:51 +00:00
1cf51a8079 [#1483] cli/docs: Remove set-eacl mention
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-12 12:10:51 +00:00
3324c26fd8 [#1483] morph: Remove container.GetEACL()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-12 12:10:51 +00:00
a692298533 [#1483] node: Remove eACL cache
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-12 12:10:51 +00:00
be2753de00 [#1490] docs: Update description for object.get.priority
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-11-12 13:35:13 +03:00
34 changed files with 316 additions and 230 deletions

View file

@ -282,7 +282,6 @@ env-up: all
# Shutdown dev environment
env-down:
docker compose -f dev/docker-compose.yml down
docker volume rm -f frostfs-node_neo-go
docker compose -f dev/docker-compose.yml down -v
rm -rf ./$(TMP_DIR)/state
rm -rf ./$(TMP_DIR)/storage

View file

@ -98,7 +98,7 @@ See `frostfs-contract`'s README.md for build instructions.
4. To create container and put object into it run (container and object IDs will be different):
```
./bin/frostfs-cli container create -r 127.0.0.1:8080 --wallet ./dev/wallet.json --policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" --basic-acl public-read-write --await
./bin/frostfs-cli container create -r 127.0.0.1:8080 --wallet ./dev/wallet.json --policy "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" --await
Enter password > <- press ENTER, the is no password for wallet
CID: CfPhEuHQ2PRvM4gfBQDC4dWZY3NccovyfcnEdiq2ixju

View file

@ -72,4 +72,3 @@ All other `object` sub-commands support only static sessions (2).
List of commands supporting sessions (static only):
- `create`
- `delete`
- `set-eacl`

View file

@ -15,14 +15,12 @@ import (
containerApi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/spf13/cobra"
)
var (
containerACL string
containerPolicy string
containerAttributes []string
containerAwait bool
@ -89,9 +87,6 @@ It will be stored in sidechain when inner ring will accepts it.`,
err = parseAttributes(&cnr, containerAttributes)
commonCmd.ExitOnErr(cmd, "", err)
var basicACL acl.Basic
commonCmd.ExitOnErr(cmd, "decode basic ACL string: %w", basicACL.DecodeString(containerACL))
tok := getSession(cmd)
if tok != nil {
@ -105,7 +100,6 @@ It will be stored in sidechain when inner ring will accepts it.`,
}
cnr.SetPlacementPolicy(*placementPolicy)
cnr.SetBasicACL(basicACL)
var syncContainerPrm internalclient.SyncContainerPrm
syncContainerPrm.SetClient(cli)
@ -163,10 +157,6 @@ func initContainerCreateCmd() {
flags.DurationP(commonflags.Timeout, commonflags.TimeoutShorthand, commonflags.TimeoutDefault, commonflags.TimeoutUsage)
flags.StringP(commonflags.WalletPath, commonflags.WalletPathShorthand, commonflags.WalletPathDefault, commonflags.WalletPathUsage)
flags.StringP(commonflags.Account, commonflags.AccountShorthand, commonflags.AccountDefault, commonflags.AccountUsage)
flags.StringVar(&containerACL, "basic-acl", acl.NamePrivate, fmt.Sprintf("HEX encoded basic ACL value or keywords like '%s', '%s', '%s'",
acl.NamePublicRW, acl.NamePrivate, acl.NamePublicROExtended,
))
flags.StringVarP(&containerPolicy, "policy", "p", "", "QL-encoded or JSON-encoded placement policy or path to file with it")
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")

View file

@ -196,31 +196,6 @@ func (s ttlContainerStorage) DeletionInfo(cnr cid.ID) (*container.DelInfo, error
return s.delInfoCache.get(cnr)
}
type ttlEACLStorage struct {
*ttlNetCache[cid.ID, *container.EACL]
}
func newCachedEACLStorage(v container.EACLSource, ttl time.Duration) ttlEACLStorage {
const eaclCacheSize = 100
lruCnrCache := newNetworkTTLCache(eaclCacheSize, ttl, func(id cid.ID) (*container.EACL, error) {
return v.GetEACL(id)
}, metrics.NewCacheMetrics("eacl"))
return ttlEACLStorage{lruCnrCache}
}
// GetEACL returns eACL value from the cache. If value is missing in the cache
// or expired, then it returns value from side chain and updates cache.
func (s ttlEACLStorage) GetEACL(cnr cid.ID) (*container.EACL, error) {
return s.get(cnr)
}
// InvalidateEACL removes cached eACL value.
func (s ttlEACLStorage) InvalidateEACL(cnr cid.ID) {
s.remove(cnr)
}
type lruNetmapSource struct {
netState netmap.State

View file

@ -642,8 +642,6 @@ type cfgObject struct {
cnrSource container.Source
eaclSource container.EACLSource
cfgAccessPolicyEngine cfgAccessPolicyEngine
pool cfgObjectRoutines

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/misc"
@ -24,6 +25,7 @@ func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
Service: "frostfs-node",
InstanceID: getInstanceIDOrDefault(c),
Version: misc.Version,
Attributes: make(map[string]string),
}
if trustedCa := config.StringSafe(c.Sub(subsection), "trusted_ca"); trustedCa != "" {
@ -38,11 +40,30 @@ func ToTracingConfig(c *config.Config) (*tracing.Config, error) {
}
conf.ServerCaCertPool = certPool
}
i := uint64(0)
for ; ; i++ {
si := strconv.FormatUint(i, 10)
ac := c.Sub(subsection).Sub("attributes").Sub(si)
k := config.StringSafe(ac, "key")
if k == "" {
break
}
v := config.StringSafe(ac, "value")
if v == "" {
return nil, fmt.Errorf("empty tracing attribute value for key %s", k)
}
if _, ok := conf.Attributes[k]; ok {
return nil, fmt.Errorf("tracing attribute key %s defined more than once", k)
}
conf.Attributes[k] = v
}
return conf, nil
}
func getInstanceIDOrDefault(c *config.Config) string {
s := config.StringSlice(c.Sub("node"), "addresses")
s := config.StringSliceSafe(c.Sub("node"), "addresses")
if len(s) > 0 {
return s[0]
}

View file

@ -0,0 +1,46 @@
package tracing
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
configtest "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/test"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"github.com/stretchr/testify/require"
)
func TestTracingSection(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
tc, err := ToTracingConfig(configtest.EmptyConfig())
require.NoError(t, err)
require.Equal(t, false, tc.Enabled)
require.Equal(t, tracing.Exporter(""), tc.Exporter)
require.Equal(t, "", tc.Endpoint)
require.Equal(t, "frostfs-node", tc.Service)
require.Equal(t, "", tc.InstanceID)
require.Nil(t, tc.ServerCaCertPool)
require.Empty(t, tc.Attributes)
})
const path = "../../../../config/example/node"
fileConfigTest := func(c *config.Config) {
tc, err := ToTracingConfig(c)
require.NoError(t, err)
require.Equal(t, true, tc.Enabled)
require.Equal(t, tracing.OTLPgRPCExporter, tc.Exporter)
require.Equal(t, "localhost", tc.Endpoint)
require.Equal(t, "frostfs-node", tc.Service)
require.Nil(t, tc.ServerCaCertPool)
require.EqualValues(t, map[string]string{
"key0": "value",
"key1": "value",
}, tc.Attributes)
}
configtest.ForEachFileType(path, fileConfigTest)
t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(t, path, fileConfigTest)
})
}

View file

@ -10,6 +10,8 @@ import (
const (
subsection = "tree"
SyncBatchSizeDefault = 1000
)
// TreeConfig is a wrapper over "tree" config section
@ -74,6 +76,17 @@ func (c TreeConfig) SyncInterval() time.Duration {
return config.DurationSafe(c.cfg, "sync_interval")
}
// SyncBatchSize returns the value of "sync_batch_size"
// config parameter from the "tree" section.
//
// Returns `SyncBatchSizeDefault` if config value is not specified.
func (c TreeConfig) SyncBatchSize() int {
if v := config.IntSafe(c.cfg, "sync_batch_size"); v > 0 {
return int(v)
}
return SyncBatchSizeDefault
}
// AuthorizedKeys parses and returns an array of "authorized_keys" config
// parameter from "tree" section.
//

View file

@ -44,6 +44,7 @@ func TestTreeSection(t *testing.T) {
require.Equal(t, 32, treeSec.ReplicationWorkerCount())
require.Equal(t, 5*time.Second, treeSec.ReplicationTimeout())
require.Equal(t, time.Hour, treeSec.SyncInterval())
require.Equal(t, 2000, treeSec.SyncBatchSize())
require.Equal(t, expectedKeys, treeSec.AuthorizedKeys())
}

View file

@ -73,10 +73,6 @@ func initContainerService(_ context.Context, c *cfg) {
}
func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) {
eACLFetcher := &morphEACLFetcher{
w: client,
}
cnrRdr := new(morphContainerReader)
cnrWrt := &morphContainerWriter{
@ -84,8 +80,6 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
}
if c.cfgMorph.cacheTTL <= 0 {
c.cfgObject.eaclSource = eACLFetcher
cnrRdr.eacl = eACLFetcher
c.cfgObject.cnrSource = cnrSrc
cnrRdr.src = cnrSrc
cnrRdr.lister = client
@ -129,11 +123,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
c.cfgObject.cnrSource = containerCache
}
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
c.cfgObject.eaclSource = cachedEACLStorage
cnrRdr.lister = client
cnrRdr.eacl = c.cfgObject.eaclSource
cnrRdr.src = c.cfgObject.cnrSource
}
@ -224,8 +214,6 @@ func (c *cfg) ExternalAddresses() []string {
// implements interface required by container service provided by morph executor.
type morphContainerReader struct {
eacl containerCore.EACLSource
src containerCore.Source
lister interface {
@ -241,10 +229,6 @@ func (x *morphContainerReader) DeletionInfo(id cid.ID) (*containerCore.DelInfo,
return x.src.DeletionInfo(id)
}
func (x *morphContainerReader) GetEACL(id cid.ID) (*containerCore.EACL, error) {
return x.eacl.GetEACL(id)
}
func (x *morphContainerReader) ContainersOf(id *user.ID) ([]cid.ID, error) {
return x.lister.ContainersOf(id)
}

View file

@ -2,7 +2,6 @@ package main
import (
"context"
"errors"
"fmt"
"net"
@ -14,7 +13,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
morphClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
objectTransportGRPC "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/object/grpc"
@ -37,7 +35,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object/grpc"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -484,29 +481,6 @@ func createAPEService(c *cfg, splitSvc *objectService.TransportSplitter) *object
)
}
type morphEACLFetcher struct {
w *cntClient.Client
}
func (s *morphEACLFetcher) GetEACL(cnr cid.ID) (*containercore.EACL, error) {
eaclInfo, err := s.w.GetEACL(cnr)
if err != nil {
return nil, err
}
binTable, err := eaclInfo.Value.Marshal()
if err != nil {
return nil, fmt.Errorf("marshal eACL table: %w", err)
}
if !eaclInfo.Signature.Verify(binTable) {
// TODO(@cthulhu-rider): #468 use "const" error
return nil, errors.New("invalid signature of the eACL table")
}
return eaclInfo, nil
}
type engineWithoutNotifications struct {
engine *engine.StorageEngine
}

View file

@ -62,6 +62,7 @@ func initTreeService(c *cfg) {
tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
tree.WithSyncBatchSize(treeConfig.SyncBatchSize()),
tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
tree.WithMetrics(c.metricsCollector.TreeService()),
tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),

View file

@ -31,6 +31,7 @@ FROSTFS_TREE_REPLICATION_CHANNEL_CAPACITY=32
FROSTFS_TREE_REPLICATION_WORKER_COUNT=32
FROSTFS_TREE_REPLICATION_TIMEOUT=5s
FROSTFS_TREE_SYNC_INTERVAL=1h
FROSTFS_TREE_SYNC_BATCH_SIZE=2000
FROSTFS_TREE_AUTHORIZED_KEYS="0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
# gRPC section
@ -202,6 +203,10 @@ FROSTFS_TRACING_ENABLED=true
FROSTFS_TRACING_ENDPOINT="localhost"
FROSTFS_TRACING_EXPORTER="otlp_grpc"
FROSTFS_TRACING_TRUSTED_CA=""
FROSTFS_TRACING_ATTRIBUTES_0_KEY=key0
FROSTFS_TRACING_ATTRIBUTES_0_VALUE=value
FROSTFS_TRACING_ATTRIBUTES_1_KEY=key1
FROSTFS_TRACING_ATTRIBUTES_1_VALUE=value
FROSTFS_RUNTIME_SOFT_MEMORY_LIMIT=1073741824

View file

@ -69,6 +69,7 @@
"replication_worker_count": 32,
"replication_timeout": "5s",
"sync_interval": "1h",
"sync_batch_size": 2000,
"authorized_keys": [
"0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0",
"02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
@ -258,9 +259,19 @@
},
"tracing": {
"enabled": true,
"endpoint": "localhost:9090",
"endpoint": "localhost",
"exporter": "otlp_grpc",
"trusted_ca": "/etc/ssl/tracing.pem"
"trusted_ca": "",
"attributes":[
{
"key": "key0",
"value": "value"
},
{
"key": "key1",
"value": "value"
}
]
},
"runtime": {
"soft_memory_limit": 1073741824

View file

@ -59,6 +59,7 @@ tree:
replication_channel_capacity: 32
replication_timeout: 5s
sync_interval: 1h
sync_batch_size: 2000
authorized_keys: # list of hex-encoded public keys that have rights to use the Tree Service with frostfs-cli
- 0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0
- 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56
@ -238,6 +239,11 @@ tracing:
exporter: "otlp_grpc"
endpoint: "localhost"
trusted_ca: ""
attributes:
- key: key0
value: value
- key: key1
value: value
runtime:
soft_memory_limit: 1gb

View file

@ -78,7 +78,12 @@
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s1/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9090",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
"FROSTFS_TRACING_ENABLED":"true",
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8080"
},
"postDebugTask": "env-down"
},
@ -129,7 +134,12 @@
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s2/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9091",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
"FROSTFS_TRACING_ENABLED":"true",
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8082"
},
"postDebugTask": "env-down"
},
@ -180,7 +190,12 @@
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s3/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9092",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
"FROSTFS_TRACING_ENABLED":"true",
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8084"
},
"postDebugTask": "env-down"
},
@ -231,7 +246,12 @@
"FROSTFS_STORAGE_SHARD_1_PILORAMA_PATH":"${workspaceFolder}/.cache/storage/s4/pilorama1",
"FROSTFS_PROMETHEUS_ENABLED":"true",
"FROSTFS_PROMETHEUS_ADDRESS":"127.0.0.1:9093",
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s"
"FROSTFS_PROMETHEUS_SHUTDOWN_TIMEOUT":"15s",
"FROSTFS_TRACING_ENABLED":"true",
"FROSTFS_TRACING_EXPORTER":"otlp_grpc",
"FROSTFS_TRACING_ENDPOINT":"127.0.0.1:4317",
"FROSTFS_TRACING_ATTRIBUTES_0_KEY":"host.ip",
"FROSTFS_TRACING_ATTRIBUTES_0_VALUE":"127.0.0.1:8086"
},
"postDebugTask": "env-down"
}

View file

@ -14,3 +14,15 @@ services:
- ./neo-go/node-wallet.json:/wallets/node-wallet.json
- ./neo-go/config.yml:/wallets/config.yml
- ./neo-go/wallet.json:/wallets/wallet.json
jaeger:
image: jaegertracing/all-in-one:latest
container_name: jaeger
ports:
- '4317:4317' #OTLP over gRPC
- '4318:4318' #OTLP over HTTP
- '16686:16686' #frontend
stop_signal: SIGKILL
environment:
- COLLECTOR_OTLP_ENABLED=true
- SPAN_STORAGE_TYPE=badger
- BADGER_EPHEMERAL=true

View file

@ -412,12 +412,12 @@ object:
- $attribute:ClusterName
```
| Parameter | Type | Default value | Description |
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET and SEARCH requests. |
| Parameter | Type | Default value | Description |
|-----------------------------|------------|---------------|------------------------------------------------------------------------------------------------|
| `delete.tombstone_lifetime` | `int` | `5` | Tombstone lifetime for removed objects in epochs. |
| `put.remote_pool_size` | `int` | `10` | Max pool size for performing remote `PUT` operations. Used by Policer and Replicator services. |
| `put.local_pool_size` | `int` | `10` | Max pool size for performing local `PUT` operations. Used by Policer and Replicator services. |
| `get.priority` | `[]string` | | List of metrics of nodes for prioritization. Used for computing response on GET requests. |
# `runtime` section
Contains runtime parameters.

2
go.mod
View file

@ -7,7 +7,7 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-contract v0.20.0
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20240909114314-666d326cc573
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241107121119-cb813e27a823
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972

BIN
go.sum

Binary file not shown.

View file

@ -58,16 +58,3 @@ type EACL struct {
// Session within which Value was set. Nil means session absence.
Session *session.Container
}
// EACLSource is the interface that wraps
// basic methods of extended ACL table source.
type EACLSource interface {
// GetEACL reads the table from the source by identifier.
// It returns any error encountered.
//
// GetEACL must return exactly one non-nil value.
//
// Must return apistatus.ErrEACLNotFound if requested
// eACL table is not in source.
GetEACL(cid.ID) (*EACL, error)
}

View file

@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
return nil
}
// TreeApplyBatch implements the pilorama.Forest interface.
func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch",
trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
index, lst, err := e.getTreeShard(ctx, cnr, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return err
}
err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m)
if err != nil {
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err,
zap.Stringer("cid", cnr),
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
return nil
}
// TreeGetByPath implements the pilorama.Forest interface.
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",

View file

@ -558,6 +558,80 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
return metaerr.Wrap(err)
}
func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
m, err := t.filterSeen(cnr, treeID, m)
if err != nil {
return err
}
if len(m) == 0 {
success = true
return nil
}
ch := make(chan error)
b := &batch{
forest: t,
cid: cnr,
treeID: treeID,
results: []chan<- error{ch},
operations: m,
}
go func() {
b.run()
}()
err = <-ch
success = err == nil
return metaerr.Wrap(err)
}
func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
ops := make([]*Move, 0, len(m))
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cnr, treeID))
if treeRoot == nil {
ops = m
return nil
}
b := treeRoot.Bucket(logBucket)
for _, op := range m {
var logKey [8]byte
binary.BigEndian.PutUint64(logKey[:], op.Time)
seen := b.Get(logKey[:]) != nil
if !seen {
ops = append(ops, op)
}
}
return nil
})
if err != nil {
return nil, metaerr.Wrap(err)
}
return ops, nil
}
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
var (

View file

@ -111,6 +111,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o
return s.Apply(op)
}
func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error {
for _, op := range ops {
if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil {
return err
}
}
return nil
}
func (f *memoryForest) Init() error {
return nil
}

View file

@ -21,6 +21,8 @@ type Forest interface {
// TreeApply applies replicated operation from another node.
// If background is true, TreeApply will first check whether an operation exists.
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
// TreeApplyBatch applies replicated operations from another node.
TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error
// TreeGetByPath returns all nodes corresponding to the path.
// The path is constructed by descending from the root using the values of the
// AttributeFilename in meta.

View file

@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
}
// TreeApplyBatch implements the pilorama.Forest interface.
func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
}
// TreeGetByPath implements the pilorama.Forest interface.
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",

View file

@ -27,7 +27,6 @@ const (
getMethod = "get"
listMethod = "list"
containersOfMethod = "containersOf"
eaclMethod = "eACL"
deletionInfoMethod = "deletionInfo"
// putNamedMethod is method name for container put with an alias. It is exported to provide custom fee.

View file

@ -1,95 +0,0 @@
package container
import (
"crypto/sha256"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
// GetEACL reads the extended ACL table from FrostFS system
// through Container contract call.
//
// Returns apistatus.EACLNotFound if eACL table is missing in the contract.
func (c *Client) GetEACL(cnr cid.ID) (*container.EACL, error) {
binCnr := make([]byte, sha256.Size)
cnr.Encode(binCnr)
prm := client.TestInvokePrm{}
prm.SetMethod(eaclMethod)
prm.SetArgs(binCnr)
prms, err := c.client.TestInvoke(prm)
if err != nil {
return nil, fmt.Errorf("could not perform test invocation (%s): %w", eaclMethod, err)
} else if ln := len(prms); ln != 1 {
return nil, fmt.Errorf("unexpected stack item count (%s): %d", eaclMethod, ln)
}
arr, err := client.ArrayFromStackItem(prms[0])
if err != nil {
return nil, fmt.Errorf("could not get item array of eACL (%s): %w", eaclMethod, err)
}
if len(arr) != 4 {
return nil, fmt.Errorf("unexpected eacl stack item count (%s): %d", eaclMethod, len(arr))
}
rawEACL, err := client.BytesFromStackItem(arr[0])
if err != nil {
return nil, fmt.Errorf("could not get byte array of eACL (%s): %w", eaclMethod, err)
}
sig, err := client.BytesFromStackItem(arr[1])
if err != nil {
return nil, fmt.Errorf("could not get byte array of eACL signature (%s): %w", eaclMethod, err)
}
// Client may not return errors if the table is missing, so check this case additionally.
// The absence of a signature in the response can be taken as an eACL absence criterion,
// since unsigned table cannot be approved in the storage by design.
if len(sig) == 0 {
return nil, new(apistatus.EACLNotFound)
}
pub, err := client.BytesFromStackItem(arr[2])
if err != nil {
return nil, fmt.Errorf("could not get byte array of eACL public key (%s): %w", eaclMethod, err)
}
binToken, err := client.BytesFromStackItem(arr[3])
if err != nil {
return nil, fmt.Errorf("could not get byte array of eACL session token (%s): %w", eaclMethod, err)
}
var res container.EACL
res.Value = eacl.NewTable()
if err = res.Value.Unmarshal(rawEACL); err != nil {
return nil, err
}
if len(binToken) > 0 {
res.Session = new(session.Container)
err = res.Session.Unmarshal(binToken)
if err != nil {
return nil, fmt.Errorf("could not unmarshal session token: %w", err)
}
}
// TODO(@cthulhu-rider): #468 implement and use another approach to avoid conversion
var sigV2 refs.Signature
sigV2.SetKey(pub)
sigV2.SetSign(sig)
sigV2.SetScheme(refs.ECDSA_RFC6979_SHA256)
err = res.Signature.ReadFromV2(sigV2)
return &res, err
}

View file

@ -25,7 +25,6 @@ type morphExecutor struct {
// Reader is an interface of read-only container storage.
type Reader interface {
containercore.Source
containercore.EACLSource
// ContainersOf returns a list of container identifiers belonging
// to the specified user of FrostFS system. Returns the identifiers

View file

@ -41,6 +41,7 @@ type cfg struct {
replicatorTimeout time.Duration
containerCacheSize int
authorizedKeys [][]byte
syncBatchSize int
localOverrideStorage policyengine.LocalOverrideStorage
morphChainStorage policyengine.MorphRuleChainStorageReader
@ -113,6 +114,12 @@ func WithReplicationWorkerCount(n int) Option {
}
}
func WithSyncBatchSize(n int) Option {
return func(c *cfg) {
c.syncBatchSize = n
}
}
func WithContainerCacheSize(n int) Option {
return func(c *cfg) {
if n > 0 {

View file

@ -40,6 +40,7 @@ const (
defaultReplicatorCapacity = 64
defaultReplicatorWorkerCount = 64
defaultReplicatorSendTimeout = time.Second * 5
defaultSyncBatchSize = 1000
)
func (s *Service) localReplicationWorker(ctx context.Context) {

View file

@ -55,6 +55,7 @@ func New(opts ...Option) *Service {
s.replicatorChannelCapacity = defaultReplicatorCapacity
s.replicatorWorkerCount = defaultReplicatorWorkerCount
s.replicatorTimeout = defaultReplicatorSendTimeout
s.syncBatchSize = defaultSyncBatchSize
s.metrics = defaultMetricsRegister{}
for i := range opts {

View file

@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move,
) uint64 {
errGroup, _ := errgroup.WithContext(ctx)
const workersCount = 1024
errGroup.SetLimit(workersCount)
// We run TreeApply concurrently for the operation batch. Let's consider two operations
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
// on m1. That means the service must start sync from m1.Time in the next iteration and
// this height is stored in unappliedOperationHeight.
var unappliedOperationHeight uint64 = math.MaxUint64
var heightMtx sync.Mutex
var prev *pilorama.Move
var batch []*pilorama.Move
for m := range operationStream {
// skip already applied op
if prev != nil && prev.Time == m.Time {
continue
}
prev = m
batch = append(batch, m)
errGroup.Go(func() error {
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
heightMtx.Lock()
unappliedOperationHeight = min(unappliedOperationHeight, m.Time)
heightMtx.Unlock()
return err
if len(batch) == s.syncBatchSize {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time
}
return nil
})
batch = batch[:0]
}
}
_ = errGroup.Wait()
return unappliedOperationHeight
if len(batch) > 0 {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time
}
}
return math.MaxUint64
}
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
@ -384,7 +376,7 @@ func (s *Service) syncLoop(ctx context.Context) {
return
case <-s.syncChan:
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync")
s.log.Debug(logs.TreeSyncingTrees)
s.log.Info(logs.TreeSyncingTrees)
start := time.Now()
@ -402,7 +394,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.removeContainers(ctx, newMap)
s.log.Debug(logs.TreeTreesHaveBeenSynchronized)
s.log.Info(logs.TreeTreesHaveBeenSynchronized)
s.metrics.AddSyncDuration(time.Since(start), true)
span.End()