[#185] Update SDK to support new tree/pool version

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-12-20 09:45:42 +03:00 committed by Alexey Vanin
parent e1b670a727
commit 1db62f9d95
15 changed files with 299 additions and 38 deletions

View file

@ -17,12 +17,12 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/zapjournald"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/ssgreg/journald"
@ -144,6 +144,7 @@ const (
// Caching.
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
cfgBucketsCacheSize = "cache.buckets.size"
cfgNetmapCacheLifetime = "cache.netmap.lifetime"
// Bucket resolving options.
cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
@ -166,6 +167,7 @@ const (
// Feature.
cfgFeaturesEnableFilepathFallback = "features.enable_filepath_fallback"
cfgFeaturesTreePoolNetmapSupport = "features.tree_pool_netmap_support"
// Command line args.
cmdHelp = "help"
@ -611,10 +613,10 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
return servers
}
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
key, err := getFrostFSKey(cfg, logger)
func (a *app) initPools(ctx context.Context) {
key, err := getFrostFSKey(a.cfg, a.log)
if err != nil {
logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
}
var prm pool.InitParameters
@ -622,77 +624,83 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSou
prm.SetKey(&key.PrivateKey)
prmTree.SetKey(key)
logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
for _, peer := range fetchPeers(logger, cfg) {
for _, peer := range fetchPeers(a.log, a.cfg) {
prm.AddNode(peer)
prmTree.AddNode(peer)
}
connTimeout := cfg.GetDuration(cfgConTimeout)
connTimeout := a.cfg.GetDuration(cfgConTimeout)
if connTimeout <= 0 {
connTimeout = defaultConnectTimeout
}
prm.SetNodeDialTimeout(connTimeout)
prmTree.SetNodeDialTimeout(connTimeout)
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
streamTimeout := a.cfg.GetDuration(cfgStreamTimeout)
if streamTimeout <= 0 {
streamTimeout = defaultStreamTimeout
}
prm.SetNodeStreamTimeout(streamTimeout)
prmTree.SetNodeStreamTimeout(streamTimeout)
healthCheckTimeout := cfg.GetDuration(cfgReqTimeout)
healthCheckTimeout := a.cfg.GetDuration(cfgReqTimeout)
if healthCheckTimeout <= 0 {
healthCheckTimeout = defaultRequestTimeout
}
prm.SetHealthcheckTimeout(healthCheckTimeout)
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
rebalanceInterval := cfg.GetDuration(cfgRebalance)
rebalanceInterval := a.cfg.GetDuration(cfgRebalance)
if rebalanceInterval <= 0 {
rebalanceInterval = defaultRebalanceTimer
}
prm.SetClientRebalanceInterval(rebalanceInterval)
prmTree.SetClientRebalanceInterval(rebalanceInterval)
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
errorThreshold := a.cfg.GetUint32(cfgPoolErrorThreshold)
if errorThreshold <= 0 {
errorThreshold = defaultPoolErrorThreshold
}
prm.SetErrorThreshold(errorThreshold)
prm.SetLogger(logger)
prmTree.SetLogger(logger)
prm.SetLogger(a.log)
prmTree.SetLogger(a.log)
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
prmTree.SetMaxRequestAttempts(a.cfg.GetInt(cfgTreePoolMaxAttempts))
interceptors := []grpc.DialOption{
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
grpc.WithContextDialer(dialSource.GrpcContextDialer()),
grpc.WithContextDialer(a.settings.dialerSource.GrpcContextDialer()),
}
prm.SetGRPCDialOptions(interceptors...)
prmTree.SetGRPCDialOptions(interceptors...)
p, err := pool.NewPool(prm)
if err != nil {
logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
}
if err = p.Dial(ctx); err != nil {
logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
}
if a.cfg.GetBool(cfgFeaturesTreePoolNetmapSupport) {
prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p), cache.NewNetmapCache(getNetmapCacheOptions(a.cfg, a.log)), a.bucketCache, a.log))
}
treePool, err := treepool.NewPool(prmTree)
if err != nil {
logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
}
if err = treePool.Dial(ctx); err != nil {
logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err))
}
return p, treePool, key
a.pool = p
a.treePool = treePool
a.key = key
}
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
@ -733,7 +741,7 @@ func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
return int64(softMemoryLimit)
}
func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
func getBucketCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
cacheCfg := cache.DefaultBucketConfig(l)
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
@ -742,6 +750,14 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
return cacheCfg
}
func getNetmapCacheOptions(v *viper.Viper, l *zap.Logger) *cache.NetmapCacheConfig {
cacheCfg := cache.DefaultNetmapConfig(l)
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgNetmapCacheLifetime, cacheCfg.Lifetime)
return cacheCfg
}
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
if v.IsSet(cfgEntry) {
lifetime := v.GetDuration(cfgEntry)