[#59] Use tree pool from SDK

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-06-09 09:39:01 +03:00
parent 5be537321b
commit 1dfbe36eca
7 changed files with 338 additions and 166 deletions

80
app.go
View file

@ -6,12 +6,10 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
@ -21,7 +19,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/fasthttp/router"
"github.com/nspcc-dev/neo-go/cli/flags"
@ -32,8 +32,6 @@ import (
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type (
@ -42,6 +40,7 @@ type (
log *zap.Logger
logLevel zap.AtomicLevel
pool *pool.Pool
treePool *treepool.Pool
key *keys.PrivateKey
owner *user.ID
cfg *viper.Viper
@ -98,10 +97,6 @@ func WithConfig(c *viper.Viper) Option {
}
func newApp(ctx context.Context, opt ...Option) App {
var (
err error
)
a := &app{
ctx: ctx,
log: zap.L(),
@ -126,51 +121,12 @@ func newApp(ctx context.Context, opt ...Option) App {
a.webServer.DisablePreParseMultipartForm = true
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
// -- -- -- -- -- -- -- -- -- -- -- -- -- --
a.key, err = getFrostFSKey(a)
if err != nil {
a.log.Fatal("failed to get frostfs credentials", zap.Error(err))
}
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg)
var owner user.ID
user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)
a.owner = &owner
var prm pool.InitParameters
prm.SetKey(&a.key.PrivateKey)
prm.SetNodeDialTimeout(a.cfg.GetDuration(cfgConTimeout))
prm.SetNodeStreamTimeout(a.cfg.GetDuration(cfgStreamTimeout))
prm.SetHealthcheckTimeout(a.cfg.GetDuration(cfgReqTimeout))
prm.SetClientRebalanceInterval(a.cfg.GetDuration(cfgRebalance))
prm.SetErrorThreshold(a.cfg.GetUint32(cfgPoolErrorThreshold))
for i := 0; ; i++ {
address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address")
weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight")
priority := a.cfg.GetInt(cfgPeers + "." + strconv.Itoa(i) + ".priority")
if address == "" {
break
}
if weight <= 0 { // unspecified or wrong
weight = 1
}
if priority <= 0 { // unspecified or wrong
priority = 1
}
prm.AddNode(pool.NewNodeParam(priority, address, weight))
a.log.Info("add connection", zap.String("address", address),
zap.Float64("weight", weight), zap.Int("priority", priority))
}
a.pool, err = pool.NewPool(prm)
if err != nil {
a.log.Fatal("failed to create connection pool", zap.Error(err))
}
err = a.pool.Dial(ctx)
if err != nil {
a.log.Fatal("failed to dial pool", zap.Error(err))
}
a.initAppSettings()
a.initResolver()
a.initMetrics()
@ -283,11 +239,11 @@ func remove(list []string, element string) []string {
return list
}
func getFrostFSKey(a *app) (*keys.PrivateKey, error) {
walletPath := a.cfg.GetString(cfgWalletPath)
func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error) {
walletPath := cfg.GetString(cfgWalletPath)
if len(walletPath) == 0 {
a.log.Info("no wallet path specified, creating ephemeral key automatically for this run")
log.Info("no wallet path specified, creating ephemeral key automatically for this run")
key, err := keys.NewPrivateKey()
if err != nil {
return nil, err
@ -300,12 +256,12 @@ func getFrostFSKey(a *app) (*keys.PrivateKey, error) {
}
var password *string
if a.cfg.IsSet(cfgWalletPassphrase) {
pwd := a.cfg.GetString(cfgWalletPassphrase)
if cfg.IsSet(cfgWalletPassphrase) {
pwd := cfg.GetString(cfgWalletPassphrase)
password = &pwd
}
address := a.cfg.GetString(cfgWalletAddress)
address := cfg.GetString(cfgWalletAddress)
return getKeyFromWallet(w, address, password)
}
@ -357,9 +313,8 @@ func (a *app) setHealthStatus() {
}
func (a *app) Serve() {
treeClient := a.initTree(a.ctx)
uploadRoutes := uploader.New(a.AppParams(), a.settings.Uploader)
downloadRoutes := downloader.New(a.AppParams(), a.settings.Downloader, treeClient)
downloadRoutes := downloader.New(a.AppParams(), a.settings.Downloader, tree.NewTree(services.NewPoolWrapper(a.treePool)))
// Configure router.
a.configureRouter(uploadRoutes, downloadRoutes)
@ -599,19 +554,6 @@ func (a *app) serverIndex(address string) int {
return -1
}
func (a *app) initTree(ctx context.Context) *tree.Tree {
treeServiceEndpoint := a.cfg.GetString(cfgTreeServiceEndpoint)
grpcDialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
treeGRPCClient, err := services.NewTreeServiceClientGRPC(ctx, treeServiceEndpoint, a.key, grpcDialOpt)
if err != nil {
a.log.Fatal("failed to create tree service", zap.Error(err))
}
treeService := tree.NewTree(treeGRPCClient)
a.log.Info("init tree service", zap.String("endpoint", treeServiceEndpoint))
return treeService
}
func (a *app) initTracing(ctx context.Context) {
instanceID := ""
if len(a.servers) > 0 {