From 3a5d9fe94c734276707c65f9ae6343498f2c4454 Mon Sep 17 00:00:00 2001 From: Pavel Korotkov Date: Wed, 31 Mar 2021 21:24:41 +0300 Subject: [PATCH] [#19] Extract downloading logic into a separate package Signed-off-by: Pavel Korotkov --- app.go | 39 ++++++--------- receive.go => downloader/download.go | 74 +++++++++++++++++++++------- uploader/upload.go | 20 +++++--- 3 files changed, 83 insertions(+), 50 deletions(-) rename receive.go => downloader/download.go (74%) diff --git a/app.go b/app.go index f1f1336..06b0bf7 100644 --- a/app.go +++ b/app.go @@ -6,8 +6,7 @@ import ( "strconv" "github.com/fasthttp/router" - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/token" + "github.com/nspcc-dev/neofs-http-gate/downloader" "github.com/nspcc-dev/neofs-http-gate/logger" "github.com/nspcc-dev/neofs-http-gate/neofs" "github.com/nspcc-dev/neofs-http-gate/uploader" @@ -19,14 +18,10 @@ import ( type ( app struct { - plant neofs.ClientPlant - getOperations struct { - client client.Client - sessionToken *token.SessionToken - } log *zap.Logger + plant neofs.ClientPlant cfg *viper.Viper - wlog logger.Logger + auxiliaryLog logger.Logger web *fasthttp.Server jobDone chan struct{} webDone chan struct{} @@ -62,24 +57,20 @@ func WithConfig(c *viper.Viper) Option { func newApp(ctx context.Context, opt ...Option) App { a := &app{ - log: zap.L(), - cfg: viper.GetViper(), - web: new(fasthttp.Server), - + log: zap.L(), + cfg: viper.GetViper(), + web: new(fasthttp.Server), jobDone: make(chan struct{}), webDone: make(chan struct{}), } - for i := range opt { opt[i](a) } - a.enableDefaultTimestamp = a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) - - a.wlog = logger.GRPC(a.log) + a.auxiliaryLog = logger.GRPC(a.log) if a.cfg.GetBool(cmdVerbose) { - grpclog.SetLoggerV2(a.wlog) + grpclog.SetLoggerV2(a.auxiliaryLog) } // conTimeout := a.cfg.GetDuration(cfgConTimeout) @@ -124,10 +115,6 @@ func newApp(ctx context.Context, opt ...Option) App { if err != nil { a.log.Fatal("failed to create neofs client") } - a.getOperations.client, a.getOperations.sessionToken, err = a.plant.GetReusableArtifacts(ctx) - if err != nil { - a.log.Fatal("failed to get neofs client's reusable artifacts") - } return a } @@ -153,21 +140,25 @@ func (a *app) Serve(ctx context.Context) { close(a.webDone) }() uploader := uploader.New(a.log, a.plant, a.enableDefaultTimestamp) + downloader, err := downloader.New(ctx, a.log, a.plant) + if err != nil { + a.log.Fatal("failed to create downloader", zap.Error(err)) + } // Configure router. r := router.New() r.RedirectTrailingSlash = true r.POST("/upload/{cid}", uploader.Upload) a.log.Info("added path /upload/{cid}") - r.GET("/get/{cid}/{oid}", a.byAddress) + r.GET("/get/{cid}/{oid}", downloader.DownloadByAddress) a.log.Info("added path /get/{cid}/{oid}") - r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.byAttribute) + r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", downloader.DownloadByAttribute) a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") // attaching /-/(ready,healthy) // attachHealthy(r, a.pool.Status) // enable metrics if a.cfg.GetBool(cmdMetrics) { a.log.Info("added path /metrics/") - attachMetrics(r, a.wlog) + attachMetrics(r, a.auxiliaryLog) } // enable pprof if a.cfg.GetBool(cmdPprof) { diff --git a/receive.go b/downloader/download.go similarity index 74% rename from receive.go rename to downloader/download.go index 25199f8..37da819 100644 --- a/receive.go +++ b/downloader/download.go @@ -1,6 +1,7 @@ -package main +package downloader import ( + "context" "io" "net/http" "path" @@ -9,8 +10,10 @@ import ( "sync" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-http-gate/neofs" "github.com/nspcc-dev/neofs-http-gate/tokens" "github.com/pkg/errors" @@ -20,6 +23,20 @@ import ( "google.golang.org/grpc/status" ) +var ( + getOptionsPool = sync.Pool{ + New: func() interface{} { + return new(neofs.GetOptions) + }, + } + + searchOptionsPool = sync.Pool{ + New: func() interface{} { + return new(neofs.SearchOptions) + }, + } +) + type ( detector struct { io.Writer @@ -29,8 +46,8 @@ type ( request struct { *fasthttp.RequestCtx - log *zap.Logger - obj neofs.ObjectClient + log *zap.Logger + objectClient neofs.ObjectClient } objectIDs []*object.ID @@ -61,7 +78,7 @@ func (r *request) receiveFile(options *neofs.GetOptions) { } writer := newDetector(r.Response.BodyWriter()) options.Writer = writer - obj, err := r.obj.Get(r.RequestCtx, options) + obj, err := r.objectClient.Get(r.RequestCtx, options) if err != nil { r.log.Error( "could not receive object", @@ -120,15 +137,34 @@ func (o objectIDs) Slice() []string { return res } -func (a *app) request(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { - return &request{ - RequestCtx: ctx, - log: log, - obj: a.plant.Object(), +type Downloader struct { + log *zap.Logger + plant neofs.ClientPlant + getOperations struct { + client client.Client + sessionToken *token.SessionToken } } -func (a *app) byAddress(c *fasthttp.RequestCtx) { +func New(ctx context.Context, log *zap.Logger, plant neofs.ClientPlant) (*Downloader, error) { + var err error + d := &Downloader{log: log, plant: plant} + d.getOperations.client, d.getOperations.sessionToken, err = d.plant.GetReusableArtifacts(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get neofs client's reusable artifacts") + } + return d, nil +} + +func (a *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { + return &request{ + RequestCtx: ctx, + log: log, + objectClient: a.plant.Object(), + } +} + +func (a *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { var ( err error address = object.NewAddress() @@ -142,16 +178,16 @@ func (a *app) byAddress(c *fasthttp.RequestCtx) { c.Error("wrong object address", fasthttp.StatusBadRequest) return } - // TODO: Take this from a sync-pool. - getOpts := new(neofs.GetOptions) + getOpts := getOptionsPool.Get().(*neofs.GetOptions) + defer getOptionsPool.Put(getOpts) getOpts.Client = a.getOperations.client getOpts.SessionToken = a.getOperations.sessionToken getOpts.ObjectAddress = address getOpts.Writer = nil - a.request(c, log).receiveFile(getOpts) + a.newRequest(c, log).receiveFile(getOpts) } -func (a *app) byAttribute(c *fasthttp.RequestCtx) { +func (a *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { var ( err error scid, _ = c.UserValue("cid").(string) @@ -165,8 +201,8 @@ func (a *app) byAttribute(c *fasthttp.RequestCtx) { c.Error("wrong container id", fasthttp.StatusBadRequest) return } - // TODO: Take this from a sync-pool. - searchOpts := new(neofs.SearchOptions) + searchOpts := searchOptionsPool.Get().(*neofs.SearchOptions) + defer searchOptionsPool.Put(searchOpts) searchOpts.Client = a.getOperations.client searchOpts.SessionToken = a.getOperations.sessionToken searchOpts.BearerToken = nil @@ -191,11 +227,11 @@ func (a *app) byAttribute(c *fasthttp.RequestCtx) { address := object.NewAddress() address.SetContainerID(cid) address.SetObjectID(ids[0]) - // TODO: Take this from a sync-pool. - getOpts := new(neofs.GetOptions) + getOpts := getOptionsPool.Get().(*neofs.GetOptions) + defer getOptionsPool.Put(getOpts) getOpts.Client = a.getOperations.client getOpts.SessionToken = a.getOperations.sessionToken getOpts.ObjectAddress = address getOpts.Writer = nil - a.request(c, log).receiveFile(getOpts) + a.newRequest(c, log).receiveFile(getOpts) } diff --git a/uploader/upload.go b/uploader/upload.go index ad03b0d..a77272d 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "strconv" + "sync" "time" "github.com/nspcc-dev/neofs-api-go/pkg/container" @@ -17,6 +18,12 @@ import ( "go.uber.org/zap" ) +var putOptionsPool = sync.Pool{ + New: func() interface{} { + return new(neofs.PutOptions) + }, +} + type Uploader struct { log *zap.Logger plant neofs.ClientPlant @@ -89,15 +96,14 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { attributes = append(attributes, timestamp) } oid, bt := u.fetchOwnerAndBearerToken(c) - // prepares new object and fill it + // Prepare a new object and fill it. raw := object.NewRaw() raw.SetContainerID(cid) raw.SetOwnerID(oid) raw.SetAttributes(attributes...) - // tries to put file into NeoFS or throw error - // if addr, err = a.plant.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil { - // TODO: Take this from a sync pool. - putOpts := new(neofs.PutOptions) + putOpts := putOptionsPool.Get().(*neofs.PutOptions) + defer putOptionsPool.Put(putOpts) + // Try to put file into NeoFS or throw an error. putOpts.Client, putOpts.SessionToken, err = u.plant.GetReusableArtifacts(c) if err != nil { log.Error("failed to get neofs client's reusable artifacts", zap.Error(err)) @@ -114,14 +120,14 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) return } - // tries to return response, otherwise, if something went wrong throw error + // Try to return the response, otherwise, if something went wrong, throw an error. if err = newPutResponse(addr).encode(c); err != nil { log.Error("could not prepare response", zap.Error(err)) c.Error("could not prepare response", fasthttp.StatusBadRequest) return } - // reports status code and content type + // Report status code and content type. c.Response.SetStatusCode(fasthttp.StatusOK) c.Response.Header.SetContentType(jsonHeader) }