From 1e3df95eed5737c3325ad1176921b17f18558531 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Thu, 21 Apr 2022 18:04:41 +0300 Subject: [PATCH] [#145] Use application context in NeoFS API requests It is meaningless to use RequestCtx as a context.Context for NeoFS operation, because context won't be closed until application shutdown. Moreover, it also triggers data race detection, because server's done channel, which is accessible for reading from RequestCtx, is set to `nil`. Using application context doesn't change gateway behavior, but it suppresses data race trigger at shutdown. It also allows possibility to set configurable timeouts for NeoFS networking if we will ever need them. Signed-off-by: Alex Vanin --- app.go | 4 ++-- downloader/download.go | 15 +++++++++------ downloader/head.go | 4 ++-- uploader/upload.go | 10 ++++------ 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/app.go b/app.go index 9e669eb..bfdd66b 100644 --- a/app.go +++ b/app.go @@ -208,8 +208,8 @@ func (a *app) Serve(ctx context.Context) { close(a.webDone) }() edts := a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) - uploader := uploader.New(a.log, a.pool, edts) - downloader := downloader.New(a.log, downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)}, a.pool) + uploader := uploader.New(ctx, a.log, a.pool, edts) + downloader := downloader.New(ctx, a.log, downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)}, a.pool) // Configure router. r := router.New() r.RedirectTrailingSlash = true diff --git a/downloader/download.go b/downloader/download.go index bf525bd..4f405f1 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -31,7 +31,8 @@ import ( type request struct { *fasthttp.RequestCtx - log *zap.Logger + appCtx context.Context + log *zap.Logger } var errObjectNotFound = errors.New("object not found") @@ -106,7 +107,7 @@ func (r request) receiveFile(clnt *pool.Pool, objectAddress *address.Address) { prm.SetAddress(*objectAddress) prm.UseBearer(bearerToken(r.RequestCtx)) - rObj, err := clnt.GetObject(r.RequestCtx, prm) + rObj, err := clnt.GetObject(r.appCtx, prm) if err != nil { r.handleNeoFSErr(err, start) return @@ -247,6 +248,7 @@ func (r *request) handleNeoFSErr(err error, start time.Time) { // Downloader is a download request handler. type Downloader struct { + appCtx context.Context log *zap.Logger pool *pool.Pool settings Settings @@ -257,13 +259,14 @@ type Settings struct { } // New creates an instance of Downloader using specified options. -func New(log *zap.Logger, settings Settings, conns *pool.Pool) *Downloader { - return &Downloader{log: log, pool: conns, settings: settings} +func New(ctx context.Context, log *zap.Logger, settings Settings, conns *pool.Pool) *Downloader { + return &Downloader{appCtx: ctx, log: log, pool: conns, settings: settings} } func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { return &request{ RequestCtx: ctx, + appCtx: d.appCtx, log: log, } } @@ -354,7 +357,7 @@ func (d *Downloader) search(c *fasthttp.RequestCtx, cid *cid.ID, key, val string prm.SetFilters(filters) prm.UseBearer(bearerToken(c)) - return d.pool.SearchObjects(c, prm) + return d.pool.SearchObjects(d.appCtx, prm) } // DownloadZipped handles zip by prefix requests. @@ -423,7 +426,7 @@ func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { prm.SetAddress(addr) prm.UseBearer(btoken) - resGet, err = d.pool.GetObject(c, prm) + resGet, err = d.pool.GetObject(d.appCtx, prm) if err != nil { err = fmt.Errorf("get NeoFS object: %v", err) return true diff --git a/downloader/head.go b/downloader/head.go index a299d31..6eb10e6 100644 --- a/downloader/head.go +++ b/downloader/head.go @@ -39,7 +39,7 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) { prm.SetAddress(*objectAddress) prm.UseBearer(btoken) - obj, err := clnt.HeadObject(r.RequestCtx, prm) + obj, err := clnt.HeadObject(r.appCtx, prm) if err != nil { r.handleNeoFSErr(err, start) return @@ -79,7 +79,7 @@ func (r request) headObject(clnt *pool.Pool, objectAddress *address.Address) { prmRange.SetLength(sz) prmRange.UseBearer(btoken) - return clnt.ObjectRange(r.RequestCtx, prmRange) + return clnt.ObjectRange(r.appCtx, prmRange) }) if err != nil && err != io.EOF { r.handleNeoFSErr(err, start) diff --git a/uploader/upload.go b/uploader/upload.go index 966dd21..e6b71a6 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -31,6 +31,7 @@ const ( // Uploader is an upload request handler. type Uploader struct { + appCtx context.Context log *zap.Logger pool *pool.Pool enableDefaultTimestamp bool @@ -44,8 +45,8 @@ type epochDurations struct { // New creates a new Uploader using specified logger, connection pool and // other options. -func New(log *zap.Logger, conns *pool.Pool, enableDefaultTimestamp bool) *Uploader { - return &Uploader{log, conns, enableDefaultTimestamp} +func New(ctx context.Context, log *zap.Logger, conns *pool.Pool, enableDefaultTimestamp bool) *Uploader { + return &Uploader{ctx, log, conns, enableDefaultTimestamp} } // Upload handles multipart upload request. @@ -134,15 +135,12 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { obj.SetOwnerID(id) obj.SetAttributes(attributes...) - ctx, cancel := context.WithCancel(c) - defer cancel() - var prm pool.PrmObjectPut prm.SetHeader(*obj) prm.SetPayload(file) prm.UseBearer(bt) - if idObj, err = u.pool.PutObject(ctx, prm); err != nil { + if idObj, err = u.pool.PutObject(u.appCtx, prm); err != nil { log.Error("could not store file in neofs", zap.Error(err)) response.Error(c, "could not store file in neofs: "+err.Error(), fasthttp.StatusBadRequest) return