diff --git a/app.go b/app.go index bbfead4..bcb2537 100644 --- a/app.go +++ b/app.go @@ -2,6 +2,7 @@ package main import ( "context" + "math" "strconv" "github.com/fasthttp/router" @@ -104,6 +105,7 @@ func newApp(ctx context.Context, opt ...Option) App { NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout), NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout), ClientRebalanceInterval: a.cfg.GetDuration(cfgRebalance), + SessionExpirationEpoch: math.MaxUint64, } pool, err := pb.Build(ctx, opts) if err != nil { diff --git a/connections/pool.go b/connections/pool.go index 48c859b..e705ea1 100644 --- a/connections/pool.go +++ b/connections/pool.go @@ -3,13 +3,14 @@ package connections import ( "context" "crypto/ecdsa" - "errors" "math" "math/rand" "sync" "time" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/token" + "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -18,6 +19,7 @@ type PoolBuilderOptions struct { NodeConnectionTimeout time.Duration NodeRequestTimeout time.Duration ClientRebalanceInterval time.Duration + SessionExpirationEpoch uint64 weights []float64 connections []*grpc.ClientConn } @@ -59,47 +61,54 @@ func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) ( } type Pool interface { - Client() client.Client + ConnectionArtifacts() (client.Client, *token.SessionToken, error) +} + +type clientPack struct { + client client.Client + sessionToken *token.SessionToken + healthy bool } type pool struct { - lock sync.RWMutex - sampler *Sampler - clients []client.Client - healthy []bool + lock sync.RWMutex + sampler *Sampler + clientPacks []*clientPack } func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { - n := len(options.weights) - clients := make([]client.Client, n) - healthy := make([]bool, n) + clientPacks := make([]*clientPack, len(options.weights)) for i, con := range options.connections { c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con)) if err != nil { return nil, err } - clients[i] = c - healthy[i] = true + st, err := c.CreateSession(ctx, options.SessionExpirationEpoch) + if err != nil { + address := "unknown" + if epi, err := c.EndpointInfo(ctx); err == nil { + address = epi.NodeInfo().Address() + } + return nil, errors.Wrapf(err, "failed to create neofs session token for client %s", address) + } + clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true} } source := rand.NewSource(time.Now().UnixNano()) - pool := &pool{ - sampler: NewSampler(options.weights, source), - clients: clients, - healthy: healthy, - } + sampler := NewSampler(options.weights, source) + pool := &pool{sampler: sampler, clientPacks: clientPacks} go func() { ticker := time.NewTimer(options.ClientRebalanceInterval) for range ticker.C { ok := true - for i, client := range pool.clients { + for i, clientPack := range pool.clientPacks { func() { tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) defer c() - if _, err := client.EndpointInfo(tctx); err != nil { + if _, err := clientPack.client.EndpointInfo(tctx); err != nil { ok = false } pool.lock.Lock() - pool.healthy[i] = ok + pool.clientPacks[i].healthy = ok pool.lock.Unlock() }() } @@ -109,24 +118,26 @@ func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { return pool, nil } -func (p *pool) Client() client.Client { +func (p *pool) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { p.lock.RLock() defer p.lock.RUnlock() - if len(p.clients) == 1 { - if p.healthy[0] { - return p.clients[0] + if len(p.clientPacks) == 1 { + cp := p.clientPacks[0] + if cp.healthy { + return cp.client, cp.sessionToken, nil } - return nil + return nil, nil, errors.New("no healthy client") } var i *int = nil for k := 0; k < 10; k++ { i_ := p.sampler.Next() - if p.healthy[i_] { + if p.clientPacks[i_].healthy { i = &i_ } } if i != nil { - return p.clients[*i] + cp := p.clientPacks[*i] + return cp.client, cp.sessionToken, nil } - return nil + return nil, nil, errors.New("no healthy client") } diff --git a/downloader/download.go b/downloader/download.go index 37da819..8d447b0 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -10,10 +10,8 @@ import ( "sync" "time" - "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-http-gate/neofs" "github.com/nspcc-dev/neofs-http-gate/tokens" "github.com/pkg/errors" @@ -138,40 +136,35 @@ func (o objectIDs) Slice() []string { } type Downloader struct { - log *zap.Logger - plant neofs.ClientPlant - getOperations struct { - client client.Client - sessionToken *token.SessionToken - } + log *zap.Logger + plant neofs.ClientPlant } func New(ctx context.Context, log *zap.Logger, plant neofs.ClientPlant) (*Downloader, error) { var err error d := &Downloader{log: log, plant: plant} - d.getOperations.client, d.getOperations.sessionToken, err = d.plant.GetReusableArtifacts(ctx) if err != nil { return nil, errors.Wrap(err, "failed to get neofs client's reusable artifacts") } return d, nil } -func (a *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { +func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { return &request{ RequestCtx: ctx, log: log, - objectClient: a.plant.Object(), + objectClient: d.plant.Object(), } } -func (a *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { +func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { var ( err error address = object.NewAddress() cid, _ = c.UserValue("cid").(string) oid, _ = c.UserValue("oid").(string) val = strings.Join([]string{cid, oid}, "/") - log = a.log.With(zap.String("cid", cid), zap.String("oid", oid)) + log = d.log.With(zap.String("cid", cid), zap.String("oid", oid)) ) if err = address.Parse(val); err != nil { log.Error("wrong object address", zap.Error(err)) @@ -180,20 +173,24 @@ func (a *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { } getOpts := getOptionsPool.Get().(*neofs.GetOptions) defer getOptionsPool.Put(getOpts) - getOpts.Client = a.getOperations.client - getOpts.SessionToken = a.getOperations.sessionToken + getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts() + if err != nil { + log.Error("failed to get neofs connection artifacts", zap.Error(err)) + c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) + return + } getOpts.ObjectAddress = address getOpts.Writer = nil - a.newRequest(c, log).receiveFile(getOpts) + d.newRequest(c, log).receiveFile(getOpts) } -func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { +func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { var ( err error scid, _ = c.UserValue("cid").(string) key, _ = c.UserValue("attr_key").(string) val, _ = c.UserValue("attr_val").(string) - log = a.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) + log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) ) cid := container.NewID() if err = cid.Parse(scid); err != nil { @@ -203,14 +200,18 @@ func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { } searchOpts := searchOptionsPool.Get().(*neofs.SearchOptions) defer searchOptionsPool.Put(searchOpts) - searchOpts.Client = a.getOperations.client - searchOpts.SessionToken = a.getOperations.sessionToken + searchOpts.Client, searchOpts.SessionToken, err = d.plant.ConnectionArtifacts() + if err != nil { + log.Error("failed to get neofs connection artifacts", zap.Error(err)) + c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) + return + } searchOpts.BearerToken = nil searchOpts.ContainerID = cid searchOpts.Attribute.Key = key searchOpts.Attribute.Value = val var ids []*object.ID - if ids, err = a.plant.Object().Search(c, searchOpts); err != nil { + if ids, err = d.plant.Object().Search(c, searchOpts); err != nil { log.Error("something went wrong", zap.Error(err)) c.Error("something went wrong", fasthttp.StatusBadRequest) return @@ -229,9 +230,13 @@ func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { address.SetObjectID(ids[0]) getOpts := getOptionsPool.Get().(*neofs.GetOptions) defer getOptionsPool.Put(getOpts) - getOpts.Client = a.getOperations.client - getOpts.SessionToken = a.getOperations.sessionToken + getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts() + if err != nil { + log.Error("failed to get neofs connection artifacts", zap.Error(err)) + c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) + return + } getOpts.ObjectAddress = address getOpts.Writer = nil - a.newRequest(c, log).receiveFile(getOpts) + d.newRequest(c, log).receiveFile(getOpts) } diff --git a/neofs/client-plant.go b/neofs/client-plant.go index 1430619..e89fd25 100644 --- a/neofs/client-plant.go +++ b/neofs/client-plant.go @@ -5,7 +5,6 @@ import ( "context" "crypto/ecdsa" "io" - "math" "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" @@ -15,7 +14,6 @@ import ( "github.com/nspcc-dev/neofs-http-gate/connections" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" - "github.com/pkg/errors" ) const maxObjectSize = uint64(1 << 28) // Limit objects to 256 MiB. @@ -62,7 +60,7 @@ type ObjectClient interface { } type ClientPlant interface { - GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) + ConnectionArtifacts() (client.Client, *token.SessionToken, error) Object() ObjectClient OwnerID() *owner.ID } @@ -78,16 +76,8 @@ type neofsClientPlant struct { pool connections.Pool } -func (cp *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) { - c := cp.pool.Client() - if c == nil { - return nil, nil, errors.New("failed to peek a healthy node to connect to") - } - st, err := c.CreateSession(ctx, math.MaxUint64) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to create reusable neofs session token") - } - return c, st, nil +func (cp *neofsClientPlant) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { + return cp.pool.ConnectionArtifacts() } func (cc *neofsClientPlant) Object() ObjectClient { diff --git a/uploader/upload.go b/uploader/upload.go index 158939c..2b10996 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -56,7 +56,7 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { return } defer func() { - // if temporary reader can be closed - close it + // If the temporary reader can be closed - let's close it. if file == nil { return } @@ -106,10 +106,10 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { putOpts := putOptionsPool.Get().(*neofs.PutOptions) defer putOptionsPool.Put(putOpts) // Try to put file into NeoFS or throw an error. - putOpts.Client, putOpts.SessionToken, err = u.plant.GetReusableArtifacts(c) + putOpts.Client, putOpts.SessionToken, err = u.plant.ConnectionArtifacts() if err != nil { - log.Error("failed to get neofs client's reusable artifacts", zap.Error(err)) - c.Error("failed to get neofs client's reusable artifacts", fasthttp.StatusInternalServerError) + log.Error("failed to get neofs connection artifacts", zap.Error(err)) + c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } putOpts.BearerToken = bt @@ -118,8 +118,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { putOpts.PrepareObjectOnsite = false putOpts.Reader = file if addr, err = u.plant.Object().Put(c, putOpts); err != nil { - log.Error("could not store file in NeoFS", zap.Error(err)) - c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) + log.Error("could not store file in neofs", zap.Error(err)) + c.Error("could not store file in neofs", fasthttp.StatusBadRequest) return } // Try to return the response, otherwise, if something went wrong, throw an error.