Put artifacts into pool solely

Signed-off-by: Pavel Korotkov <pavel@nspcc.ru>
This commit is contained in:
Pavel Korotkov 2021-04-07 15:54:30 +03:00 committed by Pavel Korotkov
parent d7617110b7
commit fad05b76d4
5 changed files with 78 additions and 70 deletions

2
app.go
View file

@ -2,6 +2,7 @@ package main
import (
"context"
"math"
"strconv"
"github.com/fasthttp/router"
@ -104,6 +105,7 @@ func newApp(ctx context.Context, opt ...Option) App {
NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout),
NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout),
ClientRebalanceInterval: a.cfg.GetDuration(cfgRebalance),
SessionExpirationEpoch: math.MaxUint64,
}
pool, err := pb.Build(ctx, opts)
if err != nil {

View file

@ -3,13 +3,14 @@ package connections
import (
"context"
"crypto/ecdsa"
"errors"
"math"
"math/rand"
"sync"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
@ -18,6 +19,7 @@ type PoolBuilderOptions struct {
NodeConnectionTimeout time.Duration
NodeRequestTimeout time.Duration
ClientRebalanceInterval time.Duration
SessionExpirationEpoch uint64
weights []float64
connections []*grpc.ClientConn
}
@ -59,47 +61,54 @@ func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (
}
type Pool interface {
Client() client.Client
ConnectionArtifacts() (client.Client, *token.SessionToken, error)
}
type clientPack struct {
client client.Client
sessionToken *token.SessionToken
healthy bool
}
type pool struct {
lock sync.RWMutex
sampler *Sampler
clients []client.Client
healthy []bool
clientPacks []*clientPack
}
func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) {
n := len(options.weights)
clients := make([]client.Client, n)
healthy := make([]bool, n)
clientPacks := make([]*clientPack, len(options.weights))
for i, con := range options.connections {
c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con))
if err != nil {
return nil, err
}
clients[i] = c
healthy[i] = true
st, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
if err != nil {
address := "unknown"
if epi, err := c.EndpointInfo(ctx); err == nil {
address = epi.NodeInfo().Address()
}
return nil, errors.Wrapf(err, "failed to create neofs session token for client %s", address)
}
clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true}
}
source := rand.NewSource(time.Now().UnixNano())
pool := &pool{
sampler: NewSampler(options.weights, source),
clients: clients,
healthy: healthy,
}
sampler := NewSampler(options.weights, source)
pool := &pool{sampler: sampler, clientPacks: clientPacks}
go func() {
ticker := time.NewTimer(options.ClientRebalanceInterval)
for range ticker.C {
ok := true
for i, client := range pool.clients {
for i, clientPack := range pool.clientPacks {
func() {
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
defer c()
if _, err := client.EndpointInfo(tctx); err != nil {
if _, err := clientPack.client.EndpointInfo(tctx); err != nil {
ok = false
}
pool.lock.Lock()
pool.healthy[i] = ok
pool.clientPacks[i].healthy = ok
pool.lock.Unlock()
}()
}
@ -109,24 +118,26 @@ func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) {
return pool, nil
}
func (p *pool) Client() client.Client {
func (p *pool) ConnectionArtifacts() (client.Client, *token.SessionToken, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if len(p.clients) == 1 {
if p.healthy[0] {
return p.clients[0]
if len(p.clientPacks) == 1 {
cp := p.clientPacks[0]
if cp.healthy {
return cp.client, cp.sessionToken, nil
}
return nil
return nil, nil, errors.New("no healthy client")
}
var i *int = nil
for k := 0; k < 10; k++ {
i_ := p.sampler.Next()
if p.healthy[i_] {
if p.clientPacks[i_].healthy {
i = &i_
}
}
if i != nil {
return p.clients[*i]
cp := p.clientPacks[*i]
return cp.client, cp.sessionToken, nil
}
return nil
return nil, nil, errors.New("no healthy client")
}

View file

@ -10,10 +10,8 @@ import (
"sync"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-http-gate/neofs"
"github.com/nspcc-dev/neofs-http-gate/tokens"
"github.com/pkg/errors"
@ -140,38 +138,33 @@ func (o objectIDs) Slice() []string {
type Downloader struct {
log *zap.Logger
plant neofs.ClientPlant
getOperations struct {
client client.Client
sessionToken *token.SessionToken
}
}
func New(ctx context.Context, log *zap.Logger, plant neofs.ClientPlant) (*Downloader, error) {
var err error
d := &Downloader{log: log, plant: plant}
d.getOperations.client, d.getOperations.sessionToken, err = d.plant.GetReusableArtifacts(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get neofs client's reusable artifacts")
}
return d, nil
}
func (a *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
return &request{
RequestCtx: ctx,
log: log,
objectClient: a.plant.Object(),
objectClient: d.plant.Object(),
}
}
func (a *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
var (
err error
address = object.NewAddress()
cid, _ = c.UserValue("cid").(string)
oid, _ = c.UserValue("oid").(string)
val = strings.Join([]string{cid, oid}, "/")
log = a.log.With(zap.String("cid", cid), zap.String("oid", oid))
log = d.log.With(zap.String("cid", cid), zap.String("oid", oid))
)
if err = address.Parse(val); err != nil {
log.Error("wrong object address", zap.Error(err))
@ -180,20 +173,24 @@ func (a *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
}
getOpts := getOptionsPool.Get().(*neofs.GetOptions)
defer getOptionsPool.Put(getOpts)
getOpts.Client = a.getOperations.client
getOpts.SessionToken = a.getOperations.sessionToken
getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts()
if err != nil {
log.Error("failed to get neofs connection artifacts", zap.Error(err))
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
return
}
getOpts.ObjectAddress = address
getOpts.Writer = nil
a.newRequest(c, log).receiveFile(getOpts)
d.newRequest(c, log).receiveFile(getOpts)
}
func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
var (
err error
scid, _ = c.UserValue("cid").(string)
key, _ = c.UserValue("attr_key").(string)
val, _ = c.UserValue("attr_val").(string)
log = a.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
)
cid := container.NewID()
if err = cid.Parse(scid); err != nil {
@ -203,14 +200,18 @@ func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
}
searchOpts := searchOptionsPool.Get().(*neofs.SearchOptions)
defer searchOptionsPool.Put(searchOpts)
searchOpts.Client = a.getOperations.client
searchOpts.SessionToken = a.getOperations.sessionToken
searchOpts.Client, searchOpts.SessionToken, err = d.plant.ConnectionArtifacts()
if err != nil {
log.Error("failed to get neofs connection artifacts", zap.Error(err))
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
return
}
searchOpts.BearerToken = nil
searchOpts.ContainerID = cid
searchOpts.Attribute.Key = key
searchOpts.Attribute.Value = val
var ids []*object.ID
if ids, err = a.plant.Object().Search(c, searchOpts); err != nil {
if ids, err = d.plant.Object().Search(c, searchOpts); err != nil {
log.Error("something went wrong", zap.Error(err))
c.Error("something went wrong", fasthttp.StatusBadRequest)
return
@ -229,9 +230,13 @@ func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
address.SetObjectID(ids[0])
getOpts := getOptionsPool.Get().(*neofs.GetOptions)
defer getOptionsPool.Put(getOpts)
getOpts.Client = a.getOperations.client
getOpts.SessionToken = a.getOperations.sessionToken
getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts()
if err != nil {
log.Error("failed to get neofs connection artifacts", zap.Error(err))
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
return
}
getOpts.ObjectAddress = address
getOpts.Writer = nil
a.newRequest(c, log).receiveFile(getOpts)
d.newRequest(c, log).receiveFile(getOpts)
}

View file

@ -5,7 +5,6 @@ import (
"context"
"crypto/ecdsa"
"io"
"math"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
@ -15,7 +14,6 @@ import (
"github.com/nspcc-dev/neofs-http-gate/connections"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors"
)
const maxObjectSize = uint64(1 << 28) // Limit objects to 256 MiB.
@ -62,7 +60,7 @@ type ObjectClient interface {
}
type ClientPlant interface {
GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error)
ConnectionArtifacts() (client.Client, *token.SessionToken, error)
Object() ObjectClient
OwnerID() *owner.ID
}
@ -78,16 +76,8 @@ type neofsClientPlant struct {
pool connections.Pool
}
func (cp *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) {
c := cp.pool.Client()
if c == nil {
return nil, nil, errors.New("failed to peek a healthy node to connect to")
}
st, err := c.CreateSession(ctx, math.MaxUint64)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create reusable neofs session token")
}
return c, st, nil
func (cp *neofsClientPlant) ConnectionArtifacts() (client.Client, *token.SessionToken, error) {
return cp.pool.ConnectionArtifacts()
}
func (cc *neofsClientPlant) Object() ObjectClient {

View file

@ -56,7 +56,7 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
return
}
defer func() {
// if temporary reader can be closed - close it
// If the temporary reader can be closed - let's close it.
if file == nil {
return
}
@ -106,10 +106,10 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
putOpts := putOptionsPool.Get().(*neofs.PutOptions)
defer putOptionsPool.Put(putOpts)
// Try to put file into NeoFS or throw an error.
putOpts.Client, putOpts.SessionToken, err = u.plant.GetReusableArtifacts(c)
putOpts.Client, putOpts.SessionToken, err = u.plant.ConnectionArtifacts()
if err != nil {
log.Error("failed to get neofs client's reusable artifacts", zap.Error(err))
c.Error("failed to get neofs client's reusable artifacts", fasthttp.StatusInternalServerError)
log.Error("failed to get neofs connection artifacts", zap.Error(err))
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
return
}
putOpts.BearerToken = bt
@ -118,8 +118,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
putOpts.PrepareObjectOnsite = false
putOpts.Reader = file
if addr, err = u.plant.Object().Put(c, putOpts); err != nil {
log.Error("could not store file in NeoFS", zap.Error(err))
c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)
log.Error("could not store file in neofs", zap.Error(err))
c.Error("could not store file in neofs", fasthttp.StatusBadRequest)
return
}
// Try to return the response, otherwise, if something went wrong, throw an error.