forked from TrueCloudLab/frostfs-http-gw
Merge pull request #48 from masterSplinter01/feature/46-replace-pool-library-usage
Replace library usage
This commit is contained in:
commit
8777d9695c
13 changed files with 73 additions and 812 deletions
10
app.go
10
app.go
|
@ -6,11 +6,11 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/fasthttp/router"
|
"github.com/fasthttp/router"
|
||||||
"github.com/nspcc-dev/neofs-http-gw/connections"
|
|
||||||
"github.com/nspcc-dev/neofs-http-gw/downloader"
|
"github.com/nspcc-dev/neofs-http-gw/downloader"
|
||||||
"github.com/nspcc-dev/neofs-http-gw/logger"
|
|
||||||
"github.com/nspcc-dev/neofs-http-gw/neofs"
|
|
||||||
"github.com/nspcc-dev/neofs-http-gw/uploader"
|
"github.com/nspcc-dev/neofs-http-gw/uploader"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/pkg/logger"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/pkg/neofs"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/pkg/pool"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -99,7 +99,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal("failed to get neofs credentials", zap.Error(err))
|
a.log.Fatal("failed to get neofs credentials", zap.Error(err))
|
||||||
}
|
}
|
||||||
pb := new(connections.PoolBuilder)
|
pb := new(pool.Builder)
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address")
|
address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address")
|
||||||
weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight")
|
weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight")
|
||||||
|
@ -112,7 +112,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
pb.AddNode(address, weight)
|
pb.AddNode(address, weight)
|
||||||
a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight))
|
a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight))
|
||||||
}
|
}
|
||||||
opts := &connections.PoolBuilderOptions{
|
opts := &pool.BuilderOptions{
|
||||||
Key: creds.PrivateKey(),
|
Key: creds.PrivateKey(),
|
||||||
NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout),
|
NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout),
|
||||||
NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout),
|
NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout),
|
||||||
|
|
|
@ -1,160 +0,0 @@
|
||||||
package connections
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"math/rand"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/keepalive"
|
|
||||||
)
|
|
||||||
|
|
||||||
// PoolBuilderOptions contains options used to build connection pool.
|
|
||||||
type PoolBuilderOptions struct {
|
|
||||||
Key *ecdsa.PrivateKey
|
|
||||||
NodeConnectionTimeout time.Duration
|
|
||||||
NodeRequestTimeout time.Duration
|
|
||||||
ClientRebalanceInterval time.Duration
|
|
||||||
KeepaliveTime time.Duration
|
|
||||||
KeepaliveTimeout time.Duration
|
|
||||||
KeepalivePermitWoStream bool
|
|
||||||
SessionExpirationEpoch uint64
|
|
||||||
weights []float64
|
|
||||||
connections []*grpc.ClientConn
|
|
||||||
}
|
|
||||||
|
|
||||||
// PoolBuilder is an interim structure used to collect node addresses/weights and
|
|
||||||
// build connection pool subsequently.
|
|
||||||
type PoolBuilder struct {
|
|
||||||
addresses []string
|
|
||||||
weights []float64
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddNode adds address/weight pair to node PoolBuilder list.
|
|
||||||
func (pb *PoolBuilder) AddNode(address string, weight float64) *PoolBuilder {
|
|
||||||
pb.addresses = append(pb.addresses, address)
|
|
||||||
pb.weights = append(pb.weights, weight)
|
|
||||||
return pb
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build creates new pool based on current PoolBuilder state and options.
|
|
||||||
func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (Pool, error) {
|
|
||||||
if len(pb.addresses) == 0 {
|
|
||||||
return nil, errors.New("no NeoFS peers configured")
|
|
||||||
}
|
|
||||||
totalWeight := 0.0
|
|
||||||
for _, w := range pb.weights {
|
|
||||||
totalWeight += w
|
|
||||||
}
|
|
||||||
for i, w := range pb.weights {
|
|
||||||
pb.weights[i] = w / totalWeight
|
|
||||||
}
|
|
||||||
var cons = make([]*grpc.ClientConn, len(pb.addresses))
|
|
||||||
for i, address := range pb.addresses {
|
|
||||||
con, err := func() (*grpc.ClientConn, error) {
|
|
||||||
toctx, c := context.WithTimeout(ctx, options.NodeConnectionTimeout)
|
|
||||||
defer c()
|
|
||||||
return grpc.DialContext(toctx, address,
|
|
||||||
grpc.WithInsecure(),
|
|
||||||
grpc.WithBlock(),
|
|
||||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
||||||
Time: options.KeepaliveTime,
|
|
||||||
Timeout: options.KeepaliveTimeout,
|
|
||||||
PermitWithoutStream: options.KeepalivePermitWoStream,
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cons[i] = con
|
|
||||||
}
|
|
||||||
options.weights = pb.weights
|
|
||||||
options.connections = cons
|
|
||||||
return new(ctx, options)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pool is an interface providing connection artifacts on request.
|
|
||||||
type Pool interface {
|
|
||||||
ConnectionArtifacts() (client.Client, *token.SessionToken, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type clientPack struct {
|
|
||||||
client client.Client
|
|
||||||
sessionToken *token.SessionToken
|
|
||||||
healthy bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type pool struct {
|
|
||||||
lock sync.RWMutex
|
|
||||||
sampler *Sampler
|
|
||||||
clientPacks []*clientPack
|
|
||||||
}
|
|
||||||
|
|
||||||
func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) {
|
|
||||||
clientPacks := make([]*clientPack, len(options.weights))
|
|
||||||
for i, con := range options.connections {
|
|
||||||
c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
st, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
|
|
||||||
if err != nil {
|
|
||||||
address := "unknown"
|
|
||||||
if epi, err := c.EndpointInfo(ctx); err == nil {
|
|
||||||
address = epi.NodeInfo().Address()
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("failed to create neofs session token for client %s: %w", address, err)
|
|
||||||
}
|
|
||||||
clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true}
|
|
||||||
}
|
|
||||||
source := rand.NewSource(time.Now().UnixNano())
|
|
||||||
sampler := NewSampler(options.weights, source)
|
|
||||||
pool := &pool{sampler: sampler, clientPacks: clientPacks}
|
|
||||||
go func() {
|
|
||||||
ticker := time.NewTimer(options.ClientRebalanceInterval)
|
|
||||||
for range ticker.C {
|
|
||||||
ok := true
|
|
||||||
for i, clientPack := range pool.clientPacks {
|
|
||||||
func() {
|
|
||||||
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
|
|
||||||
defer c()
|
|
||||||
if _, err := clientPack.client.EndpointInfo(tctx); err != nil {
|
|
||||||
ok = false
|
|
||||||
}
|
|
||||||
pool.lock.Lock()
|
|
||||||
pool.clientPacks[i].healthy = ok
|
|
||||||
pool.lock.Unlock()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
ticker.Reset(options.ClientRebalanceInterval)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return pool, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *pool) ConnectionArtifacts() (client.Client, *token.SessionToken, error) {
|
|
||||||
p.lock.RLock()
|
|
||||||
defer p.lock.RUnlock()
|
|
||||||
if len(p.clientPacks) == 1 {
|
|
||||||
cp := p.clientPacks[0]
|
|
||||||
if cp.healthy {
|
|
||||||
return cp.client, cp.sessionToken, nil
|
|
||||||
}
|
|
||||||
return nil, nil, errors.New("no healthy client")
|
|
||||||
}
|
|
||||||
attempts := 3 * len(p.clientPacks)
|
|
||||||
for k := 0; k < attempts; k++ {
|
|
||||||
i := p.sampler.Next()
|
|
||||||
if cp := p.clientPacks[i]; cp.healthy {
|
|
||||||
return cp.client, cp.sessionToken, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, nil, errors.New("no healthy client")
|
|
||||||
}
|
|
|
@ -1,81 +0,0 @@
|
||||||
package connections
|
|
||||||
|
|
||||||
import "math/rand"
|
|
||||||
|
|
||||||
// Sampler implements weighted random number generation using Vose's Alias
|
|
||||||
// Method (https://www.keithschwarz.com/darts-dice-coins/).
|
|
||||||
type Sampler struct {
|
|
||||||
randomGenerator *rand.Rand
|
|
||||||
probabilities []float64
|
|
||||||
alias []int
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewSampler creates new Sampler with a given set of probabilities using
|
|
||||||
// given source of randomness. Created Sampler will produce numbers from
|
|
||||||
// 0 to len(probabilities).
|
|
||||||
func NewSampler(probabilities []float64, source rand.Source) *Sampler {
|
|
||||||
sampler := &Sampler{}
|
|
||||||
var (
|
|
||||||
small workList
|
|
||||||
large workList
|
|
||||||
)
|
|
||||||
n := len(probabilities)
|
|
||||||
sampler.randomGenerator = rand.New(source)
|
|
||||||
sampler.probabilities = make([]float64, n)
|
|
||||||
sampler.alias = make([]int, n)
|
|
||||||
// Compute scaled probabilities.
|
|
||||||
p := make([]float64, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
p[i] = probabilities[i] * float64(n)
|
|
||||||
}
|
|
||||||
for i, pi := range p {
|
|
||||||
if pi < 1 {
|
|
||||||
small.add(i)
|
|
||||||
} else {
|
|
||||||
large.add(i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for len(small) > 0 && len(large) > 0 {
|
|
||||||
l, g := small.remove(), large.remove()
|
|
||||||
sampler.probabilities[l] = p[l]
|
|
||||||
sampler.alias[l] = g
|
|
||||||
p[g] = p[g] + p[l] - 1
|
|
||||||
if p[g] < 1 {
|
|
||||||
small.add(g)
|
|
||||||
} else {
|
|
||||||
large.add(g)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for len(large) > 0 {
|
|
||||||
g := large.remove()
|
|
||||||
sampler.probabilities[g] = 1
|
|
||||||
}
|
|
||||||
for len(small) > 0 {
|
|
||||||
l := small.remove()
|
|
||||||
sampler.probabilities[l] = 1
|
|
||||||
}
|
|
||||||
return sampler
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next returns the next (not so) random number from Sampler.
|
|
||||||
func (g *Sampler) Next() int {
|
|
||||||
n := len(g.alias)
|
|
||||||
i := g.randomGenerator.Intn(n)
|
|
||||||
if g.randomGenerator.Float64() < g.probabilities[i] {
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
return g.alias[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
type workList []int
|
|
||||||
|
|
||||||
func (wl *workList) add(e int) {
|
|
||||||
*wl = append(*wl, e)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wl *workList) remove() int {
|
|
||||||
l := len(*wl) - 1
|
|
||||||
n := (*wl)[l]
|
|
||||||
*wl = (*wl)[:l]
|
|
||||||
return n
|
|
||||||
}
|
|
|
@ -12,30 +12,18 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-http-gw/neofs"
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||||
"github.com/nspcc-dev/neofs-http-gw/tokens"
|
"github.com/nspcc-dev/neofs-http-gw/tokens"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/pkg/neofs"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
getOptionsPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return new(neofs.GetOptions)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
searchOptionsPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return new(neofs.SearchOptions)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
type (
|
||||||
detector struct {
|
detector struct {
|
||||||
io.Writer
|
io.Writer
|
||||||
|
@ -45,8 +33,7 @@ type (
|
||||||
|
|
||||||
request struct {
|
request struct {
|
||||||
*fasthttp.RequestCtx
|
*fasthttp.RequestCtx
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
objectClient neofs.ObjectClient
|
|
||||||
}
|
}
|
||||||
|
|
||||||
objectIDs []*object.ID
|
objectIDs []*object.ID
|
||||||
|
@ -85,12 +72,15 @@ func isValidValue(s string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) receiveFile(options *neofs.GetOptions) {
|
func (r *request) receiveFile(clnt client.Client,
|
||||||
|
sessionToken *token.SessionToken,
|
||||||
|
objectAddress *object.Address) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
dis = "inline"
|
dis = "inline"
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
filename string
|
filename string
|
||||||
|
obj *object.Object
|
||||||
)
|
)
|
||||||
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil {
|
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil {
|
||||||
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
|
@ -98,8 +88,15 @@ func (r *request) receiveFile(options *neofs.GetOptions) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writer := newDetector(r.Response.BodyWriter())
|
writer := newDetector(r.Response.BodyWriter())
|
||||||
options.Writer = writer
|
options := new(client.GetObjectParams).
|
||||||
obj, err := r.objectClient.Get(r.RequestCtx, options)
|
WithAddress(objectAddress).
|
||||||
|
WithPayloadWriter(writer)
|
||||||
|
|
||||||
|
obj, err = clnt.GetObject(
|
||||||
|
r.RequestCtx,
|
||||||
|
options,
|
||||||
|
client.WithSession(sessionToken),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Error(
|
r.log.Error(
|
||||||
"could not receive object",
|
"could not receive object",
|
||||||
|
@ -183,9 +180,8 @@ func New(ctx context.Context, log *zap.Logger, plant neofs.ClientPlant) (*Downlo
|
||||||
|
|
||||||
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
||||||
return &request{
|
return &request{
|
||||||
RequestCtx: ctx,
|
RequestCtx: ctx,
|
||||||
log: log,
|
log: log,
|
||||||
objectClient: d.plant.Object(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,23 +194,22 @@ func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
|
||||||
oid, _ = c.UserValue("oid").(string)
|
oid, _ = c.UserValue("oid").(string)
|
||||||
val = strings.Join([]string{cid, oid}, "/")
|
val = strings.Join([]string{cid, oid}, "/")
|
||||||
log = d.log.With(zap.String("cid", cid), zap.String("oid", oid))
|
log = d.log.With(zap.String("cid", cid), zap.String("oid", oid))
|
||||||
|
conn client.Client
|
||||||
|
tkn *token.SessionToken
|
||||||
)
|
)
|
||||||
if err = address.Parse(val); err != nil {
|
if err = address.Parse(val); err != nil {
|
||||||
log.Error("wrong object address", zap.Error(err))
|
log.Error("wrong object address", zap.Error(err))
|
||||||
c.Error("wrong object address", fasthttp.StatusBadRequest)
|
c.Error("wrong object address", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
getOpts := getOptionsPool.Get().(*neofs.GetOptions)
|
|
||||||
defer getOptionsPool.Put(getOpts)
|
conn, tkn, err = d.plant.ConnectionArtifacts()
|
||||||
getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
||||||
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
getOpts.ObjectAddress = address
|
d.newRequest(c, log).receiveFile(conn, tkn, address)
|
||||||
getOpts.Writer = nil
|
|
||||||
d.newRequest(c, log).receiveFile(getOpts)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DownloadByAttribute handles attribute-based download requests.
|
// DownloadByAttribute handles attribute-based download requests.
|
||||||
|
@ -225,6 +220,9 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
key, _ = c.UserValue("attr_key").(string)
|
key, _ = c.UserValue("attr_key").(string)
|
||||||
val, _ = c.UserValue("attr_val").(string)
|
val, _ = c.UserValue("attr_val").(string)
|
||||||
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||||
|
ids []*object.ID
|
||||||
|
conn client.Client
|
||||||
|
tkn *token.SessionToken
|
||||||
)
|
)
|
||||||
cid := container.NewID()
|
cid := container.NewID()
|
||||||
if err = cid.Parse(scid); err != nil {
|
if err = cid.Parse(scid); err != nil {
|
||||||
|
@ -232,20 +230,20 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
c.Error("wrong container id", fasthttp.StatusBadRequest)
|
c.Error("wrong container id", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
searchOpts := searchOptionsPool.Get().(*neofs.SearchOptions)
|
|
||||||
defer searchOptionsPool.Put(searchOpts)
|
conn, tkn, err = d.plant.ConnectionArtifacts()
|
||||||
searchOpts.Client, searchOpts.SessionToken, err = d.plant.ConnectionArtifacts()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
||||||
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
searchOpts.BearerToken = nil
|
|
||||||
searchOpts.ContainerID = cid
|
options := object.NewSearchFilters()
|
||||||
searchOpts.Attribute.Key = key
|
options.AddRootFilter()
|
||||||
searchOpts.Attribute.Value = val
|
options.AddFilter(key, val, object.MatchStringEqual)
|
||||||
var ids []*object.ID
|
|
||||||
if ids, err = d.plant.Object().Search(c, searchOpts); err != nil {
|
sops := new(client.SearchObjectParams).WithContainerID(cid).WithSearchFilters(options)
|
||||||
|
if ids, err = conn.SearchObject(c, sops, client.WithSession(tkn)); err != nil {
|
||||||
log.Error("something went wrong", zap.Error(err))
|
log.Error("something went wrong", zap.Error(err))
|
||||||
c.Error("something went wrong", fasthttp.StatusBadRequest)
|
c.Error("something went wrong", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
|
@ -262,15 +260,12 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
address := object.NewAddress()
|
address := object.NewAddress()
|
||||||
address.SetContainerID(cid)
|
address.SetContainerID(cid)
|
||||||
address.SetObjectID(ids[0])
|
address.SetObjectID(ids[0])
|
||||||
getOpts := getOptionsPool.Get().(*neofs.GetOptions)
|
|
||||||
defer getOptionsPool.Put(getOpts)
|
conn, tkn, err = d.plant.ConnectionArtifacts()
|
||||||
getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
||||||
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
getOpts.ObjectAddress = address
|
d.newRequest(c, log).receiveFile(conn, tkn, address)
|
||||||
getOpts.Writer = nil
|
|
||||||
d.newRequest(c, log).receiveFile(getOpts)
|
|
||||||
}
|
}
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
||||||
github.com/fasthttp/router v1.3.5
|
github.com/fasthttp/router v1.3.5
|
||||||
github.com/mr-tron/base58 v1.1.3 // indirect
|
github.com/mr-tron/base58 v1.1.3 // indirect
|
||||||
github.com/nspcc-dev/neofs-api-go v1.26.1
|
github.com/nspcc-dev/neofs-api-go v1.26.1
|
||||||
github.com/nspcc-dev/neofs-crypto v0.3.0
|
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2
|
||||||
github.com/prometheus/client_golang v1.9.0
|
github.com/prometheus/client_golang v1.9.0
|
||||||
github.com/prometheus/common v0.15.0
|
github.com/prometheus/common v0.15.0
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -29,6 +29,8 @@ github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzg
|
||||||
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db/go.mod h1:rB3B4rKii8V21ydCbIzH5hZiCQE7f5E9SzUb/ZZx530=
|
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db/go.mod h1:rB3B4rKii8V21ydCbIzH5hZiCQE7f5E9SzUb/ZZx530=
|
||||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
|
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
|
||||||
|
github.com/alecthomas/participle v0.7.1/go.mod h1:HfdmEuwvr12HXQN44HPWXR0lHmVolVYe4dyL6lQ3duY=
|
||||||
|
github.com/alecthomas/repr v0.0.0-20181024024818-d37bc2a10ba1/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ=
|
||||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||||
|
@ -317,6 +319,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9K
|
||||||
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
|
github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM=
|
||||||
github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw=
|
||||||
|
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2 h1:z8xtKILKi+Dolk3VAyCaFPMroFnT+x8qTqMT/zBRqIc=
|
||||||
|
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2/go.mod h1:QZE7VaNQRyNFS+3gsrNEQEiLe+d6AR6EteX1M9geh6A=
|
||||||
github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
|
github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
|
||||||
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
|
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
|
||||||
github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
|
github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
|
||||||
|
|
|
@ -1,78 +0,0 @@
|
||||||
package logger
|
|
||||||
|
|
||||||
import (
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"go.uber.org/zap/zapcore"
|
|
||||||
"google.golang.org/grpc/grpclog"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
zapLogger struct {
|
|
||||||
zapcore.Core
|
|
||||||
log *zap.SugaredLogger
|
|
||||||
}
|
|
||||||
|
|
||||||
// Logger includes grpclog.LoggerV2 interface with an additional
|
|
||||||
// Println method.
|
|
||||||
Logger interface {
|
|
||||||
grpclog.LoggerV2
|
|
||||||
Println(v ...interface{})
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
// GRPC wraps given zap.Logger into grpclog.LoggerV2+ interface.
|
|
||||||
func GRPC(l *zap.Logger) Logger {
|
|
||||||
log := l.WithOptions(
|
|
||||||
// skip gRPCLog + zapLogger in caller
|
|
||||||
zap.AddCallerSkip(2))
|
|
||||||
|
|
||||||
return &zapLogger{
|
|
||||||
Core: log.Core(),
|
|
||||||
log: log.Sugar(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Info implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Info(args ...interface{}) { z.log.Info(args...) }
|
|
||||||
|
|
||||||
// Infoln implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Infoln(args ...interface{}) { z.log.Info(args...) }
|
|
||||||
|
|
||||||
// Infof implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Infof(format string, args ...interface{}) { z.log.Infof(format, args...) }
|
|
||||||
|
|
||||||
// Println allows to print a line with info severity.
|
|
||||||
func (z *zapLogger) Println(args ...interface{}) { z.log.Info(args...) }
|
|
||||||
|
|
||||||
// Printf implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Printf(format string, args ...interface{}) { z.log.Infof(format, args...) }
|
|
||||||
|
|
||||||
// Warning implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Warning(args ...interface{}) { z.log.Warn(args...) }
|
|
||||||
|
|
||||||
// Warningln implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Warningln(args ...interface{}) { z.log.Warn(args...) }
|
|
||||||
|
|
||||||
// Warningf implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Warningf(format string, args ...interface{}) { z.log.Warnf(format, args...) }
|
|
||||||
|
|
||||||
// Error implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Error(args ...interface{}) { z.log.Error(args...) }
|
|
||||||
|
|
||||||
// Errorln implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Errorln(args ...interface{}) { z.log.Error(args...) }
|
|
||||||
|
|
||||||
// Errorf implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Errorf(format string, args ...interface{}) { z.log.Errorf(format, args...) }
|
|
||||||
|
|
||||||
// Fatal implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Fatal(args ...interface{}) { z.log.Fatal(args...) }
|
|
||||||
|
|
||||||
// Fatalln implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Fatalln(args ...interface{}) { z.log.Fatal(args...) }
|
|
||||||
|
|
||||||
// Fatalf implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) Fatalf(format string, args ...interface{}) { z.log.Fatalf(format, args...) }
|
|
||||||
|
|
||||||
// V implements grpclog.LoggerV2.
|
|
||||||
func (z *zapLogger) V(int) bool { return z.Enabled(zapcore.DebugLevel) }
|
|
|
@ -1,33 +0,0 @@
|
||||||
package logger
|
|
||||||
|
|
||||||
import "go.uber.org/zap"
|
|
||||||
|
|
||||||
// WithSamplingInitial returns Option that sets sampling initial parameter.
|
|
||||||
func WithSamplingInitial(v int) Option { return func(o *options) { o.SamplingInitial = v } }
|
|
||||||
|
|
||||||
// WithSamplingThereafter returns Option that sets sampling thereafter parameter.
|
|
||||||
func WithSamplingThereafter(v int) Option { return func(o *options) { o.SamplingThereafter = v } }
|
|
||||||
|
|
||||||
// WithFormat returns Option that sets format parameter.
|
|
||||||
func WithFormat(v string) Option { return func(o *options) { o.Format = v } }
|
|
||||||
|
|
||||||
// WithLevel returns Option that sets Level parameter.
|
|
||||||
func WithLevel(v string) Option { return func(o *options) { o.Level = v } }
|
|
||||||
|
|
||||||
// WithTraceLevel returns Option that sets trace level parameter.
|
|
||||||
func WithTraceLevel(v string) Option { return func(o *options) { o.TraceLevel = v } }
|
|
||||||
|
|
||||||
// WithoutDisclaimer returns Option that disables disclaimer.
|
|
||||||
func WithoutDisclaimer() Option { return func(o *options) { o.NoDisclaimer = true } }
|
|
||||||
|
|
||||||
// WithoutCaller returns Option that disables caller printing.
|
|
||||||
func WithoutCaller() Option { return func(o *options) { o.NoCaller = true } }
|
|
||||||
|
|
||||||
// WithAppName returns Option that sets application name.
|
|
||||||
func WithAppName(v string) Option { return func(o *options) { o.AppName = v } }
|
|
||||||
|
|
||||||
// WithAppVersion returns Option that sets application version.
|
|
||||||
func WithAppVersion(v string) Option { return func(o *options) { o.AppVersion = v } }
|
|
||||||
|
|
||||||
// WithZapOptions returns Option that sets zap logger options.
|
|
||||||
func WithZapOptions(opts ...zap.Option) Option { return func(o *options) { o.Options = opts } }
|
|
134
logger/zap.go
134
logger/zap.go
|
@ -1,134 +0,0 @@
|
||||||
package logger
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"go.uber.org/zap/zapcore"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
// Option represents logger option setter.
|
|
||||||
Option func(o *options)
|
|
||||||
|
|
||||||
options struct {
|
|
||||||
Options []zap.Option
|
|
||||||
|
|
||||||
SamplingInitial int
|
|
||||||
SamplingThereafter int
|
|
||||||
|
|
||||||
Format string
|
|
||||||
Level string
|
|
||||||
TraceLevel string
|
|
||||||
|
|
||||||
NoCaller bool
|
|
||||||
NoDisclaimer bool
|
|
||||||
|
|
||||||
AppName string
|
|
||||||
AppVersion string
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
formatJSON = "json"
|
|
||||||
formatConsole = "console"
|
|
||||||
|
|
||||||
defaultSamplingInitial = 100
|
|
||||||
defaultSamplingThereafter = 100
|
|
||||||
|
|
||||||
lvlInfo = "info"
|
|
||||||
lvlWarn = "warn"
|
|
||||||
lvlDebug = "debug"
|
|
||||||
lvlError = "error"
|
|
||||||
lvlFatal = "fatal"
|
|
||||||
lvlPanic = "panic"
|
|
||||||
)
|
|
||||||
|
|
||||||
func safeLevel(lvl string) zap.AtomicLevel {
|
|
||||||
switch strings.ToLower(lvl) {
|
|
||||||
case lvlDebug:
|
|
||||||
return zap.NewAtomicLevelAt(zap.DebugLevel)
|
|
||||||
case lvlWarn:
|
|
||||||
return zap.NewAtomicLevelAt(zap.WarnLevel)
|
|
||||||
case lvlError:
|
|
||||||
return zap.NewAtomicLevelAt(zap.ErrorLevel)
|
|
||||||
case lvlFatal:
|
|
||||||
return zap.NewAtomicLevelAt(zap.FatalLevel)
|
|
||||||
case lvlPanic:
|
|
||||||
return zap.NewAtomicLevelAt(zap.PanicLevel)
|
|
||||||
default:
|
|
||||||
return zap.NewAtomicLevelAt(zap.InfoLevel)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func defaults() *options {
|
|
||||||
return &options{
|
|
||||||
SamplingInitial: defaultSamplingInitial,
|
|
||||||
SamplingThereafter: defaultSamplingThereafter,
|
|
||||||
|
|
||||||
Format: formatConsole,
|
|
||||||
Level: lvlDebug,
|
|
||||||
TraceLevel: lvlInfo,
|
|
||||||
|
|
||||||
NoCaller: false,
|
|
||||||
NoDisclaimer: false,
|
|
||||||
|
|
||||||
AppName: "",
|
|
||||||
AppVersion: "",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns new zap.Logger using all options specified and stdout used
|
|
||||||
// for output.
|
|
||||||
func New(opts ...Option) (*zap.Logger, error) {
|
|
||||||
o := defaults()
|
|
||||||
c := zap.NewProductionConfig()
|
|
||||||
|
|
||||||
c.OutputPaths = []string{"stdout"}
|
|
||||||
c.ErrorOutputPaths = []string{"stdout"}
|
|
||||||
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(o)
|
|
||||||
}
|
|
||||||
|
|
||||||
// set sampling
|
|
||||||
c.Sampling = &zap.SamplingConfig{
|
|
||||||
Initial: o.SamplingInitial,
|
|
||||||
Thereafter: o.SamplingThereafter,
|
|
||||||
}
|
|
||||||
|
|
||||||
// logger level
|
|
||||||
c.Level = safeLevel(o.Level)
|
|
||||||
traceLvl := safeLevel(o.TraceLevel)
|
|
||||||
|
|
||||||
// logger format
|
|
||||||
switch f := o.Format; strings.ToLower(f) {
|
|
||||||
case formatConsole:
|
|
||||||
c.Encoding = formatConsole
|
|
||||||
default:
|
|
||||||
c.Encoding = formatJSON
|
|
||||||
}
|
|
||||||
|
|
||||||
// logger time
|
|
||||||
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
|
||||||
|
|
||||||
if o.NoCaller {
|
|
||||||
c.EncoderConfig.EncodeCaller = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// enable trace only for current log-level
|
|
||||||
o.Options = append(o.Options, zap.AddStacktrace(traceLvl))
|
|
||||||
|
|
||||||
l, err := c.Build(o.Options...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if o.NoDisclaimer {
|
|
||||||
return l, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return l.With(
|
|
||||||
zap.String("app_name", o.AppName),
|
|
||||||
zap.String("app_version", o.AppVersion)), nil
|
|
||||||
}
|
|
2
main.go
2
main.go
|
@ -5,7 +5,7 @@ import (
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-http-gw/logger"
|
"github.com/nspcc-dev/neofs-sdk-go/pkg/logger"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,177 +0,0 @@
|
||||||
package neofs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
|
||||||
"github.com/nspcc-dev/neofs-http-gw/connections"
|
|
||||||
)
|
|
||||||
|
|
||||||
// BaseOptions represents basic NeoFS request options.
|
|
||||||
type BaseOptions struct {
|
|
||||||
Client client.Client
|
|
||||||
SessionToken *token.SessionToken
|
|
||||||
BearerToken *token.BearerToken
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutOptions represents NeoFS Put request options.
|
|
||||||
type PutOptions struct {
|
|
||||||
BaseOptions
|
|
||||||
Attributes []*object.Attribute
|
|
||||||
ContainerID *container.ID
|
|
||||||
OwnerID *owner.ID
|
|
||||||
Reader io.Reader
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetOptions represents NeoFS Get request options.
|
|
||||||
type GetOptions struct {
|
|
||||||
BaseOptions
|
|
||||||
ObjectAddress *object.Address
|
|
||||||
Writer io.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// SearchOptions represents NeoFS Search request options.
|
|
||||||
type SearchOptions struct {
|
|
||||||
BaseOptions
|
|
||||||
ContainerID *container.ID
|
|
||||||
Attribute struct {
|
|
||||||
Key string
|
|
||||||
Value string
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteOptions represents NeoFS Delete request options.
|
|
||||||
type DeleteOptions struct {
|
|
||||||
BaseOptions
|
|
||||||
ObjectAddress *object.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
// ObjectClient wraps basic NeoFS requests.
|
|
||||||
type ObjectClient interface {
|
|
||||||
Put(context.Context, *PutOptions) (*object.Address, error)
|
|
||||||
Get(context.Context, *GetOptions) (*object.Object, error)
|
|
||||||
Search(context.Context, *SearchOptions) ([]*object.ID, error)
|
|
||||||
Delete(context.Context, *DeleteOptions) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientPlant provides connections to NeoFS nodes from pool and allows to
|
|
||||||
// get local owner ID.
|
|
||||||
type ClientPlant interface {
|
|
||||||
ConnectionArtifacts() (client.Client, *token.SessionToken, error)
|
|
||||||
Object() ObjectClient
|
|
||||||
OwnerID() *owner.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
type neofsObjectClient struct {
|
|
||||||
key *ecdsa.PrivateKey
|
|
||||||
pool connections.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
type neofsClientPlant struct {
|
|
||||||
key *ecdsa.PrivateKey
|
|
||||||
ownerID *owner.ID
|
|
||||||
pool connections.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
// ConnectionArtifacts returns connection from pool.
|
|
||||||
func (cp *neofsClientPlant) ConnectionArtifacts() (client.Client, *token.SessionToken, error) {
|
|
||||||
return cp.pool.ConnectionArtifacts()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Object returns ObjectClient instance from plant.
|
|
||||||
func (cp *neofsClientPlant) Object() ObjectClient {
|
|
||||||
return &neofsObjectClient{
|
|
||||||
key: cp.key,
|
|
||||||
pool: cp.pool,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OwnerID returns plant's owner ID.
|
|
||||||
func (cp *neofsClientPlant) OwnerID() *owner.ID {
|
|
||||||
return cp.ownerID
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewClientPlant creates new ClientPlant from given context, pool and credentials.
|
|
||||||
func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) {
|
|
||||||
return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put does NeoFS Put request, returning new object address if successful.
|
|
||||||
func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
objectID *object.ID
|
|
||||||
)
|
|
||||||
address := object.NewAddress()
|
|
||||||
rawObject := object.NewRaw()
|
|
||||||
rawObject.SetContainerID(options.ContainerID)
|
|
||||||
rawObject.SetOwnerID(options.OwnerID)
|
|
||||||
rawObject.SetAttributes(options.Attributes...)
|
|
||||||
ops := new(client.PutObjectParams).
|
|
||||||
WithObject(rawObject.Object()).
|
|
||||||
WithPayloadReader(options.Reader)
|
|
||||||
objectID, err = options.Client.PutObject(
|
|
||||||
ctx,
|
|
||||||
ops,
|
|
||||||
client.WithSession(options.SessionToken),
|
|
||||||
client.WithBearer(options.BearerToken),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
address.SetObjectID(objectID)
|
|
||||||
address.SetContainerID(options.ContainerID)
|
|
||||||
return address, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get does NeoFS Get request, returning an object received if successful.
|
|
||||||
func (oc *neofsObjectClient) Get(ctx context.Context, options *GetOptions) (*object.Object, error) {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
obj *object.Object
|
|
||||||
)
|
|
||||||
ops := new(client.GetObjectParams).
|
|
||||||
WithAddress(options.ObjectAddress).
|
|
||||||
WithPayloadWriter(options.Writer)
|
|
||||||
obj, err = options.Client.GetObject(
|
|
||||||
ctx,
|
|
||||||
ops,
|
|
||||||
client.WithSession(options.SessionToken),
|
|
||||||
client.WithBearer(options.BearerToken),
|
|
||||||
)
|
|
||||||
return obj, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Search does NeoFS Search request, returning object IDs if successful.
|
|
||||||
func (oc *neofsObjectClient) Search(ctx context.Context, options *SearchOptions) ([]*object.ID, error) {
|
|
||||||
sfs := object.NewSearchFilters()
|
|
||||||
sfs.AddRootFilter()
|
|
||||||
sfs.AddFilter(options.Attribute.Key, options.Attribute.Value, object.MatchStringEqual)
|
|
||||||
sops := new(client.SearchObjectParams)
|
|
||||||
sops.WithContainerID(options.ContainerID)
|
|
||||||
sops.WithSearchFilters(sfs)
|
|
||||||
return options.Client.SearchObject(
|
|
||||||
ctx,
|
|
||||||
sops,
|
|
||||||
client.WithSession(options.SessionToken),
|
|
||||||
client.WithBearer(options.BearerToken),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete deletes NeoFS object.
|
|
||||||
func (oc *neofsObjectClient) Delete(ctx context.Context, options *DeleteOptions) error {
|
|
||||||
ops := new(client.DeleteObjectParams).WithAddress(options.ObjectAddress)
|
|
||||||
err := options.Client.DeleteObject(
|
|
||||||
ctx,
|
|
||||||
ops,
|
|
||||||
client.WithSession(options.SessionToken),
|
|
||||||
client.WithBearer(options.BearerToken),
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
|
@ -1,78 +0,0 @@
|
||||||
package neofs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/ecdsa"
|
|
||||||
"crypto/elliptic"
|
|
||||||
"crypto/rand"
|
|
||||||
"math/big"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
|
||||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
// Credentials contains methods that needed to work with NeoFS.
|
|
||||||
Credentials interface {
|
|
||||||
Owner() *owner.ID
|
|
||||||
PublicKey() *ecdsa.PublicKey
|
|
||||||
PrivateKey() *ecdsa.PrivateKey
|
|
||||||
}
|
|
||||||
|
|
||||||
credentials struct {
|
|
||||||
key *ecdsa.PrivateKey
|
|
||||||
ownerID *owner.ID
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewCredentials creates an instance of Credentials through string
|
|
||||||
// representation of secret. It allows passing WIF, path, hex-encoded and others.
|
|
||||||
func NewCredentials(secret string) (Credentials, error) {
|
|
||||||
key, err := crypto.LoadPrivateKey(secret)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return setFromPrivateKey(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEphemeralCredentials creates new private key and Credentials based on that
|
|
||||||
// key.
|
|
||||||
func NewEphemeralCredentials() (Credentials, error) {
|
|
||||||
c := elliptic.P256()
|
|
||||||
priv, x, y, err := elliptic.GenerateKey(c, rand.Reader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
key := &ecdsa.PrivateKey{
|
|
||||||
PublicKey: ecdsa.PublicKey{
|
|
||||||
Curve: c,
|
|
||||||
X: x,
|
|
||||||
Y: y,
|
|
||||||
},
|
|
||||||
D: new(big.Int).SetBytes(priv),
|
|
||||||
}
|
|
||||||
return setFromPrivateKey(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PrivateKey returns ecdsa.PrivateKey.
|
|
||||||
func (c *credentials) PrivateKey() *ecdsa.PrivateKey {
|
|
||||||
return c.key
|
|
||||||
}
|
|
||||||
|
|
||||||
// PublicKey returns ecdsa.PublicKey.
|
|
||||||
func (c *credentials) PublicKey() *ecdsa.PublicKey {
|
|
||||||
return &c.key.PublicKey
|
|
||||||
}
|
|
||||||
|
|
||||||
// Owner returns owner.ID.
|
|
||||||
func (c *credentials) Owner() *owner.ID {
|
|
||||||
return c.ownerID
|
|
||||||
}
|
|
||||||
|
|
||||||
func setFromPrivateKey(key *ecdsa.PrivateKey) (*credentials, error) {
|
|
||||||
wallet, err := owner.NEO3WalletFromPublicKey(&key.PublicKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ownerID := owner.NewIDFromNeo3Wallet(wallet)
|
|
||||||
return &credentials{key: key, ownerID: ownerID}, nil
|
|
||||||
}
|
|
|
@ -5,15 +5,15 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||||
"github.com/nspcc-dev/neofs-http-gw/neofs"
|
|
||||||
"github.com/nspcc-dev/neofs-http-gw/tokens"
|
"github.com/nspcc-dev/neofs-http-gw/tokens"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/pkg/neofs"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -23,12 +23,6 @@ const (
|
||||||
drainBufSize = 4096
|
drainBufSize = 4096
|
||||||
)
|
)
|
||||||
|
|
||||||
var putOptionsPool = sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return new(neofs.PutOptions)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Uploader is an upload request handler.
|
// Uploader is an upload request handler.
|
||||||
type Uploader struct {
|
type Uploader struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
@ -47,7 +41,10 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
file MultipartFile
|
file MultipartFile
|
||||||
addr *object.Address
|
obj *object.ID
|
||||||
|
conn client.Client
|
||||||
|
tkn *token.SessionToken
|
||||||
|
addr = object.NewAddress()
|
||||||
cid = container.NewID()
|
cid = container.NewID()
|
||||||
scid, _ = c.UserValue("cid").(string)
|
scid, _ = c.UserValue("cid").(string)
|
||||||
log = u.log.With(zap.String("cid", scid))
|
log = u.log.With(zap.String("cid", scid))
|
||||||
|
@ -107,25 +104,31 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
attributes = append(attributes, timestamp)
|
attributes = append(attributes, timestamp)
|
||||||
}
|
}
|
||||||
oid, bt := u.fetchOwnerAndBearerToken(c)
|
oid, bt := u.fetchOwnerAndBearerToken(c)
|
||||||
putOpts := putOptionsPool.Get().(*neofs.PutOptions)
|
|
||||||
defer putOptionsPool.Put(putOpts)
|
|
||||||
// Try to put file into NeoFS or throw an error.
|
// Try to put file into NeoFS or throw an error.
|
||||||
putOpts.Client, putOpts.SessionToken, err = u.plant.ConnectionArtifacts()
|
conn, tkn, err = u.plant.ConnectionArtifacts()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
||||||
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
putOpts.Attributes = attributes
|
|
||||||
putOpts.BearerToken = bt
|
rawObject := object.NewRaw()
|
||||||
putOpts.ContainerID = cid
|
rawObject.SetContainerID(cid)
|
||||||
putOpts.OwnerID = oid
|
rawObject.SetOwnerID(oid)
|
||||||
putOpts.Reader = file
|
rawObject.SetAttributes(attributes...)
|
||||||
if addr, err = u.plant.Object().Put(c, putOpts); err != nil {
|
|
||||||
|
ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(file)
|
||||||
|
|
||||||
|
if obj, err = conn.PutObject(c, ops, client.WithSession(tkn), client.WithBearer(bt)); err != nil {
|
||||||
log.Error("could not store file in neofs", zap.Error(err))
|
log.Error("could not store file in neofs", zap.Error(err))
|
||||||
c.Error("could not store file in neofs", fasthttp.StatusBadRequest)
|
c.Error("could not store file in neofs", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addr.SetObjectID(obj)
|
||||||
|
addr.SetContainerID(cid)
|
||||||
|
|
||||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||||
if err = newPutResponse(addr).encode(c); err != nil {
|
if err = newPutResponse(addr).encode(c); err != nil {
|
||||||
log.Error("could not prepare response", zap.Error(err))
|
log.Error("could not prepare response", zap.Error(err))
|
||||||
|
@ -151,8 +154,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) {
|
func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) {
|
||||||
if token, err := tokens.LoadBearerToken(ctx); err == nil && token != nil {
|
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
|
||||||
return token.Issuer(), token
|
return tkn.Issuer(), tkn
|
||||||
}
|
}
|
||||||
return u.plant.OwnerID(), nil
|
return u.plant.OwnerID(), nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue