forked from TrueCloudLab/frostfs-http-gw
Refactoring uploading
- DisablePreParseMultipartForm = true - used `fetchMultipartFile` method instead of `MultipartForm` - don't store temporary files, only streaming Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
parent
3b8bf3017d
commit
3f635a018a
3 changed files with 52 additions and 44 deletions
2
app.go
2
app.go
|
@ -102,7 +102,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
// FIXME don't work with StreamRequestBody,
|
// FIXME don't work with StreamRequestBody,
|
||||||
// some bugs with readMultipartForm
|
// some bugs with readMultipartForm
|
||||||
// https://github.com/valyala/fasthttp/issues/968
|
// https://github.com/valyala/fasthttp/issues/968
|
||||||
// a.web.DisablePreParseMultipartForm = true
|
a.web.DisablePreParseMultipartForm = true
|
||||||
|
|
||||||
a.web.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
|
a.web.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
|
||||||
// -- -- -- -- -- -- -- -- -- --
|
// -- -- -- -- -- -- -- -- -- --
|
||||||
|
|
41
multipart.go
Normal file
41
multipart.go
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"mime/multipart"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MultipartFile interface {
|
||||||
|
io.ReadCloser
|
||||||
|
FileName() string
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartFile, error) {
|
||||||
|
reader := multipart.NewReader(r, boundary)
|
||||||
|
|
||||||
|
for {
|
||||||
|
part, err := reader.NextPart()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
name := part.FormName()
|
||||||
|
if name == "" {
|
||||||
|
l.Debug("ignore part, empty form name")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := part.FileName()
|
||||||
|
|
||||||
|
// ignore multipart/form-data values
|
||||||
|
if filename == "" {
|
||||||
|
l.Debug("ignore part, empty filename", zap.String("form", name))
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return part, nil
|
||||||
|
}
|
||||||
|
}
|
53
upload.go
53
upload.go
|
@ -3,7 +3,6 @@ package main
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"mime/multipart"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -37,10 +36,8 @@ func (pr *putResponse) Encode(w io.Writer) error {
|
||||||
func (a *app) upload(c *fasthttp.RequestCtx) {
|
func (a *app) upload(c *fasthttp.RequestCtx) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
name string
|
file MultipartFile
|
||||||
tmp io.Reader
|
|
||||||
addr *object.Address
|
addr *object.Address
|
||||||
form *multipart.Form
|
|
||||||
cid = container.NewID()
|
cid = container.NewID()
|
||||||
sCID, _ = c.UserValue("cid").(string)
|
sCID, _ = c.UserValue("cid").(string)
|
||||||
|
|
||||||
|
@ -55,54 +52,24 @@ func (a *app) upload(c *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// if temporary reader can be closed - close it
|
// if temporary reader can be closed - close it
|
||||||
if closer, ok := tmp.(io.Closer); ok && closer != nil {
|
if file == nil {
|
||||||
log.Debug("close temporary multipart/form file", zap.Error(closer.Close()))
|
|
||||||
}
|
|
||||||
|
|
||||||
if form == nil {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("cleanup multipart/form", zap.Error(form.RemoveAll()))
|
log.Debug("close temporary multipart/form file",
|
||||||
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("filename", file.FileName()),
|
||||||
|
zap.Error(file.Close()))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// tries to receive multipart/form or throw error
|
boundary := string(c.Request.Header.MultipartFormBoundary())
|
||||||
if form, err = c.MultipartForm(); err != nil {
|
if file, err = fetchMultipartFile(a.log, c.RequestBodyStream(), boundary); err != nil {
|
||||||
log.Error("could not receive multipart/form", zap.Error(err))
|
log.Error("could not receive multipart/form", zap.Error(err))
|
||||||
c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// checks that received multipart/form contains only one `file` per request
|
|
||||||
if ln := len(form.File); ln != 1 {
|
|
||||||
log.Error("received multipart/form with more then one file", zap.Int("count", ln))
|
|
||||||
c.Error("received multipart/form with more then one file", fasthttp.StatusBadRequest)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, file := range form.File {
|
|
||||||
// because multipart/form can contains multiple FileHeader records
|
|
||||||
// we should check that we have only one per request or throw error
|
|
||||||
if ln := len(file); ln != 1 {
|
|
||||||
log.Error("received multipart/form file should contains one record", zap.Int("count", ln))
|
|
||||||
c.Error("received multipart/form file should contains one record", fasthttp.StatusBadRequest)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
name = file[0].Filename
|
|
||||||
|
|
||||||
// opens multipart/form file to work within or throw error
|
|
||||||
if tmp, err = file[0].Open(); err != nil {
|
|
||||||
log.Error("could not prepare uploaded file", zap.Error(err))
|
|
||||||
c.Error("could not prepare uploaded file", fasthttp.StatusBadRequest)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
filtered := filterHeaders(a.log, &c.Request.Header)
|
filtered := filterHeaders(a.log, &c.Request.Header)
|
||||||
attributes := make([]*object.Attribute, 0, len(filtered))
|
attributes := make([]*object.Attribute, 0, len(filtered))
|
||||||
|
|
||||||
|
@ -119,7 +86,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) {
|
||||||
if _, ok := filtered[object.AttributeFileName]; !ok {
|
if _, ok := filtered[object.AttributeFileName]; !ok {
|
||||||
filename := object.NewAttribute()
|
filename := object.NewAttribute()
|
||||||
filename.SetKey(object.AttributeFileName)
|
filename.SetKey(object.AttributeFileName)
|
||||||
filename.SetValue(name)
|
filename.SetValue(file.FileName())
|
||||||
|
|
||||||
attributes = append(attributes, filename)
|
attributes = append(attributes, filename)
|
||||||
}
|
}
|
||||||
|
@ -140,7 +107,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) {
|
||||||
raw.SetAttributes(attributes...)
|
raw.SetAttributes(attributes...)
|
||||||
|
|
||||||
// tries to put file into NeoFS or throw error
|
// tries to put file into NeoFS or throw error
|
||||||
if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(tmp)); err != nil {
|
if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil {
|
||||||
log.Error("could not store file in NeoFS", zap.Error(err))
|
log.Error("could not store file in NeoFS", zap.Error(err))
|
||||||
c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)
|
c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue