diff --git a/app.go b/app.go index 9e669eb..bfdd66b 100644 --- a/app.go +++ b/app.go @@ -208,8 +208,8 @@ func (a *app) Serve(ctx context.Context) { close(a.webDone) }() edts := a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) - uploader := uploader.New(a.log, a.pool, edts) - downloader := downloader.New(a.log, downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)}, a.pool) + uploader := uploader.New(ctx, a.log, a.pool, edts) + downloader := downloader.New(ctx, a.log, downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)}, a.pool) // Configure router. r := router.New() r.RedirectTrailingSlash = true diff --git a/downloader/download.go b/downloader/download.go index bf525bd..4f405f1 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -31,7 +31,8 @@ import ( type request struct { *fasthttp.RequestCtx - log *zap.Logger + appCtx context.Context + log *zap.Logger } var errObjectNotFound = errors.New("object not found") @@ -106,7 +107,7 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress *address.Address) { prm.SetAddress(*objectAddress) prm.UseBearer(bearerToken(r.RequestCtx)) - rObj, err := clnt.GetObject(r.RequestCtx, prm) + rObj, err := clnt.GetObject(r.appCtx, prm) if err != nil { r.handleNeoFSErr(err, start) return @@ -247,6 +248,7 @@ func (r *request) handleNeoFSErr(err error, start time.Time) { // Downloader is a download request handler. type Downloader struct { + appCtx context.Context log *zap.Logger pool *pool.Pool settings Settings @@ -257,13 +259,14 @@ type Settings struct { } // New creates an instance of Downloader using specified options. -func New(log *zap.Logger, settings Settings, conns *pool.Pool) *Downloader { - return &Downloader{log: log, pool: conns, settings: settings} +func New(ctx context.Context, log *zap.Logger, settings Settings, conns *pool.Pool) *Downloader { + return &Downloader{appCtx: ctx, log: log, pool: conns, settings: settings} } func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { return &request{ RequestCtx: ctx, + appCtx: d.appCtx, log: log, } } @@ -354,7 +357,7 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string prm.SetFilters(filters) prm.UseBearer(bearerToken(c)) - return d.pool.SearchObjects(c, prm) + return d.pool.SearchObjects(d.appCtx, prm) } // DownloadZipped handles zip by prefix requests. @@ -423,7 +426,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { prm.SetAddress(addr) prm.UseBearer(btoken) - resGet, err = d.pool.GetObject(c, prm) + resGet, err = d.pool.GetObject(d.appCtx, prm) if err != nil { err = fmt.Errorf("get NeoFS object: %v", err) return true diff --git a/downloader/head.go b/downloader/head.go index a299d31..6eb10e6 100644 --- a/downloader/head.go +++ b/downloader/head.go @@ -39,7 +39,7 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) { prm.SetAddress(*objectAddress) prm.UseBearer(btoken) - obj, err := clnt.HeadObject(r.RequestCtx, prm) + obj, err := clnt.HeadObject(r.appCtx, prm) if err != nil { r.handleNeoFSErr(err, start) return @@ -79,7 +79,7 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) { prmRange.SetLength(sz) prmRange.UseBearer(btoken) - return clnt.ObjectRange(r.RequestCtx, prmRange) + return clnt.ObjectRange(r.appCtx, prmRange) }) if err != nil && err != io.EOF { r.handleNeoFSErr(err, start) diff --git a/uploader/upload.go b/uploader/upload.go index 966dd21..e6b71a6 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -31,6 +31,7 @@ const ( // Uploader is an upload request handler. type Uploader struct { + appCtx context.Context log *zap.Logger pool *pool.Pool enableDefaultTimestamp bool @@ -44,8 +45,8 @@ type epochDurations struct { // New creates a new Uploader using specified logger, connection pool and // other options. -func New(log *zap.Logger, conns *pool.Pool, enableDefaultTimestamp bool) *Uploader { - return &Uploader{log, conns, enableDefaultTimestamp} +func New(ctx context.Context, log *zap.Logger, conns *pool.Pool, enableDefaultTimestamp bool) *Uploader { + return &Uploader{ctx, log, conns, enableDefaultTimestamp} } // Upload handles multipart upload request. @@ -134,15 +135,12 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { obj.SetOwnerID(id) obj.SetAttributes(attributes...) - ctx, cancel := context.WithCancel(c) - defer cancel() - var prm pool.PrmObjectPut prm.SetHeader(*obj) prm.SetPayload(file) prm.UseBearer(bt) - if idObj, err = u.pool.PutObject(ctx, prm); err != nil { + if idObj, err = u.pool.PutObject(u.appCtx, prm); err != nil { log.Error("could not store file in neofs", zap.Error(err)) response.Error(c, "could not store file in neofs: "+err.Error(), fasthttp.StatusBadRequest) return