From ed3c576b7075e969877e0d011a9ff44dddc555a1 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 28 Apr 2021 11:39:12 +0300 Subject: [PATCH] uploader: drain body buffer before ending request processing Fixes 2021-04-28T00:03:36.514+0300 debug uploader/upload.go:64 close temporary multipart/form file {"cid": "Dxhf4PNprrJHWWTG5RGLdfLkJiSQ3AQqit1MSnEPRkDZ", "address": "Dxhf4PNprrJHWWTG5RGLdfLkJiSQ3AQqit1MSnEPRkDZ/2m8PtaoricLouCn5zE8hAFr3gZEBDCZFe9BEgVJTSocX", "filename": "vid.mp4"} 2021/04/28 00:03:36 error when serving connection "127.0.0.1:8082"<->"127.0.0.1:41390": error when reading request headers: cannot find http request method in "0\r\n\r\n". Buffer size=5, contents: "0\r\n\r\n" --- uploader/upload.go | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/uploader/upload.go b/uploader/upload.go index 3c0da28..02da473 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -18,7 +18,10 @@ import ( "go.uber.org/zap" ) -const jsonHeader = "application/json; charset=UTF-8" +const ( + jsonHeader = "application/json; charset=UTF-8" + drainBufSize = 4096 +) var putOptionsPool = sync.Pool{ New: func() interface{} { @@ -38,12 +41,14 @@ func New(log *zap.Logger, plant neofs.ClientPlant, enableDefaultTimestamp bool) func (u *Uploader) Upload(c *fasthttp.RequestCtx) { var ( - err error - file MultipartFile - addr *object.Address - cid = container.NewID() - scid, _ = c.UserValue("cid").(string) - log = u.log.With(zap.String("cid", scid)) + err error + file MultipartFile + addr *object.Address + cid = container.NewID() + scid, _ = c.UserValue("cid").(string) + log = u.log.With(zap.String("cid", scid)) + bodyStream = c.RequestBodyStream() + drainBuf = make([]byte, drainBufSize) ) if err = tokens.StoreBearerToken(c); err != nil { log.Error("could not fetch bearer token", zap.Error(err)) @@ -69,7 +74,7 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { ) }() boundary := string(c.Request.Header.MultipartFormBoundary()) - if file, err = fetchMultipartFile(u.log, c.RequestBodyStream(), boundary); err != nil { + if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil { log.Error("could not receive multipart/form", zap.Error(err)) c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) return @@ -124,6 +129,18 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { return } + // Multipart is multipart and thus can contain more than one part which + // we ignore at the moment. Also, when dealing with chunked encoding + // the last zero-length chunk might be left unread (because multipart + // reader only cares about its boundary and doesn't look further) and + // it will be (erroneously) interpreted as the start of the next + // pipelined header. Thus we need to drain the body buffer. + for { + _, err = bodyStream.Read(drainBuf) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + } // Report status code and content type. c.Response.SetStatusCode(fasthttp.StatusOK) c.Response.Header.SetContentType(jsonHeader)