[#3] Add job fetcher
Some checks failed
/ Vulncheck (pull_request) Failing after 1m39s
/ DCO (pull_request) Successful in 1m42s
/ Builds (1.21) (pull_request) Successful in 2m2s
/ Builds (1.22) (pull_request) Successful in 2m5s
/ Lint (pull_request) Failing after 32s
/ Tests (1.21) (pull_request) Successful in 1m57s
/ Tests (1.22) (pull_request) Successful in 1m53s

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
Denis Kirillov 2024-07-11 15:54:52 +03:00
parent f2893421a1
commit 6529c43c0e
19 changed files with 2268 additions and 232 deletions


@@ -5,17 +5,22 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/credential/walletsource"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/lifecycle"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph/contract"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/spf13/viper"
"go.uber.org/zap"
@@ -25,11 +30,18 @@ type (
App struct {
log *zap.Logger
logLevel zap.AtomicLevel
key *keys.PrivateKey
cfg *viper.Viper
done chan struct{}
appServices []*metrics.Service
appMetrics *metrics.AppMetrics
notificator *notificator.Notificator
settings *appSettings
}
appSettings struct {
mu sync.RWMutex
serviceKeys []*keys.PublicKey
}
)
@@ -47,6 +59,7 @@ func newApp(ctx context.Context, cfg *viper.Viper, log *Logger) *App {
cfg: cfg,
done: make(chan struct{}),
appMetrics: metrics.NewAppMetrics(),
settings: newAppSettings(cfg, log),
}
a.appMetrics.SetHealth(HealthStatusStarting)
@@ -56,76 +69,138 @@ func newApp(ctx context.Context, cfg *viper.Viper, log *Logger) *App {
}
func (a *App) init(ctx context.Context) {
key, err := fetchKey(a.cfg)
var err error
a.key, err = fetchKey(a.cfg)
if err != nil {
a.log.Fatal(logs.FailedToLoadPrivateKey, zap.Error(err))
}
endpoints := fetchMorphEndpoints(a.cfg, a.log)
newListenerFunc := a.getNewListenerFunction(ctx, key, endpoints)
handler := a.getNewEpochHandler()
reconnectInterval := fetchMorphReconnectClientsInterval(a.cfg)
clientCfg := morph.Config{
Logger: a.log,
Endpoints: endpoints,
Key: a.key,
ReconnectInterval: reconnectInterval,
DialTimeout: fetchMorphDialTimeout(a.cfg),
}
cli, err := morph.New(ctx, clientCfg)
if err != nil {
a.log.Fatal(logs.FailedToInitMorphClient, zap.Error(err))
}
lnCfg := notificator.ConfigListener{
Client: cli,
Logger: a.log,
ReconnectInterval: reconnectInterval,
}
ln, err := notificator.NewListener(ctx, lnCfg)
if err != nil {
a.log.Fatal(logs.FailedToInitListener, zap.Error(err))
}
credSource, err := walletsource.New(fetchWalletsCredentials(a.cfg, a.log))
if err != nil {
a.log.Fatal(logs.CouldntCreateWalletSource, zap.Error(err))
}
frostfsidContract, err := resolver.ResolveContractHash(a.cfg.GetString(cfgMorphContractFrostfsID), endpoints[0].Address)
if err != nil {
a.log.Fatal(logs.ResolveFrostfsIDContract, zap.Error(err))
}
ffsidCfg := contract.FrostFSIDConfig{
Client: cli,
ContractHash: frostfsidContract,
}
containerContract, err := resolver.ResolveContractHash(a.cfg.GetString(cfgMorphContractContainer), endpoints[0].Address)
if err != nil {
a.log.Fatal(logs.ResolveContainerContract, zap.Error(err))
}
containerCfg := contract.ContainerConfig{
Client: cli,
ContractHash: containerContract,
Log: a.log,
}
objPool, treePool := getPools(ctx, a.cfg, a.log, a.key)
lifecycleCfg := lifecycle.Config{
UserFetcher: contract.NewFrostFSID(ffsidCfg),
ContainerFetcher: contract.NewContainer(containerCfg),
ConfigurationFetcher: frostfs.NewFrostFS(objPool, a.log),
CredentialSource: credSource,
Settings: a.settings,
CurrentLifecycler: a.key,
Logger: a.log,
TreeFetcher: tree.NewTree(frostfs.NewTreePoolWrapper(treePool), a.log),
BufferSize: fetchJobFetcherBuffer(a.cfg),
}
jobProvider := lifecycle.NewJobProvider(lifecycleCfg)
handler := func(ctx context.Context, ee notificator.NewEpochEvent) {
// todo (d.kirillov) use real job executor here TrueCloudLab/frostfs-s3-lifecycler#4
fmt.Println("start handler", ee.Epoch, time.Now())
jobs, err := jobProvider.GetJobs(ctx, ee.Epoch)
if err != nil {
a.log.Error(logs.FailedToGetJobs, zap.Error(err))
} else {
for job := range jobs {
fmt.Println(job)
}
}
fmt.Println("end handler", ee.Epoch, time.Now())
}
netmapContract, err := resolver.ResolveContractHash(a.cfg.GetString(cfgMorphContractNetmap), endpoints[0].Address)
if err != nil {
a.log.Fatal(logs.ResolveNetmapContract, zap.Error(err))
}
cfg := notificator.Config{
Handler: handler,
Logger: a.log,
NewListener: newListenerFunc,
NetmapContract: netmapContract,
ReconnectClientsInterval: 30 * time.Second,
notificatorCfg := notificator.Config{
Handler: handler,
Logger: a.log,
Listener: ln,
NetmapContract: netmapContract,
}
if a.notificator, err = notificator.New(ctx, cfg); err != nil {
if a.notificator, err = notificator.New(ctx, notificatorCfg); err != nil {
a.log.Fatal(logs.InitNotificator, zap.Error(err))
}
}
func (a *App) getNewListenerFunction(ctx context.Context, key *keys.PrivateKey, endpoints []client.Endpoint) notificator.ListenerCreationFunc {
morphLogger := &logger.Logger{Logger: a.log}
clientOptions := []client.Option{
client.WithLogger(morphLogger),
client.WithEndpoints(endpoints...),
func newAppSettings(v *viper.Viper, log *Logger) *appSettings {
s := &appSettings{}
s.update(v, log.logger)
return s
}
func (s *appSettings) update(cfg *viper.Viper, log *zap.Logger) {
svcKeys, svcKeyErr := fetchLifecycleServices(cfg)
if svcKeyErr != nil {
log.Warn(logs.FailedToFetchServicesKeys, zap.Error(svcKeyErr))
}
return func(connectionLostCb func()) (event.Listener, error) {
options := append([]client.Option{client.WithConnLostCallback(connectionLostCb)}, clientOptions...)
cli, err := client.New(ctx, key, options...)
if err != nil {
return nil, fmt.Errorf("create new client: %w", err)
}
s.mu.Lock()
defer s.mu.Unlock()
currentBlock, err := cli.BlockCount()
if err != nil {
return nil, fmt.Errorf("get block count: %w", err)
}
subs, err := subscriber.New(ctx, &subscriber.Params{
Log: morphLogger,
StartFromBlock: currentBlock,
Client: cli,
})
if err != nil {
return nil, fmt.Errorf("create subscriber: %w", err)
}
return event.NewListener(event.ListenerParams{
Logger: morphLogger,
Subscriber: subs,
WorkerPoolCapacity: 0, // 0 means "infinite"
})
if svcKeyErr == nil {
s.serviceKeys = svcKeys
}
}
func (a *App) getNewEpochHandler() notificator.NewEpochHandler {
return func(_ context.Context, ee notificator.NewEpochEvent) {
// todo (d.kirillov) use real job executor here TrueCloudLab/frostfs-s3-lifecycler#3
fmt.Println("start handler", ee.Epoch)
time.Sleep(30 * time.Second)
fmt.Println("end handler", ee.Epoch)
}
func (s *appSettings) ServicesKeys() keys.PublicKeys {
s.mu.RLock()
defer s.mu.RUnlock()
return s.serviceKeys
}
func (a *App) Wait() {
@@ -187,6 +262,8 @@ func (a *App) configReload() {
a.stopAppServices()
a.startAppServices()
a.settings.update(a.cfg, a.log)
a.log.Info(logs.SIGHUPConfigReloadCompleted)
}
@@ -212,3 +289,58 @@ func (a *App) stopAppServices() {
svc.ShutDown(ctx)
}
}
func getPools(ctx context.Context, cfg *viper.Viper, logger *zap.Logger, key *keys.PrivateKey) (*pool.Pool, *treepool.Pool) {
var prm pool.InitParameters
var prmTree treepool.InitParameters
prm.SetKey(&key.PrivateKey)
prmTree.SetKey(key)
for _, peer := range fetchPeers(cfg, logger) {
prm.AddNode(peer)
prmTree.AddNode(peer)
}
connTimeout := fetchConnectTimeout(cfg)
prm.SetNodeDialTimeout(connTimeout)
prmTree.SetNodeDialTimeout(connTimeout)
streamTimeout := fetchStreamTimeout(cfg)
prm.SetNodeStreamTimeout(streamTimeout)
prmTree.SetNodeStreamTimeout(streamTimeout)
healthCheckTimeout := fetchHealthCheckTimeout(cfg)
prm.SetHealthcheckTimeout(healthCheckTimeout)
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
rebalanceInterval := fetchRebalanceInterval(cfg)
prm.SetClientRebalanceInterval(rebalanceInterval)
prmTree.SetClientRebalanceInterval(rebalanceInterval)
errorThreshold := fetchErrorThreshold(cfg)
prm.SetErrorThreshold(errorThreshold)
prm.SetLogger(logger)
prmTree.SetLogger(logger)
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgFrostFSTreePoolMaxAttempts))
p, err := pool.NewPool(prm)
if err != nil {
logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
}
if err = p.Dial(ctx); err != nil {
logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
}
treePool, err := treepool.NewPool(prmTree)
if err != nil {
logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
}
if err = treePool.Dial(ctx); err != nil {
logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
}
return p, treePool
}


@@ -9,8 +9,10 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/credential/walletsource"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"github.com/nspcc-dev/neo-go/cli/flags"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
neogoflags "github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
@@ -43,8 +45,33 @@ const (
cfgMorphRPCEndpointTrustedCAListTmpl = cfgMorphRPCEndpointPrefixTmpl + "trusted_ca_list"
cfgMorphRPCEndpointCertificateTmpl = cfgMorphRPCEndpointPrefixTmpl + "certificate"
cfgMorphRPCEndpointKeyTmpl = cfgMorphRPCEndpointPrefixTmpl + "key"
cfgMorphReconnectClientsInterval = "morph.reconnect_clients_interval"
cfgMorphDialTimeout = "morph.dial_timeout"
cfgMorphContractNetmap = "morph.contract.netmap"
cfgMorphReconnectClientInterval = "morph.reconnect_clients_interval"
cfgMorphContractFrostfsID = "morph.contract.frostfsid"
cfgMorphContractContainer = "morph.contract.container"
// Credential source.
cfgCredentialSourceWalletsPrefixTmpl = "credential_source.wallets.%d."
cfgCredentialSourceWalletsPathTmpl = cfgCredentialSourceWalletsPrefixTmpl + "path"
cfgCredentialSourceWalletsAddressTmpl = cfgCredentialSourceWalletsPrefixTmpl + "address"
cfgCredentialSourceWalletsPassphraseTmpl = cfgCredentialSourceWalletsPrefixTmpl + "passphrase"
// FrostFS.
cfgFrostFSConnectTimeout = "frostfs.connect_timeout"
cfgFrostFSStreamTimeout = "frostfs.stream_timeout"
cfgFrostFSHealthcheckTimeout = "frostfs.healthcheck_timeout"
cfgFrostFSRebalanceInterval = "frostfs.rebalance_interval"
cfgFrostFSPoolErrorThreshold = "frostfs.pool_error_threshold"
cfgFrostFSTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
cfgFrostFSPeersPrefixTmpl = "frostfs.peers.%d."
cfgFrostFSPeersAddressTmpl = cfgFrostFSPeersPrefixTmpl + "address"
cfgFrostFSPeersPriorityTmpl = cfgFrostFSPeersPrefixTmpl + "priority"
cfgFrostFSPeersWeightTmpl = cfgFrostFSPeersPrefixTmpl + "weight"
// Lifecycle.
cfgLifecycleJobFetcherBuffer = "lifecycle.job_fetcher_buffer"
cfgLifecycleServices = "lifecycle.services"
// Command line args.
cmdHelp = "help"
@@ -57,7 +84,17 @@ const (
defaultShutdownTimeout = 15 * time.Second
componentName = "frostfs-s3-lifecycler"
defaultMorphRPCEndpointPriority = 1
defaultMorphRPCEndpointPriority = 1
defaultMorphReconnectClientsInterval = 30 * time.Second
defaultMorphDialTimeout = 5 * time.Second
defaultFrostFSRebalanceInterval = 60 * time.Second
defaultFrostFSHealthcheckTimeout = 15 * time.Second
defaultFrostFSConnectTimeout = 10 * time.Second
defaultFrostFSStreamTimeout = 10 * time.Second
defaultFrostFSPoolErrorThreshold uint32 = 100
defaultLifecycleJobFetcherBuffer = 1000
)
func settings() *viper.Viper {
@@ -90,8 +127,20 @@ func settings() *viper.Viper {
v.SetDefault(cfgPprofEnabled, false)
// morph:
v.SetDefault(cfgMorphReconnectClientsInterval, defaultMorphReconnectClientsInterval)
v.SetDefault(cfgMorphDialTimeout, defaultMorphDialTimeout)
v.SetDefault(cfgMorphContractNetmap, "netmap.frostfs")
v.SetDefault(cfgMorphReconnectClientInterval, 30*time.Second)
v.SetDefault(cfgMorphContractFrostfsID, "frostfsid.frostfs")
v.SetDefault(cfgMorphContractContainer, "container.frostfs")
// frostfs:
v.SetDefault(cfgFrostFSConnectTimeout, defaultFrostFSConnectTimeout)
v.SetDefault(cfgFrostFSRebalanceInterval, defaultFrostFSRebalanceInterval)
v.SetDefault(cfgFrostFSHealthcheckTimeout, defaultFrostFSHealthcheckTimeout)
v.SetDefault(cfgFrostFSStreamTimeout, defaultFrostFSStreamTimeout)
// lifecycle:
v.SetDefault(cfgLifecycleJobFetcherBuffer, defaultLifecycleJobFetcherBuffer)
// Bind flags with configuration values.
if err := v.BindPFlags(flags); err != nil {
@@ -212,7 +261,7 @@ func fetchKey(v *viper.Viper) (*keys.PrivateKey, error) {
if len(walletAddress) == 0 {
addr = w.GetChangeAddress()
} else {
addr, err = flags.ParseAddress(walletAddress)
addr, err = neogoflags.ParseAddress(walletAddress)
if err != nil {
return nil, fmt.Errorf("invalid address")
}
@@ -273,3 +322,146 @@ func fetchMorphEndpoints(v *viper.Viper, l *zap.Logger) []client.Endpoint {
}
return res
}
func fetchWalletsCredentials(v *viper.Viper, l *zap.Logger) []walletsource.Wallet {
var res []walletsource.Wallet
for i := 0; ; i++ {
walletPath := v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsPathTmpl, i))
if walletPath == "" {
break
}
res = append(res, walletsource.Wallet{
Path: walletPath,
Address: v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsAddressTmpl, i)),
Passphrase: v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsPassphraseTmpl, i)),
})
}
if len(res) == 0 {
l.Fatal(logs.NoCredentialSourceWallets)
}
return res
}
func fetchPeers(v *viper.Viper, l *zap.Logger) []pool.NodeParam {
var nodes []pool.NodeParam
for i := 0; ; i++ {
address := v.GetString(fmt.Sprintf(cfgFrostFSPeersAddressTmpl, i))
if address == "" {
break
}
priority := v.GetInt(fmt.Sprintf(cfgFrostFSPeersPriorityTmpl, i))
if priority <= 0 { // unspecified or wrong
priority = 1
}
weight := v.GetFloat64(fmt.Sprintf(cfgFrostFSPeersWeightTmpl, i))
if weight <= 0 { // unspecified or wrong
weight = 1
}
nodes = append(nodes, pool.NewNodeParam(priority, address, weight))
l.Info(logs.AddedStoragePeer,
zap.String("address", address),
zap.Int("priority", priority),
zap.Float64("weight", weight))
}
return nodes
}
func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
connTimeout := cfg.GetDuration(cfgFrostFSConnectTimeout)
if connTimeout <= 0 {
connTimeout = defaultFrostFSConnectTimeout
}
return connTimeout
}
func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
streamTimeout := cfg.GetDuration(cfgFrostFSStreamTimeout)
if streamTimeout <= 0 {
streamTimeout = defaultFrostFSStreamTimeout
}
return streamTimeout
}
func fetchHealthCheckTimeout(cfg *viper.Viper) time.Duration {
healthCheckTimeout := cfg.GetDuration(cfgFrostFSHealthcheckTimeout)
if healthCheckTimeout <= 0 {
healthCheckTimeout = defaultFrostFSHealthcheckTimeout
}
return healthCheckTimeout
}
func fetchRebalanceInterval(cfg *viper.Viper) time.Duration {
rebalanceInterval := cfg.GetDuration(cfgFrostFSRebalanceInterval)
if rebalanceInterval <= 0 {
rebalanceInterval = defaultFrostFSRebalanceInterval
}
return rebalanceInterval
}
func fetchErrorThreshold(cfg *viper.Viper) uint32 {
errorThreshold := cfg.GetUint32(cfgFrostFSPoolErrorThreshold)
if errorThreshold <= 0 {
errorThreshold = defaultFrostFSPoolErrorThreshold
}
return errorThreshold
}
func fetchJobFetcherBuffer(cfg *viper.Viper) int {
bufferSize := cfg.GetInt(cfgLifecycleJobFetcherBuffer)
if bufferSize <= 0 {
bufferSize = defaultLifecycleJobFetcherBuffer
}
return bufferSize
}
func fetchMorphReconnectClientsInterval(cfg *viper.Viper) time.Duration {
val := cfg.GetDuration(cfgMorphReconnectClientsInterval)
if val <= 0 {
val = defaultMorphReconnectClientsInterval
}
return val
}
func fetchMorphDialTimeout(cfg *viper.Viper) time.Duration {
val := cfg.GetDuration(cfgMorphDialTimeout)
if val <= 0 {
val = defaultMorphDialTimeout
}
return val
}
func fetchLifecycleServices(v *viper.Viper) (keys.PublicKeys, error) {
configKeys := v.GetStringSlice(cfgLifecycleServices)
result := make(keys.PublicKeys, 0, len(configKeys))
uniqKeys := make(map[string]struct{}, len(configKeys))
for _, configKey := range configKeys {
if _, ok := uniqKeys[configKey]; ok {
continue
}
k, err := keys.NewPublicKeyFromString(configKey)
if err != nil {
return nil, fmt.Errorf("key '%s': %w", configKey, err)
}
result = append(result, k)
uniqKeys[configKey] = struct{}{}
}
return result, nil
}


@@ -26,4 +26,34 @@ S3_LIFECYCLER_MORPH_RPC_ENDPOINT_0_KEY="/path/to/key"
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_1_ADDRESS="wss://rpc2.morph.frostfs.info:40341/ws"
S3_LIFECYCLER_MORPH_RPC_ENDPOINT_1_PRIORITY=2
S3_LIFECYCLER_MORPH_RECONNECT_CLIENTS_INTERVAL=30s
S3_LIFECYCLER_MORPH_DIAL_TIMEOUT=5s
S3_LIFECYCLER_MORPH_CONTRACT_NETMAP=netmap.frostfs
S3_LIFECYCLER_MORPH_CONTRACT_FROSTFSID=frostfsid.frostfs
S3_LIFECYCLER_MORPH_CONTRACT_CONTAINER=container.frostfs
# Credential source
S3_LIFECYCLER_CREDENTIAL_SOURCE_WALLETS_0_PATH=/path/to/user/wallet.json
S3_LIFECYCLER_CREDENTIAL_SOURCE_WALLETS_0_ADDRESS=NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP
S3_LIFECYCLER_CREDENTIAL_SOURCE_WALLETS_0_PASSPHRASE=""
# Lifecycle
S3_LIFECYCLER_LIFECYCLE_JOB_FETCHER_BUFFER=1000
S3_LIFECYCLER_LIFECYCLE_SERVICES=0313b1ac3a8076e155a7e797b24f0b650cccad5941ea59d7cfd51a024a8b2a06bf 031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a
# FrostFS
S3_LIFECYCLER_FROSTFS_STREAM_TIMEOUT=10s
S3_LIFECYCLER_FROSTFS_CONNECT_TIMEOUT=10s
S3_LIFECYCLER_FROSTFS_HEALTHCHECK_TIMEOUT=15s
S3_LIFECYCLER_FROSTFS_REBALANCE_INTERVAL=60s
S3_LIFECYCLER_FROSTFS_POOL_ERROR_THRESHOLD=100
S3_LIFECYCLER_FROSTFS_TREE_POOL_MAX_ATTEMPTS=4
S3_LIFECYCLER_FROSTFS_PEERS_0_ADDRESS=node1.frostfs:8080
S3_LIFECYCLER_FROSTFS_PEERS_0_PRIORITY=1
S3_LIFECYCLER_FROSTFS_PEERS_0_WEIGHT=1
S3_LIFECYCLER_FROSTFS_PEERS_1_ADDRESS=node2.frostfs:8080
S3_LIFECYCLER_FROSTFS_PEERS_1_PRIORITY=2
S3_LIFECYCLER_FROSTFS_PEERS_1_WEIGHT=0.1
S3_LIFECYCLER_FROSTFS_PEERS_2_ADDRESS=node3.frostfs:8080
S3_LIFECYCLER_FROSTFS_PEERS_2_PRIORITY=2
S3_LIFECYCLER_FROSTFS_PEERS_2_WEIGHT=0.9


@@ -27,5 +27,45 @@ morph:
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
reconnect_clients_interval: 30s
dial_timeout: 5s
contract:
netmap: netmap.frostfs
frostfsid: frostfsid.frostfs
container: container.frostfs
credential_source:
wallets:
- path: /path/to/wallet.json
address: NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP
passphrase: ""
lifecycle:
job_fetcher_buffer: 1000
services:
- 0313b1ac3a8076e155a7e797b24f0b650cccad5941ea59d7cfd51a024a8b2a06bf
frostfs:
stream_timeout: 10s
connect_timeout: 10s
healthcheck_timeout: 15s
rebalance_interval: 60s
pool_error_threshold: 100
tree_pool_max_attempts: 4
peers:
0:
priority: 1
weight: 1
address: s01.frostfs.devenv:8080
1:
priority: 2
weight: 1
address: s02.frostfs.devenv:8080
2:
priority: 2
weight: 1
address: s03.frostfs.devenv:8080
3:
priority: 2
weight: 1
address: s04.frostfs.devenv:8080


@@ -4,12 +4,16 @@ This section contains detailed FrostFS S3 Lifecycler component configuration des
# Structure
| Section | Description |
|--------------|-------------------------------------------------|
| `logger` | [Logger configuration](#logger-section) |
| `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) |
| `morph` | [Morph configuration](#morph-section) |
| Section | Description |
|---------------------|--------------------------------------------------------------|
| `wallet` | [Wallet configuration](#wallet-section) |
| `logger` | [Logger configuration](#logger-section) |
| `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) |
| `morph` | [Morph configuration](#morph-section) |
| `credential_source` | [Credential source configuration](#credentialsource-section) |
| `lifecycle` | [Lifecycle configuration](#lifecycle-section) |
| `frostfs` | [FrostFS configuration](#frostfs-section) |
### Reload on SIGHUP
@@ -22,6 +26,23 @@ You can send SIGHUP signal to app using the following command:
$ kill -s SIGHUP <app_pid>
```
# `wallet` section
Configuration of the wallet key used by the lifecycle service.
```yaml
wallet:
path: /path/to/wallet.json
address: Nhfg3TbpwogLvDGVvAvqyThbsHgoSUKwtn
passphrase: ""
```
| Parameter | Type | Default value | Description |
|--------------|----------|---------------|--------------------------------------------------------------------------|
| `path` | `string` | | Path to wallet |
| `address` | `string` | | Account address to use from the wallet. If omitted, the default one will be used. |
| `passphrase` | `string` | | Passphrase to decrypt wallet. |
# `logger` section
```yaml
@@ -81,16 +102,108 @@ morph:
- address: wss://rpc2.morph.frostfs.info:40341/ws
priority: 2
reconnect_clients_interval: 30s
dial_timeout: 5s
contract:
netmap: netmap.frostfs
frostfsid: frostfsid.frostfs
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|--------------------------------|------------|---------------|------------------|---------------------------------------------------------------------------------------------------------|
| `rpc_endpoint.address` | `string` | no | | The address of the RPC host to connect. |
| `rpc_endpoint.priority` | `int` | no | | Priority of RPC endpoint. |
| `rpc_endpoint.trusted_ca_list` | `[]string` | no | | List of paths to CAs to use in mTLS configuration. |
| `rpc_endpoint.certificate` | `string` | no | | Path to certificate to use in mTLS configuration. |
| `rpc_endpoint.key` | `string` | no | | Path to key to use in mTLS configuration. |
| `reconnect_clients_interval` | `string` | no | `30s` | Interval between retries to reinitialize the connection after all endpoints have failed. |
| `contract.netmap` | `string` | no | `netmap.frostfs` | Netmap contract hash (LE) or name in NNS. |
| Parameter | Type | SIGHUP reload | Default value | Description |
|--------------------------------|------------|---------------|---------------------|------------------------------------------------------------------------------------------------------------------|
| `rpc_endpoint.address` | `string` | no | | The address of the RPC host to connect. |
| `rpc_endpoint.priority` | `int` | no | | Priority of RPC endpoint. |
| `rpc_endpoint.trusted_ca_list` | `[]string` | no | | List of paths to CAs to use in mTLS configuration. |
| `rpc_endpoint.certificate` | `string` | no | | Path to certificate to use in mTLS configuration. |
| `rpc_endpoint.key` | `string` | no | | Path to key to use in mTLS configuration. |
| `reconnect_clients_interval` | `string` | no | `30s` | Interval between retries to reinitialize the connection after all endpoints have failed. |
| `dial_timeout` | `string` | no | `5s` | Timeout to dial a morph endpoint. |
| `contract.netmap` | `string` | no | `netmap.frostfs` | Netmap contract hash (LE) or name in NNS. |
| `contract.frostfsid` | `string` | no | `frostfsid.frostfs` | FrostfsID contract hash (LE) or name in NNS. This contract is used to list the users whose containers are processed. |
| `contract.container` | `string` | no | `container.frostfs` | Container contract hash (LE) or name in NNS. |
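The `contract.*` values accept either a little-endian script hash or an NNS name. Below is a minimal sketch, not part of this commit, of how such a value is resolved into a script hash the way `app.init` does it, using the internal `resolver.ResolveContractHash` helper against the first morph RPC endpoint. The endpoint address is a placeholder, and the snippet assumes it is compiled inside this repository, since the package is internal.

```go
package main

import (
	"fmt"
	"log"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/resolver"
)

func main() {
	// Placeholder endpoint; the service takes it from the morph.rpc_endpoint config.
	endpoint := "wss://rpc1.morph.frostfs.info:40341/ws"

	// Each value may be a contract hash (LE) or an NNS name such as "netmap.frostfs".
	for _, name := range []string{"netmap.frostfs", "frostfsid.frostfs", "container.frostfs"} {
		hash, err := resolver.ResolveContractHash(name, endpoint)
		if err != nil {
			log.Fatalf("resolve %s: %v", name, err)
		}
		fmt.Printf("%s -> %x\n", name, hash)
	}
}
```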
# `credential_source` section
Contains configuration for the source of user private keys (credentials).
```yaml
credential_source:
wallets:
- path: /path/to/wallet.json
address: NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP
passphrase: ""
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|----------------------|----------|---------------|---------------|-----------------------------------------------------------------|
| `wallets` | | | | Source of user private keys as wallet files on the filesystem. |
| `wallets.path` | `string` | no | | Path to the wallet on the filesystem. |
| `wallets.address` | `string` | no | | Account address in the wallet. If omitted, the default one will be used. |
| `wallets.passphrase` | `string` | no | | Passphrase to decrypt wallet. |
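A hypothetical usage sketch of the `walletsource` package added in this commit: it builds a `Source` from configured wallet entries and resolves a user's private key by public key, as the job provider does before signing bearer tokens. The wallet path and public key below are placeholders, and the snippet is assumed to live inside this repository because the package is internal.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/credential/walletsource"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)

func main() {
	// Wallet entries normally come from the credential_source.wallets config.
	src, err := walletsource.New([]walletsource.Wallet{
		{Path: "/path/to/user/wallet.json", Address: "", Passphrase: ""},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Public key of the user, normally obtained from the frostfsid contract.
	pk, err := keys.NewPublicKeyFromString("0313b1ac3a8076e155a7e797b24f0b650cccad5941ea59d7cfd51a024a8b2a06bf")
	if err != nil {
		log.Fatal(err)
	}

	// Returns an error if none of the configured wallets holds this key.
	priv, err := src.Credentials(context.Background(), pk)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("resolved key for", priv.PublicKey().StringCompressed())
}
```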
# `lifecycle` section
Configuration for the main lifecycle handling procedure.
```yaml
lifecycle:
job_fetcher_buffer: 1000
services:
- 0313b1ac3a8076e155a7e797b24f0b650cccad5941ea59d7cfd51a024a8b2a06bf
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|----------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `job_fetcher_buffer` | `int` | no | `1000` | Size of the buffered channel used to fetch users, containers, and other data for the lifecycle procedure. This parameter helps limit the number of concurrent outgoing network requests. |
| `services` | `[]string` | yes | | List of public keys of all lifecycle service instances. Used to split jobs between instances (see the sketch below). |
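The job split works by rendezvous (HRW) hashing: each user script hash combined with the epoch is hashed, and the instance whose position comes first in the HRW-sorted index list owns that user. Below is a minimal standalone sketch of this assignment, mirroring `objToHRW`/`svcKeys` from `internal/lifecycle` in this commit; the two generated keys stand in for the configured `services` plus the instance's own wallet key.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sort"

	"git.frostfs.info/TrueCloudLab/hrw"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/nspcc-dev/neo-go/pkg/util"
)

// ownsUser reports whether the lifecycler at position `current` in the sorted
// key list must process the user with the given script hash during `epoch`.
func ownsUser(lifecyclers keys.PublicKeys, current uint64, userHash util.Uint160, epoch uint64) bool {
	// Same payload layout as objToHRW.bytes() in the commit: hash || uvarint(epoch).
	buf := make([]byte, binary.MaxVarintLen64)
	ln := binary.PutUvarint(buf, epoch)
	payload := append(userHash[:], buf[:ln]...)

	indexes := make([]uint64, len(lifecyclers))
	for i := range indexes {
		indexes[i] = uint64(i)
	}
	return hrw.Sort(indexes, hrw.Hash(payload))[0] == current
}

func main() {
	// Two hypothetical instances; real keys come from lifecycle.services and the wallet.
	k1, _ := keys.NewPrivateKey()
	k2, _ := keys.NewPrivateKey()
	lifecyclers := keys.PublicKeys{k1.PublicKey(), k2.PublicKey()}
	sort.Slice(lifecyclers, func(i, j int) bool { return lifecyclers[i].Cmp(lifecyclers[j]) == -1 })

	userHash := util.Uint160{0x01, 0x02, 0x03}
	fmt.Println("instance 0 owns user:", ownsUser(lifecyclers, 0, userHash, 100))
	fmt.Println("instance 1 owns user:", ownsUser(lifecyclers, 1, userHash, 100))
}
```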
# `frostfs` section
Configuration for FrostFS storage.
```yaml
frostfs:
stream_timeout: 10s
connect_timeout: 10s
healthcheck_timeout: 5s
rebalance_interval: 1m
pool_error_threshold: 100
tree_pool_max_attempts: 4
peers:
0:
address: node1.frostfs:8080
priority: 1
weight: 1
1:
address: node2.frostfs:8080
priority: 2
weight: 0.1
2:
address: node3.frostfs:8080
priority: 2
weight: 0.9
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|--------------------------|------------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------|
| `stream_timeout` | `duration` | no | `10s` | Timeout for individual operations in streaming RPC. |
| `connect_timeout` | `duration` | no | `10s` | Timeout to connect to a storage node. |
| `healthcheck_timeout` | `duration` | no | `15s` | Timeout to check storage node health during rebalance. |
| `rebalance_interval` | `duration` | no | `60s` | Interval to check storage node health. |
| `pool_error_threshold` | `uint32` | no | `100` | The number of connection errors after which a storage node is considered unhealthy. |
| `tree_pool_max_attempts` | `uint32` | no | `0` | Maximum number of attempts to make a successful tree request. A value of 0 means the number of attempts equals the number of nodes in the pool. |
## `peers` section
This configuration makes the lifecycler use the first node (node1.frostfs:8080)
while it's healthy. Otherwise, it uses the second node (node2.frostfs:8080)
for 10% of requests and the third node (node3.frostfs:8080) for 90% of requests.
As long as any node with the current (highest) priority level is healthy,
nodes with other priorities are not used.
The lower the value, the higher the priority.
| Parameter | Type | Default value | Description |
|------------------|----------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
| `peers.address` | `string` | | Address of storage node. |
| `peers.priority` | `int` | `1` | Groups nodes: the group is not switched until all nodes with the same priority become unhealthy. The lower the value, the higher the priority. |
| `peers.weight` | `float` | `1` | Weight of the node within its priority group. Requests are distributed to nodes proportionally to these values. |

go.mod

@@ -3,8 +3,12 @@ module git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler
go 1.21
require (
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-node v0.42.0-rc.5
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240617140730-1a5886e776de
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.29.2
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240705093617-560cbbd1f1e4
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/nspcc-dev/neo-go v0.106.0
github.com/prometheus/client_golang v1.19.1
@@ -14,30 +18,44 @@ require (
github.com/ssgreg/journald v1.0.0
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
golang.org/x/text v0.16.0
)
replace github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240611123832-594f716b3d18
replace (
git.frostfs.info/TrueCloudLab/frostfs-s3-gw => git.frostfs.info/mbiryukova/frostfs-s3-gw v0.27.0-rc.1.0.20240709102501-0e1ab11a1bd7
github.com/nspcc-dev/neo-go => git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240611123832-594f716b3d18
)
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240530152826-2f6d3209e1d3 // indirect
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240409111539-e7a05a49ff45 // indirect
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/aws/aws-sdk-go v1.44.6 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bluele/gcache v0.0.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-chi/chi/v5 v5.0.8 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/minio/sio v0.3.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
@@ -59,13 +77,22 @@ require (
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/urfave/cli v1.22.14 // indirect
go.etcd.io/bbolt v1.3.9 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.22.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect


@@ -0,0 +1,77 @@
package walletsource
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/lifecycle"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
)
type Source struct {
keys []*keys.PrivateKey
}
type Wallet struct {
Path string
Address string
Passphrase string
}
var _ lifecycle.CredentialSource = (*Source)(nil)
func New(wallets []Wallet) (*Source, error) {
privateKeys := make([]*keys.PrivateKey, len(wallets))
var err error
for i, w := range wallets {
if privateKeys[i], err = readPrivateKey(w); err != nil {
return nil, fmt.Errorf("read private key from wallet '%s': %w", w.Path, err)
}
}
return &Source{keys: privateKeys}, nil
}
func (s *Source) Credentials(_ context.Context, pk *keys.PublicKey) (*keys.PrivateKey, error) {
for _, key := range s.keys {
if key.PublicKey().Equal(pk) {
return key, nil
}
}
return nil, errors.New("key not found")
}
func readPrivateKey(walletInfo Wallet) (*keys.PrivateKey, error) {
w, err := wallet.NewWalletFromFile(walletInfo.Path)
if err != nil {
return nil, fmt.Errorf("parse wallet: %w", err)
}
var addr util.Uint160
if walletInfo.Address == "" {
addr = w.GetChangeAddress()
} else {
addr, err = flags.ParseAddress(walletInfo.Address)
if err != nil {
return nil, fmt.Errorf("invalid address")
}
}
acc := w.GetAccount(addr)
if acc == nil {
return nil, fmt.Errorf("couldn't find wallet account for %s", address.Uint160ToString(addr))
}
if err = acc.Decrypt(walletInfo.Passphrase, w.Scrypt); err != nil {
return nil, fmt.Errorf("couldn't decrypt account: %w", err)
}
return acc.PrivateKey(), nil
}

internal/frostfs/frostfs.go

@@ -0,0 +1,101 @@
package frostfs
import (
"context"
"encoding/xml"
"fmt"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"go.uber.org/zap"
"golang.org/x/text/encoding/ianaindex"
)
// FrostFS represents virtual connection to the FrostFS network.
// It is used to provide an interface to dependent packages
// which work with FrostFS.
type FrostFS struct {
pool *pool.Pool
log *zap.Logger
}
// NewFrostFS creates new FrostFS using provided pool.Pool.
func NewFrostFS(p *pool.Pool, log *zap.Logger) *FrostFS {
return &FrostFS{
pool: p,
log: log,
}
}
type PrmGetObject struct {
// Container to read the object header from.
Container cid.ID
// ID of the object for which to read the header.
Object oid.ID
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
BearerToken bearer.Token
}
func (f *FrostFS) GetObject(ctx context.Context, prm PrmGetObject) (pool.ResGetObject, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmGet pool.PrmObjectGet
prmGet.SetAddress(addr)
prmGet.UseBearer(prm.BearerToken)
return f.pool.GetObject(ctx, prmGet)
}
func (f *FrostFS) LifecycleConfiguration(ctx context.Context, addr oid.Address) (*data.LifecycleConfiguration, error) {
prm := PrmGetObject{
Container: addr.Container(),
Object: addr.Object(),
}
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
prm.BearerToken = *bd.Gate.BearerToken
}
res, err := f.GetObject(ctx, prm)
if err != nil {
return nil, err
}
defer func() {
if closeErr := res.Payload.Close(); closeErr != nil {
f.log.Warn("could not close object payload", zap.String("address", addr.EncodeToString()), zap.Error(closeErr))
}
}()
lifecycleCfg := &data.LifecycleConfiguration{}
dec := newDecoder(res.Payload)
if err = dec.Decode(lifecycleCfg); err != nil {
return nil, fmt.Errorf("unmarshal lifecycle configuration '%s': %w", addr.EncodeToString(), err)
}
return lifecycleCfg, nil
}
const awsDefaultNamespace = "http://s3.amazonaws.com/doc/2006-03-01/"
func newDecoder(r io.Reader) *xml.Decoder {
dec := xml.NewDecoder(r)
dec.DefaultSpace = awsDefaultNamespace
dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) {
enc, err := ianaindex.IANA.Encoding(charset)
if err != nil {
return nil, fmt.Errorf("charset %s: %w", charset, err)
}
return enc.NewDecoder().Reader(reader), nil
}
return dec
}

internal/frostfs/tree.go

@@ -0,0 +1,252 @@
package frostfs
import (
"context"
"errors"
"fmt"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
)
type GetNodeByPathResponseInfoWrapper struct {
response *grpcService.GetNodeByPathResponse_Info
}
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
}
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
return n.response.GetParentId()
}
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
}
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
}
return res
}
type GetSubTreeResponseBodyWrapper struct {
response *grpcService.GetSubTreeResponse_Body
}
func (n GetSubTreeResponseBodyWrapper) GetNodeID() uint64 {
return n.response.GetNodeId()
}
func (n GetSubTreeResponseBodyWrapper) GetParentID() uint64 {
return n.response.GetParentId()
}
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() uint64 {
return n.response.GetTimestamp()
}
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
res := make([]tree.Meta, len(n.response.Meta))
for i, value := range n.response.Meta {
res[i] = value
}
return res
}
type TreePoolWrapper struct {
p *treepool.Pool
}
func NewTreePoolWrapper(p *treepool.Pool) *TreePoolWrapper {
return &TreePoolWrapper{p: p}
}
func (w *TreePoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetNodesParams{
CID: prm.BktInfo.CID,
TreeID: prm.TreeID,
Path: prm.Path,
Meta: prm.Meta,
PathAttribute: tree.FileNameKey,
LatestOnly: prm.LatestOnly,
AllAttrs: prm.AllAttrs,
BearerToken: getBearer(ctx, prm.BktInfo),
}
nodes, err := w.p.GetNodes(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
res := make([]tree.NodeResponse, len(nodes))
for i, info := range nodes {
res[i] = GetNodeByPathResponseInfoWrapper{info}
}
return res, nil
}
func (w *TreePoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) ([]tree.NodeResponse, error) {
poolPrm := treepool.GetSubTreeParams{
CID: bktInfo.CID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
BearerToken: getBearer(ctx, bktInfo),
}
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
var subtree []tree.NodeResponse
node, err := subTreeReader.Next()
for err == nil {
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
node, err = subTreeReader.Next()
}
if err != nil && err != io.EOF {
return nil, handleError(err)
}
return subtree, nil
}
type SubTreeStreamImpl struct {
r *treepool.SubTreeReader
buffer []*grpcService.GetSubTreeResponse_Body
eof bool
index int
ln int
}
const bufSize = 1000
func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
if s.index != -1 {
node := s.buffer[s.index]
s.index++
if s.index >= s.ln {
s.index = -1
}
return GetSubTreeResponseBodyWrapper{response: node}, nil
}
if s.eof {
return nil, io.EOF
}
var err error
s.ln, err = s.r.Read(s.buffer)
if err != nil {
if err != io.EOF {
return nil, fmt.Errorf("sub tree stream impl pool wrap: %w", handleError(err))
}
s.eof = true
}
if s.ln > 0 {
s.index = 0
}
return s.Next()
}
func (w *TreePoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID uint64, depth uint32) (tree.SubTreeStream, error) {
poolPrm := treepool.GetSubTreeParams{
CID: bktInfo.CID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
BearerToken: getBearer(ctx, bktInfo),
Order: treepool.AscendingOrder,
}
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
if err != nil {
return nil, handleError(err)
}
return &SubTreeStreamImpl{
r: subTreeReader,
buffer: make([]*grpcService.GetSubTreeResponse_Body, bufSize),
index: -1,
}, nil
}
func (w *TreePoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
nodeID, err := w.p.AddNode(ctx, treepool.AddNodeParams{
CID: bktInfo.CID,
TreeID: treeID,
Parent: parent,
Meta: meta,
BearerToken: getBearer(ctx, bktInfo),
})
return nodeID, handleError(err)
}
func (w *TreePoolWrapper) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
CID: bktInfo.CID,
TreeID: treeID,
Path: path,
Meta: meta,
PathAttribute: tree.FileNameKey,
BearerToken: getBearer(ctx, bktInfo),
})
return nodeID, handleError(err)
}
func (w *TreePoolWrapper) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
return handleError(w.p.MoveNode(ctx, treepool.MoveNodeParams{
CID: bktInfo.CID,
TreeID: treeID,
NodeID: nodeID,
ParentID: parentID,
Meta: meta,
BearerToken: getBearer(ctx, bktInfo),
}))
}
func (w *TreePoolWrapper) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
CID: bktInfo.CID,
TreeID: treeID,
NodeID: nodeID,
BearerToken: getBearer(ctx, bktInfo),
}))
}
func getBearer(ctx context.Context, bktInfo *data.BucketInfo) []byte {
if bd, err := middleware.GetBoxData(ctx); err == nil {
if bd.Gate.BearerToken != nil {
if bd.Gate.BearerToken.Impersonate() || bktInfo.Owner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
return bd.Gate.BearerToken.Marshal()
}
}
}
return nil
}
func handleError(err error) error {
if err == nil {
return nil
}
if errors.Is(err, treepool.ErrNodeNotFound) {
return fmt.Errorf("%w: %s", tree.ErrNodeNotFound, err.Error())
}
if errors.Is(err, treepool.ErrNodeAccessDenied) {
return fmt.Errorf("%w: %s", tree.ErrNodeAccessDenied, err.Error())
}
return err
}


@@ -0,0 +1,357 @@
package lifecycle
import (
"context"
"crypto/ecdsa"
"encoding/binary"
"encoding/hex"
"slices"
"sort"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/hrw"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
type UserFetcher interface {
Users() ([]util.Uint160, error)
UserKey(hash util.Uint160) (*keys.PublicKey, error)
}
type ContainerFetcher interface {
Containers(owner user.ID) ([]cid.ID, error)
}
type TreeFetcher interface {
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
}
type ConfigurationFetcher interface {
LifecycleConfiguration(ctx context.Context, addr oid.Address) (*data.LifecycleConfiguration, error)
}
type CredentialSource interface {
Credentials(ctx context.Context, pk *keys.PublicKey) (*keys.PrivateKey, error)
}
type Job struct {
ContainerID cid.ID
PrivateKey *keys.PrivateKey
LifecycleConfiguration *data.LifecycleConfiguration
}
type JobProvider struct {
userFetcher UserFetcher
containerFetcher ContainerFetcher
treeFetcher TreeFetcher
configurationFetcher ConfigurationFetcher
credentialSource CredentialSource
settings Settings
currentLifecycler *keys.PrivateKey
log *zap.Logger
bufferSize int
}
type Settings interface {
ServicesKeys() keys.PublicKeys
}
type Config struct {
UserFetcher UserFetcher
ContainerFetcher ContainerFetcher
ConfigurationFetcher ConfigurationFetcher
CredentialSource CredentialSource
TreeFetcher TreeFetcher
Settings Settings
CurrentLifecycler *keys.PrivateKey
Logger *zap.Logger
BufferSize int
}
func NewJobProvider(cfg Config) *JobProvider {
return &JobProvider{
userFetcher: cfg.UserFetcher,
settings: cfg.Settings,
log: cfg.Logger,
containerFetcher: cfg.ContainerFetcher,
treeFetcher: cfg.TreeFetcher,
configurationFetcher: cfg.ConfigurationFetcher,
credentialSource: cfg.CredentialSource,
currentLifecycler: cfg.CurrentLifecycler,
bufferSize: cfg.BufferSize,
}
}
type objToHRW struct {
epoch uint64
hash util.Uint160
}
func (o objToHRW) bytes() []byte {
buf := make([]byte, binary.MaxVarintLen64)
ln := binary.PutUvarint(buf, o.epoch)
return append(o.hash[:], buf[:ln]...)
}
type UserContainer struct {
ID user.ID
Key *keys.PrivateKey
Container cid.ID
APEChain ape.Chain
}
func (p *JobProvider) GetJobs(ctx context.Context, epoch uint64) (<-chan Job, error) {
userHashes, err := p.userFetcher.Users()
if err != nil {
return nil, err
}
lifecyclers, currentPosition := p.svcKeys()
ch := make(chan *keys.PublicKey, p.bufferSize)
go func() {
defer close(ch)
indexes := make([]uint64, len(lifecyclers))
for i := range indexes {
indexes[i] = uint64(i)
}
obj := objToHRW{epoch: epoch}
for i := range userHashes {
obj.hash = userHashes[i]
h := hrw.Hash(obj.bytes())
if hrw.Sort(indexes, h)[0] != currentPosition {
continue
}
userKey, err := p.userFetcher.UserKey(userHashes[i])
if err != nil {
p.log.Warn(logs.CouldntFetchUserKeyByScriptHash,
zap.String("address", address.Uint160ToString(userHashes[i])))
continue
}
LOOP:
for {
select {
case <-ctx.Done():
p.log.Info(logs.JobProviderStopped, zap.Error(err))
return
case ch <- userKey:
break LOOP
}
}
}
}()
containers := p.fetchUserContainers(ctx, ch)
return p.fetchJobs(ctx, epoch, containers), nil
}
func (p *JobProvider) svcKeys() (keys.PublicKeys, uint64) {
currentPublicKey := p.currentLifecycler.PublicKey()
lifecyclerKeys := p.settings.ServicesKeys()
if position := slices.IndexFunc(lifecyclerKeys, func(pk *keys.PublicKey) bool {
return pk.Equal(currentPublicKey)
}); position == -1 {
lifecyclerKeys = append(lifecyclerKeys, currentPublicKey)
}
sort.Slice(lifecyclerKeys, func(i, j int) bool {
return lifecyclerKeys[i].Cmp(lifecyclerKeys[j]) == -1
})
position := slices.IndexFunc(lifecyclerKeys, func(pk *keys.PublicKey) bool {
return pk.Equal(currentPublicKey)
})
if position == -1 {
// should never happen
panic("current lifecycler key isn't in list")
}
return lifecyclerKeys, uint64(position)
}
func (p *JobProvider) fetchUserContainers(ctx context.Context, usersKeys <-chan *keys.PublicKey) <-chan UserContainer {
ch := make(chan UserContainer, p.bufferSize)
go func() {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case pk, ok := <-usersKeys:
if !ok {
return
}
privKey, err := p.credentialSource.Credentials(ctx, pk)
if err != nil {
p.log.Warn(logs.CouldntFetchUserCredentials,
zap.String("key", hex.EncodeToString(pk.Bytes())),
zap.Error(err))
continue
}
var userID user.ID
user.IDFromKey(&userID, (ecdsa.PublicKey)(*pk))
containers, err := p.containerFetcher.Containers(userID)
if err != nil {
p.log.Warn(logs.CouldntFetchUserContainers,
zap.String("user", userID.EncodeToString()),
zap.Error(err))
continue
}
p.log.Info(logs.FoundUserContainers,
zap.String("user", userID.EncodeToString()),
zap.Int("containers", len(containers)))
allowedChain := chain.Chain{
ID: chain.ID("lifecycler"),
Rules: []chain.Rule{{
Status: chain.Allow,
Actions: chain.Actions{Names: []string{"*"}},
Resources: chain.Resources{Names: []string{"*"}},
Condition: []chain.Condition{{
Op: chain.CondStringEquals,
Kind: chain.KindRequest,
Key: native.PropertyKeyActorPublicKey,
Value: hex.EncodeToString(pk.Bytes()),
}},
}},
}
for _, container := range containers {
uc := UserContainer{
ID: userID,
Key: privKey,
Container: container,
APEChain: ape.Chain{Raw: allowedChain.Bytes()},
}
select {
case <-ctx.Done():
return
case ch <- uc:
}
}
}
}
}()
return ch
}
func (p *JobProvider) fetchJobs(ctx context.Context, epoch uint64, users <-chan UserContainer) <-chan Job {
ch := make(chan Job, p.bufferSize)
var lifecyclerOwner user.ID
user.IDFromKey(&lifecyclerOwner, p.currentLifecycler.PrivateKey.PublicKey)
go func() {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case userInfo, ok := <-users:
if !ok {
return
}
bktInfo := &data.BucketInfo{
CID: userInfo.Container,
Owner: userInfo.ID,
}
apeOverride := bearer.APEOverride{
Target: ape.ChainTarget{
TargetType: ape.TargetTypeContainer,
Name: userInfo.Container.EncodeToString(),
},
Chains: []ape.Chain{userInfo.APEChain},
}
var btoken bearer.Token
btoken.SetIat(epoch)
btoken.SetNbf(epoch)
btoken.SetExp(epoch + 2) // maybe +1, I'm not sure if we should configure this parameter
btoken.SetAPEOverride(apeOverride)
btoken.AssertUser(lifecyclerOwner)
if err := btoken.Sign(userInfo.Key.PrivateKey); err != nil {
p.log.Warn(logs.CouldntSignBearerToken,
zap.String("user", userInfo.ID.EncodeToString()),
zap.String("cid", userInfo.Container.EncodeToString()),
zap.Error(err))
continue
}
ctx = middleware.SetBox(ctx, &middleware.Box{
AccessBox: &accessbox.Box{
Gate: &accessbox.GateData{
BearerToken: &btoken,
},
},
})
objID, err := p.treeFetcher.GetBucketLifecycleConfiguration(ctx, bktInfo)
if err != nil {
p.log.Warn(logs.CouldntFetchLifecycleConfigFromTree,
zap.String("user", userInfo.ID.EncodeToString()),
zap.String("cid", userInfo.Container.EncodeToString()),
zap.Error(err))
continue
}
var addr oid.Address
addr.SetContainer(userInfo.Container)
addr.SetObject(objID)
configuration, err := p.configurationFetcher.LifecycleConfiguration(ctx, addr)
if err != nil {
p.log.Warn(logs.CouldntFetchLifecycleConfigFromStorage,
zap.String("user", userInfo.ID.EncodeToString()),
zap.String("addr", addr.EncodeToString()),
zap.Error(err))
continue
}
job := Job{
ContainerID: userInfo.Container,
PrivateKey: userInfo.Key,
LifecycleConfiguration: configuration,
}
select {
case <-ctx.Done():
return
case ch <- job:
}
}
}
}()
return ch
}


@@ -0,0 +1,295 @@
package lifecycle
import (
"context"
"errors"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
var _ UserFetcher = (*userFetcherMock)(nil)
type userFetcherMock struct {
users map[util.Uint160]*keys.PrivateKey
}
func newUserFetcherMock(users map[util.Uint160]*keys.PrivateKey) *userFetcherMock {
if users == nil {
users = map[util.Uint160]*keys.PrivateKey{}
}
return &userFetcherMock{
users: users,
}
}
func (u *userFetcherMock) Users() ([]util.Uint160, error) {
res := make([]util.Uint160, 0, len(u.users))
for hash := range u.users {
res = append(res, hash)
}
return res, nil
}
func (u *userFetcherMock) UserKey(hash util.Uint160) (*keys.PublicKey, error) {
key, ok := u.users[hash]
if !ok {
return nil, errors.New("userFetcherMock: hash not found")
}
return key.PublicKey(), nil
}
var _ ContainerFetcher = (*containerFetcherMock)(nil)
type containerFetcherMock struct {
containers map[util.Uint160][]cid.ID
}
func newContainerFetcherMock(containers map[util.Uint160][]cid.ID) *containerFetcherMock {
if containers == nil {
containers = map[util.Uint160][]cid.ID{}
}
return &containerFetcherMock{
containers: containers,
}
}
func (c *containerFetcherMock) Containers(owner user.ID) ([]cid.ID, error) {
hash, err := owner.ScriptHash()
if err != nil {
return nil, err
}
containers, ok := c.containers[hash]
if !ok {
return nil, errors.New("containerFetcherMock: hash not found")
}
return containers, nil
}
var _ ConfigurationFetcher = (*configurationFetcherMock)(nil)
type configurationFetcherMock struct {
configurations map[oid.Address]*data.LifecycleConfiguration
}
func newConfigurationFetcherMock(configs map[oid.Address]*data.LifecycleConfiguration) *configurationFetcherMock {
if configs == nil {
configs = map[oid.Address]*data.LifecycleConfiguration{}
}
return &configurationFetcherMock{
configurations: configs,
}
}
func (c *configurationFetcherMock) LifecycleConfiguration(_ context.Context, addr oid.Address) (*data.LifecycleConfiguration, error) {
val, ok := c.configurations[addr]
if !ok {
return nil, errors.New("configurationFetcherMock: hash not found")
}
return val, nil
}
var _ CredentialSource = (*credentialSourceMock)(nil)
type credentialSourceMock struct {
users map[util.Uint160]*keys.PrivateKey
}
func newCredentialSourceMock(users map[util.Uint160]*keys.PrivateKey) *credentialSourceMock {
if users == nil {
users = map[util.Uint160]*keys.PrivateKey{}
}
return &credentialSourceMock{
users: users,
}
}
func (c *credentialSourceMock) Credentials(_ context.Context, pk *keys.PublicKey) (*keys.PrivateKey, error) {
key, ok := c.users[pk.GetScriptHash()]
if !ok {
return nil, errors.New("credentialSourceMock: hash not found")
}
return key, nil
}
var _ TreeFetcher = (*treeFetcherMock)(nil)
type treeFetcherMock struct {
configurations map[cid.ID]oid.ID
}
func newTreeFetcherMock(configs map[cid.ID]oid.ID) *treeFetcherMock {
if configs == nil {
configs = map[cid.ID]oid.ID{}
}
return &treeFetcherMock{
configurations: configs,
}
}
func (t *treeFetcherMock) GetBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
val, ok := t.configurations[bktInfo.CID]
if !ok {
return oid.ID{}, errors.New("treeFetcherMock: hash not found")
}
return val, nil
}
var _ Settings = (*settingsMock)(nil)
type settingsMock struct{}
func (s *settingsMock) ServicesKeys() keys.PublicKeys {
return nil
}
func TestFetcherBase(t *testing.T) {
ctx := context.Background()
log := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
mocks, err := initMocks(2, 1)
require.NoError(t, err)
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
ConfigurationFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
}
f := NewJobProvider(cfg)
ch, err := f.GetJobs(ctx, 1)
require.NoError(t, err)
var res []Job
for job := range ch {
res = append(res, job)
}
require.Len(t, res, 2)
}
func TestFetcherCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log := zaptest.NewLogger(t)
key, err := keys.NewPrivateKey()
require.NoError(t, err)
mocks, err := initMocks(1, 3)
require.NoError(t, err)
cfg := Config{
UserFetcher: mocks.userFetcher,
ContainerFetcher: mocks.containerFetcher,
ConfigurationFetcher: mocks.configurationFetcher,
CredentialSource: mocks.credentialSource,
TreeFetcher: mocks.treeFetcher,
Settings: &settingsMock{},
CurrentLifecycler: key,
Logger: log,
}
f := NewJobProvider(cfg)
ch, err := f.GetJobs(ctx, 1)
require.NoError(t, err)
res := []Job{<-ch}
cancel()
<-ctx.Done()
for job := range ch {
res = append(res, job)
}
require.Len(t, res, 1)
}
type fetchersMock struct {
userFetcher *userFetcherMock
containerFetcher *containerFetcherMock
configurationFetcher *configurationFetcherMock
credentialSource *credentialSourceMock
treeFetcher *treeFetcherMock
}
func initMocks(users, containers int) (*fetchersMock, error) {
usersMap, err := generateUsersMap(users)
if err != nil {
return nil, err
}
cnrsMap := make(map[util.Uint160][]cid.ID)
treeMap := make(map[cid.ID]oid.ID)
configMap := make(map[oid.Address]*data.LifecycleConfiguration)
for hash := range usersMap {
for i := 0; i < containers; i++ {
addr := oidtest.Address()
cnrsMap[hash] = append(cnrsMap[hash], addr.Container())
treeMap[addr.Container()] = addr.Object()
configMap[addr] = &data.LifecycleConfiguration{Rules: []data.LifecycleRule{{ID: addr.EncodeToString()}}}
}
}
return &fetchersMock{
userFetcher: newUserFetcherMock(usersMap),
containerFetcher: newContainerFetcherMock(cnrsMap),
configurationFetcher: newConfigurationFetcherMock(configMap),
credentialSource: newCredentialSourceMock(usersMap),
treeFetcher: newTreeFetcherMock(treeMap),
}, nil
}
func generateKeys(n int) ([]*keys.PrivateKey, error) {
var err error
res := make([]*keys.PrivateKey, n)
for i := 0; i < n; i++ {
if res[i], err = keys.NewPrivateKey(); err != nil {
return nil, err
}
}
return res, nil
}
func generateUsersMap(n int) (map[util.Uint160]*keys.PrivateKey, error) {
res := make(map[util.Uint160]*keys.PrivateKey, n)
userKeys, err := generateKeys(n)
if err != nil {
return nil, err
}
for _, key := range userKeys {
res[key.GetScriptHash()] = key
}
return res, nil
}


@ -1,25 +1,53 @@
package logs
const (
ApplicationStarted = "application started"
ApplicationStopped = "application stopped"
StoppingApplication = "stopping application"
ServiceIsRunning = "service is running"
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled"
ShuttingDownService = "shutting down service"
CantGracefullyShutDownService = "can't gracefully shut down service, force stop"
CantShutDownService = "can't shut down service"
SIGHUPConfigReloadStarted = "SIGHUP config reload started"
FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed"
FailedToReloadConfig = "failed to reload config"
LogLevelWontBeUpdated = "log level won't be updated"
SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
NotificatorStopped = "notificator stopped"
ResolveNetmapContract = "failed to resolve netmap contract"
NewEpochWasTriggered = "new epoch was triggered"
ListenerCouldntBeReinitialized = "listener couldn't be reinitialized"
InitNotificator = "init notificator"
NoMorphRPCEndpoints = "no morph RPC endpoints"
FailedToLoadPrivateKey = "failed to load private key"
ApplicationStarted = "application started"
ApplicationStopped = "application stopped"
StoppingApplication = "stopping application"
ServiceIsRunning = "service is running"
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled"
ShuttingDownService = "shutting down service"
CantGracefullyShutDownService = "can't gracefully shut down service, force stop"
CantShutDownService = "can't shut down service"
SIGHUPConfigReloadStarted = "SIGHUP config reload started"
FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed"
FailedToReloadConfig = "failed to reload config"
LogLevelWontBeUpdated = "log level won't be updated"
SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
ListenerStopped = "listener stopped"
MorphClientStopped = "morph client stopped"
MorphClientReconnection = "morph client reconnection..."
ListenerReconnection = "listener reconnection..."
MorphClientCouldntBeReconnected = "morph client couldn't be reconnected"
ListenerCouldntBeReconnected = "listener couldn't be reconnected"
ResolveNetmapContract = "failed to resolve netmap contract"
ResolveFrostfsIDContract = "failed to resolve frostfsid contract"
ResolveContainerContract = "failed to resolve container contract"
NewEpochWasTriggered = "new epoch was triggered"
InitNotificator = "init notificator"
NoMorphRPCEndpoints = "no morph RPC endpoints"
FailedToLoadPrivateKey = "failed to load private key"
NoCredentialSourceWallets = "no credential source wallets"
ListenerAlreadyStartedIgnoreParser = "listener already started, ignore parser"
ListenerAlreadyStartedIgnoreHandler = "listener already started, ignore handler"
IgnoreHandlerWithoutParser = "ignore handler without parser"
CouldntCreateWalletSource = "could not create wallet source"
AddedStoragePeer = "added storage peer"
FailedToCreateConnectionPool = "failed to create connection pool"
FailedToDialConnectionPool = "failed to dial connection pool"
FailedToCreateTreePool = "failed to create tree pool"
FailedToDialTreePool = "failed to dial tree pool"
CouldntFetchUserKeyByScriptHash = "could not fetch user key by script hash"
CouldntFetchUserCredentials = "could not fetch user credentials"
CouldntFetchUserContainers = "could not fetch user containers"
CouldntSignBearerToken = "could not sign bearer token"
CouldntFetchLifecycleConfigFromTree = "could not fetch bucket lifecycle configuration from tree"
CouldntFetchLifecycleConfigFromStorage = "could not fetch lifecycle configuration from object storage"
FoundUserContainers = "found user containers"
JobProviderStopped = "job provider stopped"
FailedToInitListener = "failed to init listener"
FailedToInitMorphClient = "failed to init morph client"
FailedToFetchServicesKeys = "failed to fetch lifecycle services keys"
FailedToGetJobs = "failed to get jobs"
)

internal/morph/client.go

@ -0,0 +1,119 @@
package morph
import (
"context"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
)
type Client struct {
mu sync.RWMutex
client *client.Client
clientOptions []client.Option
log *zap.Logger
key *keys.PrivateKey
connLost chan struct{}
reconnectInterval time.Duration
reconnection chan struct{}
}
type Config struct {
Logger *zap.Logger
Endpoints []client.Endpoint
Key *keys.PrivateKey
ReconnectInterval time.Duration
DialTimeout time.Duration
}
func New(ctx context.Context, cfg Config) (*Client, error) {
c := &Client{
log: cfg.Logger,
key: cfg.Key,
connLost: make(chan struct{}),
reconnectInterval: cfg.ReconnectInterval,
reconnection: make(chan struct{}),
}
c.clientOptions = []client.Option{
client.WithLogger(&logger.Logger{Logger: cfg.Logger}),
client.WithEndpoints(cfg.Endpoints...),
client.WithConnLostCallback(func() { c.connLost <- struct{}{} }),
client.WithDialTimeout(cfg.DialTimeout),
}
if err := c.initNewClient(ctx); err != nil {
return nil, err
}
go c.reconnectRoutine(ctx)
return c, nil
}
func (c *Client) reconnectRoutine(ctx context.Context) {
ticker := time.NewTicker(c.reconnectInterval)
defer func() {
ticker.Stop()
close(c.connLost)
close(c.reconnection)
}()
for {
select {
case <-ctx.Done():
c.log.Info(logs.MorphClientStopped, zap.Error(ctx.Err()))
return
case <-c.connLost:
c.Client().Close()
LOOP:
for {
select {
case <-ctx.Done():
c.log.Info(logs.MorphClientStopped, zap.Error(ctx.Err()))
return
case <-ticker.C:
c.log.Info(logs.MorphClientReconnection)
if err := c.initNewClient(ctx); err != nil {
c.log.Error(logs.MorphClientCouldntBeReconnected, zap.Error(err))
ticker.Reset(c.reconnectInterval)
continue
}
c.reconnection <- struct{}{}
break LOOP
}
}
}
}
}
func (c *Client) initNewClient(ctx context.Context) error {
cli, err := client.New(ctx, c.key, c.clientOptions...)
if err != nil {
return fmt.Errorf("create new client: %w", err)
}
c.mu.Lock()
c.client = cli
c.mu.Unlock()
return nil
}
func (c *Client) Client() *client.Client {
c.mu.RLock()
defer c.mu.RUnlock()
return c.client
}
func (c *Client) ReconnectionChannel() <-chan struct{} {
return c.reconnection
}
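For orientation, here is a minimal wiring sketch (not part of this change) showing how the reconnecting morph client above might be constructed and observed. The endpoint address, timeout values and the client.Endpoint field names are assumptions for illustration; only morph.New, morph.Config, Client and ReconnectionChannel come from this patch.

package main

import (
	"context"
	"log"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()

	// In the service the key comes from the configured wallet; a throwaway key is enough for the sketch.
	key, err := keys.NewPrivateKey()
	if err != nil {
		log.Fatal(err)
	}

	cli, err := morph.New(ctx, morph.Config{
		Logger:            zap.NewExample(),
		Endpoints:         []client.Endpoint{{Address: "ws://morph-chain:30333/ws", Priority: 1}}, // Endpoint field names are assumed
		Key:               key,
		ReconnectInterval: 30 * time.Second,
		DialTimeout:       5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Components built on top of cli.Client() (subscriptions, contract actors)
	// can watch ReconnectionChannel to rebuild themselves after a reconnect.
	go func() {
		for range cli.ReconnectionChannel() {
			_ = cli.Client() // refresh dependent state here
		}
	}()
}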


@ -0,0 +1,70 @@
package contract
import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"go.uber.org/zap"
)
type Container struct {
client *morph.Client
contractHash util.Uint160
log *zap.Logger
}
type ContainerConfig struct {
Client *morph.Client
ContractHash util.Uint160
Log *zap.Logger
}
const (
batchSize = 100
containersOfMethod = "containersOf"
)
func NewContainer(cfg ContainerConfig) *Container {
return &Container{
client: cfg.Client,
contractHash: cfg.ContractHash,
log: cfg.Log,
}
}
func (c *Container) Containers(ownerID user.ID) ([]cid.ID, error) {
items, err := commonclient.ReadIteratorItems(c.client.Client().GetActor(), batchSize, c.contractHash, containersOfMethod, ownerID.WalletBytes())
if err != nil {
return nil, fmt.Errorf("read iterator items (%s): %w", containersOfMethod, err)
}
cidList, err := decodeCID(items)
if err != nil {
return nil, err
}
return cidList, nil
}
func decodeCID(items []stackitem.Item) ([]cid.ID, error) {
cidList := make([]cid.ID, len(items))
for i, item := range items {
rawID, err := client.BytesFromStackItem(item)
if err != nil {
return nil, fmt.Errorf("could not get byte array from stack item: %w", err)
}
if err = cidList[i].Decode(rawID); err != nil {
return nil, fmt.Errorf("decode container id: %w", err)
}
}
return cidList, nil
}
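A hedged usage sketch for the container contract wrapper above. The package and function names are illustrative, and the way the contract hash and owner ID reach the function is an assumption (in the service the hash comes from NNS resolution); NewContainer, ContainerConfig and Containers are taken from this patch.

package sketch // hypothetical package, for illustration only

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph/contract"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/util"
	"go.uber.org/zap"
)

// listUserContainers lists containers owned by the given user through the
// container contract wrapper. The caller supplies the contract hash and an
// owner ID derived from the user's key.
func listUserContainers(cli *morph.Client, containerHash util.Uint160, owner user.ID, log *zap.Logger) ([]cid.ID, error) {
	cnr := contract.NewContainer(contract.ContainerConfig{
		Client:       cli,
		ContractHash: containerHash,
		Log:          log,
	})

	ids, err := cnr.Containers(owner)
	if err != nil {
		return nil, fmt.Errorf("list containers: %w", err)
	}
	return ids, nil
}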


@ -0,0 +1,92 @@
package contract
import (
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
morphclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
)
type FrostFSID struct {
morphClient *morph.Client
contractHash util.Uint160
mu sync.RWMutex
cli *client.Client
}
type FrostFSIDConfig struct {
// Client is a multi-endpoint neo-go morph client with automatic reconnection.
Client *morph.Client
// ContractHash is the hash of the frostfsid contract.
ContractHash util.Uint160
}
// NewFrostFSID creates a new FrostFSID contract wrapper.
func NewFrostFSID(cfg FrostFSIDConfig) *FrostFSID {
ffsid := &FrostFSID{
morphClient: cfg.Client,
contractHash: cfg.ContractHash,
cli: client.NewSimple(cfg.Client.Client().GetActor(), cfg.ContractHash),
}
return ffsid
}
func (f *FrostFSID) Users() ([]util.Uint160, error) {
var res []util.Uint160
err := f.requestWithRetryOnConnectionLost(func(c *client.Client) error {
var inErr error
res, inErr = c.ListSubjects()
return inErr
})
return res, err
}
func (f *FrostFSID) UserKey(hash util.Uint160) (*keys.PublicKey, error) {
var res *client.Subject
err := f.requestWithRetryOnConnectionLost(func(c *client.Client) error {
var inErr error
res, inErr = c.GetSubject(hash)
return inErr
})
if err != nil {
return nil, err
}
return res.PrimaryKey, nil
}
func (f *FrostFSID) requestWithRetryOnConnectionLost(fn func(c *client.Client) error) error {
err := fn(f.client())
if err == nil {
return nil
}
if !errors.Is(err, morphclient.ErrConnectionLost) {
return err
}
f.initNewClient()
return fn(f.client())
}
func (f *FrostFSID) initNewClient() {
f.mu.Lock()
f.cli = client.NewSimple(f.morphClient.Client().GetActor(), f.contractHash)
f.mu.Unlock()
}
func (f *FrostFSID) client() *client.Client {
f.mu.RLock()
defer f.mu.RUnlock()
return f.cli
}
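Similarly, a sketch of how the frostfsid wrapper might be used to enumerate users and resolve their public keys. The package and function names are illustrative; NewFrostFSID, FrostFSIDConfig, Users and UserKey come from the code above.

package sketch // hypothetical package, for illustration only

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph/contract"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/nspcc-dev/neo-go/pkg/util"
)

// listUsersWithKeys enumerates frostfsid subjects and resolves their public
// keys; the frostfsid contract hash is resolved elsewhere (NNS in the service).
func listUsersWithKeys(cli *morph.Client, frostfsidHash util.Uint160) (map[util.Uint160]*keys.PublicKey, error) {
	ffsid := contract.NewFrostFSID(contract.FrostFSIDConfig{
		Client:       cli,
		ContractHash: frostfsidHash,
	})

	hashes, err := ffsid.Users()
	if err != nil {
		return nil, fmt.Errorf("list users: %w", err)
	}

	res := make(map[util.Uint160]*keys.PublicKey, len(hashes))
	for _, hash := range hashes {
		key, err := ffsid.UserKey(hash)
		if err != nil {
			return nil, fmt.Errorf("user key %s: %w", hash.StringLE(), err)
		}
		res[hash] = key
	}
	return res, nil
}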


@ -64,7 +64,7 @@ func (h *handlerLimiter) Handler(e event.Event) {
}
workCtx := h.replaceCurrentWorkContext(h.ctx)
h.log.Debug(logs.NewEpochWasTriggered, zap.Int64("epoch", ee.Epoch))
h.log.Debug(logs.NewEpochWasTriggered, zap.Uint64("epoch", ee.Epoch))
h.work <- func() {
h.handler(workCtx, ee)
}


@ -0,0 +1,190 @@
package notificator
import (
"context"
"fmt"
"slices"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/morph"
"go.uber.org/zap"
)
type ListenerImpl struct {
client *morph.Client
log *zap.Logger
reconnectInterval time.Duration
once sync.Once
mu sync.RWMutex
listener event.Listener
started bool
parsers []event.NotificationParserInfo
handlers []event.NotificationHandlerInfo
}
type ConfigListener struct {
Client *morph.Client
Logger *zap.Logger
ReconnectInterval time.Duration
}
var _ Listener = (*ListenerImpl)(nil)
func NewListener(ctx context.Context, cfg ConfigListener) (*ListenerImpl, error) {
l := &ListenerImpl{
client: cfg.Client,
log: cfg.Logger,
reconnectInterval: cfg.ReconnectInterval,
}
if err := l.initNewListener(ctx); err != nil {
return nil, err
}
return l, nil
}
func (l *ListenerImpl) Listen(ctx context.Context) {
l.once.Do(func() {
l.setStarted()
l.setParsersAndHandlers()
go l.currentListener().Listen(ctx)
l.reconnectRoutine(ctx)
})
}
func (l *ListenerImpl) SetNotificationParser(info event.NotificationParserInfo) {
l.mu.Lock()
defer l.mu.Unlock()
if l.started {
l.log.Warn(logs.ListenerAlreadyStartedIgnoreParser,
zap.String("type", info.GetType().String()),
zap.String("hash", info.ScriptHash().StringLE()))
return
}
l.parsers = append(l.parsers, info)
}
func (l *ListenerImpl) RegisterNotificationHandler(info event.NotificationHandlerInfo) {
l.mu.Lock()
defer l.mu.Unlock()
if l.started {
l.log.Warn(logs.ListenerAlreadyStartedIgnoreHandler,
zap.String("type", info.GetType().String()),
zap.String("hash", info.ScriptHash().StringLE()))
return
}
parserExists := slices.ContainsFunc(l.parsers, func(p event.NotificationParserInfo) bool {
return p.GetType().Equal(info.GetType()) && p.ScriptHash().Equals(info.ScriptHash())
})
if !parserExists {
l.log.Warn(logs.IgnoreHandlerWithoutParser,
zap.String("type", info.GetType().String()),
zap.String("hash", info.ScriptHash().StringLE()))
return
}
l.handlers = append(l.handlers, info)
}
func (l *ListenerImpl) reconnectRoutine(ctx context.Context) {
ticker := time.NewTicker(l.reconnectInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
l.log.Info(logs.ListenerStopped, zap.Error(ctx.Err()))
return
case <-l.client.ReconnectionChannel():
LOOP:
for {
select {
case <-ctx.Done():
l.log.Info(logs.ListenerStopped, zap.Error(ctx.Err()))
return
case <-ticker.C:
l.log.Info(logs.ListenerReconnection)
if err := l.initNewListener(ctx); err != nil {
l.log.Error(logs.ListenerCouldntBeReconnected, zap.Error(err))
ticker.Reset(l.reconnectInterval)
continue
}
l.setParsersAndHandlers()
go l.currentListener().Listen(ctx)
break LOOP
}
}
}
}
}
func (l *ListenerImpl) setStarted() {
l.mu.Lock()
l.started = true
l.mu.Unlock()
}
func (l *ListenerImpl) initNewListener(ctx context.Context) error {
currentBlock, err := l.client.Client().BlockCount()
if err != nil {
return fmt.Errorf("get block count: %w", err)
}
morphLogger := &logger.Logger{Logger: l.log}
subs, err := subscriber.New(ctx, &subscriber.Params{
Log: morphLogger,
StartFromBlock: currentBlock,
Client: l.client.Client(),
})
if err != nil {
return fmt.Errorf("create subscriber: %w", err)
}
ln, err := event.NewListener(event.ListenerParams{
Logger: morphLogger,
Subscriber: subs,
WorkerPoolCapacity: 0, // 0 means "infinite"
})
if err != nil {
return err
}
l.mu.Lock()
l.listener = ln
l.mu.Unlock()
return nil
}
func (l *ListenerImpl) currentListener() event.Listener {
l.mu.RLock()
defer l.mu.RUnlock()
return l.listener
}
func (l *ListenerImpl) setParsersAndHandlers() {
l.mu.RLock()
defer l.mu.RUnlock()
for _, parser := range l.parsers {
l.listener.SetNotificationParser(parser)
}
for _, handler := range l.handlers {
l.listener.RegisterNotificationHandler(handler)
}
}
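The registration order enforced above (a handler is ignored unless a matching parser was set first, and both are ignored once the listener has started) can be illustrated with a hedged sketch. The function name and the parser/handler arguments are assumptions; the listener API and the event helper types are as used in this patch.

package sketch // hypothetical package, for illustration only

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator"
	"github.com/nspcc-dev/neo-go/pkg/util"
)

// listenNewEpoch wires a NewEpoch parser and handler into the listener and
// starts it. Order matters: set the parser before the handler (a handler
// without a matching parser is ignored), and register both before Listen.
func listenNewEpoch(ctx context.Context, ln *notificator.ListenerImpl, netmapHash util.Uint160,
	parser event.NotificationParser, handler event.Handler) {
	var npi event.NotificationParserInfo
	npi.SetScriptHash(netmapHash)
	npi.SetType(event.Type("NewEpoch"))
	npi.SetParser(parser)
	ln.SetNotificationParser(npi)

	var nhi event.NotificationHandlerInfo
	nhi.SetScriptHash(netmapHash)
	nhi.SetType(event.Type("NewEpoch"))
	nhi.SetHandler(handler)
	ln.RegisterNotificationHandler(nhi)

	// For ListenerImpl this call blocks until ctx is cancelled and re-subscribes
	// after morph client reconnections.
	ln.Listen(ctx)
}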


@ -3,10 +3,8 @@ package notificator
import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
@ -16,107 +14,74 @@ import (
type NewEpochHandler func(ctx context.Context, ee NewEpochEvent)
type NewEpochEvent struct {
Epoch int64
Epoch uint64
}
func (n NewEpochEvent) MorphEvent() {}
type ListenerCreationFunc func(connectionLostCallback func()) (event.Listener, error)
type Listener interface {
// Listen must start the event listener.
//
// Must listen to events with the parser installed.
Listen(context.Context)
// SetNotificationParser must set the parser of a particular contract event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after listener has been started.
SetNotificationParser(event.NotificationParserInfo)
// RegisterNotificationHandler must register the event handler for a particular contract notification event.
//
// The specified handler must be called after each capture and parsing of the event.
//
// Must ignore nil handlers.
RegisterNotificationHandler(event.NotificationHandlerInfo)
}
type Notificator struct {
logger *zap.Logger
listener event.Listener
handler *handlerLimiter
connLost chan struct{}
netmapContract util.Uint160
newListener ListenerCreationFunc
reconnectClientsInterval time.Duration
logger *zap.Logger
listener Listener
handler *handlerLimiter
}
type Config struct {
Handler NewEpochHandler
Logger *zap.Logger
NewListener ListenerCreationFunc
NetmapContract util.Uint160
ReconnectClientsInterval time.Duration
Handler NewEpochHandler
Logger *zap.Logger
Listener Listener
NetmapContract util.Uint160
}
const newEpochEventType = event.Type("NewEpoch")
func New(ctx context.Context, cfg Config) (*Notificator, error) {
notifier := &Notificator{
netmapContract: cfg.NetmapContract,
handler: newHandlerLimiter(ctx, cfg.Handler, cfg.Logger),
connLost: make(chan struct{}),
newListener: cfg.NewListener,
logger: cfg.Logger,
reconnectClientsInterval: cfg.ReconnectClientsInterval,
}
if err := notifier.initListener(); err != nil {
return nil, fmt.Errorf("init listener: %w", err)
}
return notifier, nil
}
func (n *Notificator) initListener() error {
listener, err := n.newListener(func() { n.connLost <- struct{}{} })
if err != nil {
return err
handler: newHandlerLimiter(ctx, cfg.Handler, cfg.Logger),
logger: cfg.Logger,
listener: cfg.Listener,
}
var npi event.NotificationParserInfo
npi.SetScriptHash(n.netmapContract)
npi.SetScriptHash(cfg.NetmapContract)
npi.SetType(newEpochEventType)
npi.SetParser(newEpochEventParser())
listener.SetNotificationParser(npi)
notifier.listener.SetNotificationParser(npi)
var nhi event.NotificationHandlerInfo
nhi.SetType(newEpochEventType)
nhi.SetScriptHash(n.netmapContract)
nhi.SetHandler(n.handler.Handler)
listener.RegisterNotificationHandler(nhi)
nhi.SetScriptHash(cfg.NetmapContract)
nhi.SetHandler(notifier.handler.Handler)
notifier.listener.RegisterNotificationHandler(nhi)
n.listener = listener
return nil
return notifier, nil
}
// Start runs the listener to process notifications.
// The method MUST be invoked exactly once after successful initialization with New,
// otherwise a panic can occur.
func (n *Notificator) Start(ctx context.Context) {
go n.listener.Listen(ctx)
ticker := time.NewTicker(n.reconnectClientsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
n.logger.Info(logs.NotificatorStopped, zap.Error(ctx.Err()))
return
case <-n.connLost:
n.listener.Stop()
LOOP:
for {
select {
case <-ctx.Done():
n.logger.Info(logs.NotificatorStopped, zap.Error(ctx.Err()))
return
case <-ticker.C:
if err := n.initListener(); err != nil {
n.logger.Error(logs.ListenerCouldntBeReinitialized, zap.Error(err))
ticker.Reset(n.reconnectClientsInterval)
continue
}
go n.listener.Listen(ctx)
break LOOP
}
}
}
}
n.listener.Listen(ctx)
}
func newEpochEventParser() event.NotificationParser {
@ -134,7 +99,7 @@ func newEpochEventParser() event.NotificationParser {
return nil, err
}
return NewEpochEvent{Epoch: epoch.Int64()}, nil
return NewEpochEvent{Epoch: epoch.Uint64()}, nil
}
}
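Putting it together, a hedged sketch of driving the reworked Notificator with an injected Listener. The function name and handler body are illustrative; Config, New, Start and the uint64 NewEpochEvent.Epoch follow the code above.

package sketch // hypothetical package, for illustration only

import (
	"context"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/notificator"
	"github.com/nspcc-dev/neo-go/pkg/util"
	"go.uber.org/zap"
)

// runEpochNotifications builds the Notificator on top of an injected Listener
// and blocks processing NewEpoch events; the handler body is illustrative.
func runEpochNotifications(ctx context.Context, ln notificator.Listener, netmapHash util.Uint160, log *zap.Logger) error {
	n, err := notificator.New(ctx, notificator.Config{
		Handler: func(_ context.Context, ee notificator.NewEpochEvent) {
			log.Info("new epoch", zap.Uint64("epoch", ee.Epoch))
		},
		Logger:         log,
		Listener:       ln,
		NetmapContract: netmapHash,
	})
	if err != nil {
		return fmt.Errorf("init notificator: %w", err)
	}

	// Start delegates to the listener's Listen and returns only when ctx is done.
	n.Start(ctx)
	return nil
}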


@ -8,7 +8,6 @@ import (
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/stretchr/testify/require"
@ -24,12 +23,10 @@ type scriptHashWithType struct {
type listenerMock struct {
scriptHashWithType
mu sync.Mutex
parsers map[scriptHashWithType]event.NotificationParserInfo
handlers map[scriptHashWithType][]event.NotificationHandlerInfo
started, stopped bool
lostConnectionCallback func()
mu sync.Mutex
parsers map[scriptHashWithType]event.NotificationParserInfo
handlers map[scriptHashWithType][]event.NotificationHandlerInfo
started bool
}
func newListenerMock(hash util.Uint160) *listenerMock {
@ -40,8 +37,6 @@ func newListenerMock(hash util.Uint160) *listenerMock {
},
parsers: map[scriptHashWithType]event.NotificationParserInfo{},
handlers: map[scriptHashWithType][]event.NotificationHandlerInfo{},
started: false,
stopped: false,
}
}
@ -65,30 +60,20 @@ func (l *listenerMock) sendNotification(epochEvent NewEpochEvent) error {
return nil
}
func (l *listenerMock) refresh() {
l.mu.Lock()
defer l.mu.Unlock()
l.started = false
l.stopped = false
l.parsers = map[scriptHashWithType]event.NotificationParserInfo{}
l.handlers = map[scriptHashWithType][]event.NotificationHandlerInfo{}
}
func (l *listenerMock) Listen(context.Context) {
l.mu.Lock()
l.started = true
l.mu.Unlock()
}
func (l *listenerMock) ListenWithError(context.Context, chan<- error) {
panic("not implemented")
}
func (l *listenerMock) SetNotificationParser(info event.NotificationParserInfo) {
l.mu.Lock()
defer l.mu.Unlock()
if l.started {
panic("listener already started")
}
l.parsers[scriptHashWithType{
eventType: info.GetType(),
contractHash: info.ScriptHash(),
@ -99,6 +84,10 @@ func (l *listenerMock) RegisterNotificationHandler(info event.NotificationHandle
l.mu.Lock()
defer l.mu.Unlock()
if l.started {
panic("listener already started")
}
key := scriptHashWithType{
eventType: info.GetType(),
contractHash: info.ScriptHash(),
@ -108,28 +97,6 @@ func (l *listenerMock) RegisterNotificationHandler(info event.NotificationHandle
l.handlers[key] = append(list, info)
}
func (l *listenerMock) EnableNotarySupport(util.Uint160, client.AlphabetKeys, event.BlockCounter) {
panic("not implemented")
}
func (l *listenerMock) SetNotaryParser(event.NotaryParserInfo) {
panic("not implemented")
}
func (l *listenerMock) RegisterNotaryHandler(event.NotaryHandlerInfo) {
panic("not implemented")
}
func (l *listenerMock) RegisterBlockHandler(event.BlockHandler) {
panic("not implemented")
}
func (l *listenerMock) Stop() {
l.mu.Lock()
l.stopped = true
l.mu.Unlock()
}
func TestNotificatorBase(t *testing.T) {
ctx := context.Background()
@ -152,15 +119,10 @@ func TestNotificatorBase(t *testing.T) {
lnMock := newListenerMock(contractHash)
cfg := Config{
Handler: handler,
Logger: logger,
NewListener: func(cb func()) (event.Listener, error) {
lnMock.lostConnectionCallback = cb
lnMock.refresh()
return lnMock, nil
},
NetmapContract: contractHash,
ReconnectClientsInterval: 100 * time.Millisecond,
Handler: handler,
Logger: logger,
NetmapContract: contractHash,
Listener: lnMock,
}
n, err := New(ctx, cfg)
@ -175,12 +137,6 @@ func TestNotificatorBase(t *testing.T) {
ee = NewEpochEvent{Epoch: 2}
sendNotification(t, lnMock, ee, &wg)
require.Equal(t, ee.Epoch, gotEvent.Epoch)
lnMock.lostConnectionCallback()
ee = NewEpochEvent{Epoch: 3}
sendNotification(t, lnMock, ee, &wg)
require.Equal(t, ee.Epoch, gotEvent.Epoch)
}
func sendNotification(t *testing.T, lnMock *listenerMock, ee NewEpochEvent, wg *sync.WaitGroup) {