[#19] Extract downloading logic into a separate package

Signed-off-by: Pavel Korotkov <pavel@nspcc.ru>
This commit is contained in:
Pavel Korotkov 2021-03-31 21:24:41 +03:00 committed by Pavel Korotkov
parent eb92219e14
commit 3a5d9fe94c
3 changed files with 83 additions and 50 deletions

39
app.go
View file

@ -6,8 +6,7 @@ import (
"strconv" "strconv"
"github.com/fasthttp/router" "github.com/fasthttp/router"
"github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-http-gate/downloader"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-http-gate/logger" "github.com/nspcc-dev/neofs-http-gate/logger"
"github.com/nspcc-dev/neofs-http-gate/neofs" "github.com/nspcc-dev/neofs-http-gate/neofs"
"github.com/nspcc-dev/neofs-http-gate/uploader" "github.com/nspcc-dev/neofs-http-gate/uploader"
@ -19,14 +18,10 @@ import (
type ( type (
app struct { app struct {
plant neofs.ClientPlant
getOperations struct {
client client.Client
sessionToken *token.SessionToken
}
log *zap.Logger log *zap.Logger
plant neofs.ClientPlant
cfg *viper.Viper cfg *viper.Viper
wlog logger.Logger auxiliaryLog logger.Logger
web *fasthttp.Server web *fasthttp.Server
jobDone chan struct{} jobDone chan struct{}
webDone chan struct{} webDone chan struct{}
@ -62,24 +57,20 @@ func WithConfig(c *viper.Viper) Option {
func newApp(ctx context.Context, opt ...Option) App { func newApp(ctx context.Context, opt ...Option) App {
a := &app{ a := &app{
log: zap.L(), log: zap.L(),
cfg: viper.GetViper(), cfg: viper.GetViper(),
web: new(fasthttp.Server), web: new(fasthttp.Server),
jobDone: make(chan struct{}), jobDone: make(chan struct{}),
webDone: make(chan struct{}), webDone: make(chan struct{}),
} }
for i := range opt { for i := range opt {
opt[i](a) opt[i](a)
} }
a.enableDefaultTimestamp = a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) a.enableDefaultTimestamp = a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
a.auxiliaryLog = logger.GRPC(a.log)
a.wlog = logger.GRPC(a.log)
if a.cfg.GetBool(cmdVerbose) { if a.cfg.GetBool(cmdVerbose) {
grpclog.SetLoggerV2(a.wlog) grpclog.SetLoggerV2(a.auxiliaryLog)
} }
// conTimeout := a.cfg.GetDuration(cfgConTimeout) // conTimeout := a.cfg.GetDuration(cfgConTimeout)
@ -124,10 +115,6 @@ func newApp(ctx context.Context, opt ...Option) App {
if err != nil { if err != nil {
a.log.Fatal("failed to create neofs client") a.log.Fatal("failed to create neofs client")
} }
a.getOperations.client, a.getOperations.sessionToken, err = a.plant.GetReusableArtifacts(ctx)
if err != nil {
a.log.Fatal("failed to get neofs client's reusable artifacts")
}
return a return a
} }
@ -153,21 +140,25 @@ func (a *app) Serve(ctx context.Context) {
close(a.webDone) close(a.webDone)
}() }()
uploader := uploader.New(a.log, a.plant, a.enableDefaultTimestamp) uploader := uploader.New(a.log, a.plant, a.enableDefaultTimestamp)
downloader, err := downloader.New(ctx, a.log, a.plant)
if err != nil {
a.log.Fatal("failed to create downloader", zap.Error(err))
}
// Configure router. // Configure router.
r := router.New() r := router.New()
r.RedirectTrailingSlash = true r.RedirectTrailingSlash = true
r.POST("/upload/{cid}", uploader.Upload) r.POST("/upload/{cid}", uploader.Upload)
a.log.Info("added path /upload/{cid}") a.log.Info("added path /upload/{cid}")
r.GET("/get/{cid}/{oid}", a.byAddress) r.GET("/get/{cid}/{oid}", downloader.DownloadByAddress)
a.log.Info("added path /get/{cid}/{oid}") a.log.Info("added path /get/{cid}/{oid}")
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.byAttribute) r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", downloader.DownloadByAttribute)
a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}")
// attaching /-/(ready,healthy) // attaching /-/(ready,healthy)
// attachHealthy(r, a.pool.Status) // attachHealthy(r, a.pool.Status)
// enable metrics // enable metrics
if a.cfg.GetBool(cmdMetrics) { if a.cfg.GetBool(cmdMetrics) {
a.log.Info("added path /metrics/") a.log.Info("added path /metrics/")
attachMetrics(r, a.wlog) attachMetrics(r, a.auxiliaryLog)
} }
// enable pprof // enable pprof
if a.cfg.GetBool(cmdPprof) { if a.cfg.GetBool(cmdPprof) {

View file

@ -1,6 +1,7 @@
package main package downloader
import ( import (
"context"
"io" "io"
"net/http" "net/http"
"path" "path"
@ -9,8 +10,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-http-gate/neofs" "github.com/nspcc-dev/neofs-http-gate/neofs"
"github.com/nspcc-dev/neofs-http-gate/tokens" "github.com/nspcc-dev/neofs-http-gate/tokens"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -20,6 +23,20 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
var (
getOptionsPool = sync.Pool{
New: func() interface{} {
return new(neofs.GetOptions)
},
}
searchOptionsPool = sync.Pool{
New: func() interface{} {
return new(neofs.SearchOptions)
},
}
)
type ( type (
detector struct { detector struct {
io.Writer io.Writer
@ -29,8 +46,8 @@ type (
request struct { request struct {
*fasthttp.RequestCtx *fasthttp.RequestCtx
log *zap.Logger log *zap.Logger
obj neofs.ObjectClient objectClient neofs.ObjectClient
} }
objectIDs []*object.ID objectIDs []*object.ID
@ -61,7 +78,7 @@ func (r *request) receiveFile(options *neofs.GetOptions) {
} }
writer := newDetector(r.Response.BodyWriter()) writer := newDetector(r.Response.BodyWriter())
options.Writer = writer options.Writer = writer
obj, err := r.obj.Get(r.RequestCtx, options) obj, err := r.objectClient.Get(r.RequestCtx, options)
if err != nil { if err != nil {
r.log.Error( r.log.Error(
"could not receive object", "could not receive object",
@ -120,15 +137,34 @@ func (o objectIDs) Slice() []string {
return res return res
} }
func (a *app) request(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { type Downloader struct {
return &request{ log *zap.Logger
RequestCtx: ctx, plant neofs.ClientPlant
log: log, getOperations struct {
obj: a.plant.Object(), client client.Client
sessionToken *token.SessionToken
} }
} }
func (a *app) byAddress(c *fasthttp.RequestCtx) { func New(ctx context.Context, log *zap.Logger, plant neofs.ClientPlant) (*Downloader, error) {
var err error
d := &Downloader{log: log, plant: plant}
d.getOperations.client, d.getOperations.sessionToken, err = d.plant.GetReusableArtifacts(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get neofs client's reusable artifacts")
}
return d, nil
}
func (a *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
return &request{
RequestCtx: ctx,
log: log,
objectClient: a.plant.Object(),
}
}
func (a *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) {
var ( var (
err error err error
address = object.NewAddress() address = object.NewAddress()
@ -142,16 +178,16 @@ func (a *app) byAddress(c *fasthttp.RequestCtx) {
c.Error("wrong object address", fasthttp.StatusBadRequest) c.Error("wrong object address", fasthttp.StatusBadRequest)
return return
} }
// TODO: Take this from a sync-pool. getOpts := getOptionsPool.Get().(*neofs.GetOptions)
getOpts := new(neofs.GetOptions) defer getOptionsPool.Put(getOpts)
getOpts.Client = a.getOperations.client getOpts.Client = a.getOperations.client
getOpts.SessionToken = a.getOperations.sessionToken getOpts.SessionToken = a.getOperations.sessionToken
getOpts.ObjectAddress = address getOpts.ObjectAddress = address
getOpts.Writer = nil getOpts.Writer = nil
a.request(c, log).receiveFile(getOpts) a.newRequest(c, log).receiveFile(getOpts)
} }
func (a *app) byAttribute(c *fasthttp.RequestCtx) { func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
var ( var (
err error err error
scid, _ = c.UserValue("cid").(string) scid, _ = c.UserValue("cid").(string)
@ -165,8 +201,8 @@ func (a *app) byAttribute(c *fasthttp.RequestCtx) {
c.Error("wrong container id", fasthttp.StatusBadRequest) c.Error("wrong container id", fasthttp.StatusBadRequest)
return return
} }
// TODO: Take this from a sync-pool. searchOpts := searchOptionsPool.Get().(*neofs.SearchOptions)
searchOpts := new(neofs.SearchOptions) defer searchOptionsPool.Put(searchOpts)
searchOpts.Client = a.getOperations.client searchOpts.Client = a.getOperations.client
searchOpts.SessionToken = a.getOperations.sessionToken searchOpts.SessionToken = a.getOperations.sessionToken
searchOpts.BearerToken = nil searchOpts.BearerToken = nil
@ -191,11 +227,11 @@ func (a *app) byAttribute(c *fasthttp.RequestCtx) {
address := object.NewAddress() address := object.NewAddress()
address.SetContainerID(cid) address.SetContainerID(cid)
address.SetObjectID(ids[0]) address.SetObjectID(ids[0])
// TODO: Take this from a sync-pool. getOpts := getOptionsPool.Get().(*neofs.GetOptions)
getOpts := new(neofs.GetOptions) defer getOptionsPool.Put(getOpts)
getOpts.Client = a.getOperations.client getOpts.Client = a.getOperations.client
getOpts.SessionToken = a.getOperations.sessionToken getOpts.SessionToken = a.getOperations.sessionToken
getOpts.ObjectAddress = address getOpts.ObjectAddress = address
getOpts.Writer = nil getOpts.Writer = nil
a.request(c, log).receiveFile(getOpts) a.newRequest(c, log).receiveFile(getOpts)
} }

View file

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/container"
@ -17,6 +18,12 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var putOptionsPool = sync.Pool{
New: func() interface{} {
return new(neofs.PutOptions)
},
}
type Uploader struct { type Uploader struct {
log *zap.Logger log *zap.Logger
plant neofs.ClientPlant plant neofs.ClientPlant
@ -89,15 +96,14 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
attributes = append(attributes, timestamp) attributes = append(attributes, timestamp)
} }
oid, bt := u.fetchOwnerAndBearerToken(c) oid, bt := u.fetchOwnerAndBearerToken(c)
// prepares new object and fill it // Prepare a new object and fill it.
raw := object.NewRaw() raw := object.NewRaw()
raw.SetContainerID(cid) raw.SetContainerID(cid)
raw.SetOwnerID(oid) raw.SetOwnerID(oid)
raw.SetAttributes(attributes...) raw.SetAttributes(attributes...)
// tries to put file into NeoFS or throw error putOpts := putOptionsPool.Get().(*neofs.PutOptions)
// if addr, err = a.plant.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil { defer putOptionsPool.Put(putOpts)
// TODO: Take this from a sync pool. // Try to put file into NeoFS or throw an error.
putOpts := new(neofs.PutOptions)
putOpts.Client, putOpts.SessionToken, err = u.plant.GetReusableArtifacts(c) putOpts.Client, putOpts.SessionToken, err = u.plant.GetReusableArtifacts(c)
if err != nil { if err != nil {
log.Error("failed to get neofs client's reusable artifacts", zap.Error(err)) log.Error("failed to get neofs client's reusable artifacts", zap.Error(err))
@ -114,14 +120,14 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)
return return
} }
// tries to return response, otherwise, if something went wrong throw error // Try to return the response, otherwise, if something went wrong, throw an error.
if err = newPutResponse(addr).encode(c); err != nil { if err = newPutResponse(addr).encode(c); err != nil {
log.Error("could not prepare response", zap.Error(err)) log.Error("could not prepare response", zap.Error(err))
c.Error("could not prepare response", fasthttp.StatusBadRequest) c.Error("could not prepare response", fasthttp.StatusBadRequest)
return return
} }
// reports status code and content type // Report status code and content type.
c.Response.SetStatusCode(fasthttp.StatusOK) c.Response.SetStatusCode(fasthttp.StatusOK)
c.Response.Header.SetContentType(jsonHeader) c.Response.Header.SetContentType(jsonHeader)
} }