From 462de45e12c0b731e7cce4d031350cab1f0e26de Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Mon, 25 Jan 2021 22:36:46 +0300 Subject: [PATCH 01/10] Added POST method to upload files into NeoFS through HTTP Gate - Updated README - Added method to upload files into NeoFS - HTTP Upload Header Filter loaded from settings - Added `HeaderFilter` that filters headers (object attributes) Signed-off-by: Evgeniy Kulikov --- README.md | 5 ++ app.go | 8 ++- filter.go | 86 +++++++++++++++++++++++++++++++ settings.go | 15 +++++- upload.go | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 257 insertions(+), 3 deletions(-) create mode 100644 filter.go create mode 100644 upload.go diff --git a/README.md b/README.md index 9d704f3..c4652ed 100644 --- a/README.md +++ b/README.md @@ -68,4 +68,9 @@ Peers preset: HTTP_GW_PEERS_[N]_ADDRESS = string HTTP_GW_PEERS_[N]_WEIGHT = 0..1 (float) + +Upload Header Table: + +HTTP_GW_UPLOADER_HEADER_[N]_KEY = string - HTTP Header attribute name (except `X-Attribute-`) +HTTP_GW_UPLOADER_HEADER_[N]_VAL = string - NeoFS Object attribute mapping ``` \ No newline at end of file diff --git a/app.go b/app.go index 4b56ac3..57fff2e 100644 --- a/app.go +++ b/app.go @@ -26,6 +26,8 @@ type ( cfg *viper.Viper key *ecdsa.PrivateKey + hdr HeaderFilter + wlog logger.Logger web *fasthttp.Server @@ -74,6 +76,8 @@ func newApp(ctx context.Context, opt ...Option) App { opt[i](a) } + a.hdr = newHeaderFilter(a.log, a.cfg) + a.wlog = logger.GRPC(a.log) if a.cfg.GetBool(cmdVerbose) { @@ -90,7 +94,6 @@ func newApp(ctx context.Context, opt ...Option) App { a.web.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize) a.web.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout) a.web.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout) - a.web.GetOnly = true a.web.DisableHeaderNamesNormalizing = true a.web.NoDefaultServerHeader = true a.web.NoDefaultContentType = true @@ -174,6 +177,9 @@ func (a *app) Serve(ctx context.Context) { r := router.New() r.RedirectTrailingSlash = true + a.log.Info("enabled /put/{cid}") + r.POST("/put/{cid}", a.upload) + a.log.Info("enabled /get/{cid}/{oid}") r.GET("/get/{cid}/{oid}", a.byAddress) diff --git a/filter.go b/filter.go new file mode 100644 index 0000000..9c34015 --- /dev/null +++ b/filter.go @@ -0,0 +1,86 @@ +package main + +import ( + "bytes" + "strconv" + "strings" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/spf13/viper" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type ( + HeaderFilter interface { + Filter(header *fasthttp.RequestHeader) map[string]string + } + + headerFilter struct { + logger *zap.Logger + mapping map[string]string + } +) + +const userAttributeHeader = "X-Attribute-" + +func newHeaderFilter(l *zap.Logger, v *viper.Viper) HeaderFilter { + filter := &headerFilter{ + logger: l, + mapping: make(map[string]string), + } + + for i := 0; ; i++ { + index := strconv.Itoa(i) + key := strings.Join([]string{cfgUploaderHeader, index, cfgUploaderHeaderKey}, ".") + rep := strings.Join([]string{cfgUploaderHeader, index, cfgUploaderHeaderVal}, ".") + + keyValue := v.GetString(key) + repValue := v.GetString(rep) + + if keyValue == "" || repValue == "" { + break + } + + filter.mapping[keyValue] = repValue + + l.Debug("load upload header table value", + zap.String("key", keyValue), + zap.String("val", repValue)) + } + + // Default values + filter.mapping[object.AttributeFileName] = object.AttributeFileName + filter.mapping[object.AttributeTimestamp] = object.AttributeTimestamp + + return filter +} + +func (h *headerFilter) Filter(header *fasthttp.RequestHeader) map[string]string { + result := make(map[string]string) + prefix := []byte(userAttributeHeader) + + header.VisitAll(func(key, val []byte) { + if len(key) == 0 || len(val) == 0 { + return + } else if !bytes.HasPrefix(key, prefix) { + return + } else if key = bytes.TrimPrefix(key, prefix); len(key) == 0 { + return + } else if name, ok := h.mapping[string(key)]; ok { + result[name] = string(val) + + h.logger.Debug("add attribute to result object", + zap.String("key", name), + zap.String("val", string(val))) + + return + } + + h.logger.Debug("ignore attribute", + zap.String("key", string(key)), + zap.String("val", string(val))) + }) + + return result +} diff --git a/settings.go b/settings.go index c781056..2917a18 100644 --- a/settings.go +++ b/settings.go @@ -16,8 +16,7 @@ import ( type empty int const ( - devNull = empty(0) - generated = "generated" + devNull = empty(0) defaultRebalanceTimer = 15 * time.Second defaultRequestTimeout = 15 * time.Second @@ -54,6 +53,11 @@ const ( cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingThereafter = "logger.sampling.thereafter" + // Uploader Header + cfgUploaderHeader = "uploader_header" + cfgUploaderHeaderKey = "key" + cfgUploaderHeaderVal = "val" + // Peers cfgPeers = "peers" @@ -179,6 +183,13 @@ func settings() *viper.Viper { fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers)) fmt.Printf("%s_%s_[N]_WEIGHT = 0..1 (float)\n", Prefix, strings.ToUpper(cfgPeers)) + fmt.Println() + fmt.Println("Upload Header Table:") + fmt.Println() + + fmt.Printf("%s_%s_[N]_%s = string\n", Prefix, strings.ToUpper(cfgUploaderHeader), strings.ToUpper(cfgUploaderHeaderKey)) + fmt.Printf("%s_%s_[N]_%s = string\n", Prefix, strings.ToUpper(cfgUploaderHeader), strings.ToUpper(cfgUploaderHeaderVal)) + os.Exit(0) case version != nil && *version: fmt.Printf("NeoFS HTTP Gateway %s (%s)\n", Version, Build) diff --git a/upload.go b/upload.go new file mode 100644 index 0000000..128c3eb --- /dev/null +++ b/upload.go @@ -0,0 +1,146 @@ +package main + +import ( + "encoding/json" + "io" + "io/ioutil" + "mime/multipart" + "os" + "strconv" + "time" + + sdk "github.com/nspcc-dev/cdn-sdk" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type putResponse struct { + OID string `json:"object_id"` + CID string `json:"container_id"` +} + +func newPutResponse(addr *object.Address) *putResponse { + return &putResponse{ + OID: addr.ObjectID().String(), + CID: addr.ContainerID().String(), + } +} + +func (pr *putResponse) Encode(w io.Writer) error { + enc := json.NewEncoder(w) + enc.SetIndent("", "\t") + return enc.Encode(pr) +} + +func (a *app) upload(c *fasthttp.RequestCtx) { + var ( + err error + name string + tmp *os.File + addr *object.Address + form *multipart.Form + cid = container.NewID() + sCID, _ = c.UserValue("cid").(string) + + log = a.log.With(zap.String("cid", sCID)) + ) + + if err = cid.Parse(sCID); err != nil { + log.Error("wrong container id", zap.Error(err)) + c.Error("wrong container id", fasthttp.StatusBadRequest) + return + } + + if tmp, err = ioutil.TempFile("", "http-gate-upload-*"); err != nil { + log.Error("could not prepare temporary file", zap.Error(err)) + c.Error("could not prepare temporary file", fasthttp.StatusBadRequest) + return + } + + defer func() { + tmpName := tmp.Name() + + log.Debug("close temporary file", zap.Error(tmp.Close())) + log.Debug("remove temporary file", zap.Error(os.RemoveAll(tmpName))) + }() + + if form, err = c.MultipartForm(); err != nil { + log.Error("could not receive multipart form", zap.Error(err)) + c.Error("could not receive multipart form: "+err.Error(), fasthttp.StatusBadRequest) + + return + } else if ln := len(form.File); ln != 1 { + log.Error("received multipart form with more then one file", zap.Int("count", ln)) + c.Error("received multipart form with more then one file", fasthttp.StatusBadRequest) + + return + } + + for _, file := range form.File { + if ln := len(file); ln != 1 { + log.Error("received multipart form file should contains one record", zap.Int("count", ln)) + c.Error("received multipart form file should contains one record", fasthttp.StatusBadRequest) + + return + } + + name = file[0].Filename + + if err = fasthttp.SaveMultipartFile(file[0], tmp.Name()); err != nil { + log.Error("could not store uploaded file into temporary", zap.Error(err)) + c.Error("could not store uploaded file into temporary", fasthttp.StatusBadRequest) + + return + } + } + + filtered := a.hdr.Filter(&c.Request.Header) + attributes := make([]*object.Attribute, 0, len(filtered)) + + for key, val := range filtered { + attribute := object.NewAttribute() + attribute.SetKey(key) + attribute.SetValue(val) + + attributes = append(attributes, attribute) + } + + // Attribute FileName wasn't set from header + if _, ok := filtered[object.AttributeFileName]; ok { + filename := object.NewAttribute() + filename.SetKey(object.AttributeFileName) + filename.SetValue(name) + + attributes = append(attributes, filename) + } + + // Attribute Timestamp wasn't set from header + if _, ok := filtered[object.AttributeTimestamp]; ok { + timestamp := object.NewAttribute() + timestamp.SetKey(object.AttributeTimestamp) + timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) + + attributes = append(attributes, timestamp) + } + + raw := object.NewRaw() + raw.SetContainerID(cid) + raw.SetOwnerID(a.cli.Owner()) // should be various: from sdk / BearerToken + raw.SetAttributes(attributes...) + + if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(tmp)); err != nil { + log.Error("could not store file in NeoFS", zap.Error(err)) + c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) + + return + } else if err = newPutResponse(addr).Encode(c); err != nil { + log.Error("could not prepare response", zap.Error(err)) + c.Error("could not prepare response", fasthttp.StatusBadRequest) + + return + } + + c.Response.SetStatusCode(fasthttp.StatusOK) +} From 0346db462bbbad63631d39623ea75ebf10f8ef7f Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 26 Jan 2021 11:35:05 +0300 Subject: [PATCH 02/10] Add cleanup multipart form on defer Signed-off-by: Evgeniy Kulikov --- upload.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/upload.go b/upload.go index 128c3eb..f5e4324 100644 --- a/upload.go +++ b/upload.go @@ -64,6 +64,12 @@ func (a *app) upload(c *fasthttp.RequestCtx) { log.Debug("close temporary file", zap.Error(tmp.Close())) log.Debug("remove temporary file", zap.Error(os.RemoveAll(tmpName))) + + if form == nil { + return + } + + log.Debug("cleanup multipart form", zap.Error(form.RemoveAll())) }() if form, err = c.MultipartForm(); err != nil { From ace31ceefd95b8e8cc22ee3c4c459336d4ab87bf Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 26 Jan 2021 11:43:40 +0300 Subject: [PATCH 03/10] Add `application/json` Content-Type header Signed-off-by: Evgeniy Kulikov --- upload.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/upload.go b/upload.go index f5e4324..0850cf6 100644 --- a/upload.go +++ b/upload.go @@ -21,6 +21,8 @@ type putResponse struct { CID string `json:"container_id"` } +const jsonHeader = "application/json; charset=UTF-8" + func newPutResponse(addr *object.Address) *putResponse { return &putResponse{ OID: addr.ObjectID().String(), @@ -149,4 +151,5 @@ func (a *app) upload(c *fasthttp.RequestCtx) { } c.Response.SetStatusCode(fasthttp.StatusOK) + c.Response.Header.SetContentType(jsonHeader) } From 3cbd4dbd097a39f3f086b8984de2e695a87d466f Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 26 Jan 2021 12:40:01 +0300 Subject: [PATCH 04/10] Fixes after review - Maybe it could be rephrased to "attribute name prefixed with X-Attribute-" - Maybe it worth renaming to userAttributeHeaderPrefix - Requirement for having only one file uploaded at a time has to be reflected in documentation. - Maybe this should be reflected in doc - I'm not sure if missing timestamp attribute should be forced on gateway level. Normally it should be set by app and there should be a way not to set it. Signed-off-by: Evgeniy Kulikov --- README.md | 17 +++++++++++++++-- app.go | 4 ++++ filter.go | 4 ++-- settings.go | 10 +++++++--- upload.go | 4 ++-- 5 files changed, 30 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index c4652ed..d9f875b 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # NeoFS HTTP Gate NeoFS HTTP Gate is example of tool that provides basic interactions with NeoFS. -You can download files from NeoFS Network using NeoFS Gate. + +- you can download one file per request from NeoFS Network using NeoFS Gate +- you can upload one file per request into NeoFS Network using NeoFS Gate ## Notable make targets @@ -24,6 +26,11 @@ You can download files from NeoFS Network using NeoFS Gate. ```go get -u github.com/nspcc-dev/neofs-http-gate``` +## File uploading behaviors + +- you can upload on file per request +- if `FileName` not provided by Header attributes, multipart/form filename will be used instead + ## Configuration ``` @@ -64,6 +71,8 @@ of Timeout and if no activity is seen even after that the connection is closed HTTP_GW_KEEPALIVE_PERMIT_WITHOUT_STREAM=Bool - if true, client sends keepalive pings even with no active RPCs. If false, when there are no active RPCs, Time and Timeout will be ignored and no keepalive pings will be sent. +HTTP_GW_UPLOAD_HEADER_USE_DEFAULT_TIMESTAMP=bool - enable/disable adding current timestamp attribute when object uploads + Peers preset: HTTP_GW_PEERS_[N]_ADDRESS = string @@ -71,6 +80,10 @@ HTTP_GW_PEERS_[N]_WEIGHT = 0..1 (float) Upload Header Table: -HTTP_GW_UPLOADER_HEADER_[N]_KEY = string - HTTP Header attribute name (except `X-Attribute-`) +HTTP_GW_UPLOADER_HEADER_[N]_KEY = string - HTTP Header attribute name prefixed with `X-Attribute-` HTTP_GW_UPLOADER_HEADER_[N]_VAL = string - NeoFS Object attribute mapping + +# By default we had next headers: +- FileName - to set object filename attribute +- Timestamp - to set object timestamp attribute ``` \ No newline at end of file diff --git a/app.go b/app.go index 57fff2e..f565793 100644 --- a/app.go +++ b/app.go @@ -33,6 +33,8 @@ type ( jobDone chan struct{} webDone chan struct{} + + enableDefaultTimestamp bool } App interface { @@ -78,6 +80,8 @@ func newApp(ctx context.Context, opt ...Option) App { a.hdr = newHeaderFilter(a.log, a.cfg) + a.enableDefaultTimestamp = a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) + a.wlog = logger.GRPC(a.log) if a.cfg.GetBool(cmdVerbose) { diff --git a/filter.go b/filter.go index 9c34015..2f12eb5 100644 --- a/filter.go +++ b/filter.go @@ -22,7 +22,7 @@ type ( } ) -const userAttributeHeader = "X-Attribute-" +const userAttributeHeaderPrefix = "X-Attribute-" func newHeaderFilter(l *zap.Logger, v *viper.Viper) HeaderFilter { filter := &headerFilter{ @@ -58,7 +58,7 @@ func newHeaderFilter(l *zap.Logger, v *viper.Viper) HeaderFilter { func (h *headerFilter) Filter(header *fasthttp.RequestHeader) map[string]string { result := make(map[string]string) - prefix := []byte(userAttributeHeader) + prefix := []byte(userAttributeHeaderPrefix) header.VisitAll(func(key, val []byte) { if len(key) == 0 || len(val) == 0 { diff --git a/settings.go b/settings.go index 2917a18..1775569 100644 --- a/settings.go +++ b/settings.go @@ -54,9 +54,10 @@ const ( cfgLoggerSamplingThereafter = "logger.sampling.thereafter" // Uploader Header - cfgUploaderHeader = "uploader_header" - cfgUploaderHeaderKey = "key" - cfgUploaderHeaderVal = "val" + cfgUploaderHeader = "uploader_header" + cfgUploaderHeaderKey = "key" + cfgUploaderHeaderVal = "val" + cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp" // Peers cfgPeers = "peers" @@ -144,6 +145,9 @@ func settings() *viper.Viper { v.SetDefault(cfgWebWriteTimeout, time.Minute) v.SetDefault(cfgWebConnectionPerHost, 10) + // upload header + v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) + if err := v.BindPFlags(flags); err != nil { panic(err) } diff --git a/upload.go b/upload.go index 0850cf6..b1330fc 100644 --- a/upload.go +++ b/upload.go @@ -124,8 +124,8 @@ func (a *app) upload(c *fasthttp.RequestCtx) { attributes = append(attributes, filename) } - // Attribute Timestamp wasn't set from header - if _, ok := filtered[object.AttributeTimestamp]; ok { + // Attribute Timestamp wasn't set from header and enabled by settings + if _, ok := filtered[object.AttributeTimestamp]; ok && a.enableDefaultTimestamp { timestamp := object.NewAttribute() timestamp.SetKey(object.AttributeTimestamp) timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) From ec70bfa4cc54a5e190ce40bdaab761f54f3f5d46 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 26 Jan 2021 18:36:53 +0300 Subject: [PATCH 05/10] Fixes after review - It's confusing. If there is no difference, I suggest having the route named after the protocol verb or resulting handler. So it should be either post or upload. - Don't you find that it would be more understandable without else ifs? - Why do we need a temporary file here? - etc Signed-off-by: Evgeniy Kulikov --- app.go | 4 ++-- filter.go | 18 ++++++++++++++--- upload.go | 60 ++++++++++++++++++++++++++++++------------------------- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/app.go b/app.go index f565793..6d666eb 100644 --- a/app.go +++ b/app.go @@ -181,8 +181,8 @@ func (a *app) Serve(ctx context.Context) { r := router.New() r.RedirectTrailingSlash = true - a.log.Info("enabled /put/{cid}") - r.POST("/put/{cid}", a.upload) + a.log.Info("enabled /upload/{cid}") + r.POST("/upload/{cid}", a.upload) a.log.Info("enabled /get/{cid}/{oid}") r.GET("/get/{cid}/{oid}", a.byAddress) diff --git a/filter.go b/filter.go index 2f12eb5..7e13766 100644 --- a/filter.go +++ b/filter.go @@ -61,13 +61,24 @@ func (h *headerFilter) Filter(header *fasthttp.RequestHeader) map[string]string prefix := []byte(userAttributeHeaderPrefix) header.VisitAll(func(key, val []byte) { + // checks that key and val not empty if len(key) == 0 || len(val) == 0 { return - } else if !bytes.HasPrefix(key, prefix) { + } + + // checks that key has attribute prefix + if !bytes.HasPrefix(key, prefix) { return - } else if key = bytes.TrimPrefix(key, prefix); len(key) == 0 { + } + + // checks that after removing attribute prefix we had not empty key + if key = bytes.TrimPrefix(key, prefix); len(key) == 0 { return - } else if name, ok := h.mapping[string(key)]; ok { + } + + // checks mapping table and if we found record store it + // at resulting hashmap + if name, ok := h.mapping[string(key)]; ok { result[name] = string(val) h.logger.Debug("add attribute to result object", @@ -77,6 +88,7 @@ func (h *headerFilter) Filter(header *fasthttp.RequestHeader) map[string]string return } + // otherwise inform that attribute will be ignored h.logger.Debug("ignore attribute", zap.String("key", string(key)), zap.String("val", string(val))) diff --git a/upload.go b/upload.go index b1330fc..6d72a61 100644 --- a/upload.go +++ b/upload.go @@ -3,9 +3,7 @@ package main import ( "encoding/json" "io" - "io/ioutil" "mime/multipart" - "os" "strconv" "time" @@ -40,7 +38,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { var ( err error name string - tmp *os.File + tmp io.Reader addr *object.Address form *multipart.Form cid = container.NewID() @@ -55,50 +53,51 @@ func (a *app) upload(c *fasthttp.RequestCtx) { return } - if tmp, err = ioutil.TempFile("", "http-gate-upload-*"); err != nil { - log.Error("could not prepare temporary file", zap.Error(err)) - c.Error("could not prepare temporary file", fasthttp.StatusBadRequest) - return - } - defer func() { - tmpName := tmp.Name() - - log.Debug("close temporary file", zap.Error(tmp.Close())) - log.Debug("remove temporary file", zap.Error(os.RemoveAll(tmpName))) + // if temporary reader can be closed - close it + if closer := tmp.(io.Closer); closer != nil { + log.Debug("close temporary multipart/form file", zap.Error(closer.Close())) + } if form == nil { return } - log.Debug("cleanup multipart form", zap.Error(form.RemoveAll())) + log.Debug("cleanup multipart/form", zap.Error(form.RemoveAll())) }() + // tries to receive multipart/form or throw error if form, err = c.MultipartForm(); err != nil { - log.Error("could not receive multipart form", zap.Error(err)) - c.Error("could not receive multipart form: "+err.Error(), fasthttp.StatusBadRequest) + log.Error("could not receive multipart/form", zap.Error(err)) + c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) return - } else if ln := len(form.File); ln != 1 { - log.Error("received multipart form with more then one file", zap.Int("count", ln)) - c.Error("received multipart form with more then one file", fasthttp.StatusBadRequest) + } + + // checks that received multipart/form contains only one `file` per request + if ln := len(form.File); ln != 1 { + log.Error("received multipart/form with more then one file", zap.Int("count", ln)) + c.Error("received multipart/form with more then one file", fasthttp.StatusBadRequest) return } for _, file := range form.File { + // because multipart/form can contains multiple FileHeader records + // we should check that we have only one per request or throw error if ln := len(file); ln != 1 { - log.Error("received multipart form file should contains one record", zap.Int("count", ln)) - c.Error("received multipart form file should contains one record", fasthttp.StatusBadRequest) + log.Error("received multipart/form file should contains one record", zap.Int("count", ln)) + c.Error("received multipart/form file should contains one record", fasthttp.StatusBadRequest) return } name = file[0].Filename - if err = fasthttp.SaveMultipartFile(file[0], tmp.Name()); err != nil { - log.Error("could not store uploaded file into temporary", zap.Error(err)) - c.Error("could not store uploaded file into temporary", fasthttp.StatusBadRequest) + // opens multipart/form file to work within or throw error + if tmp, err = file[0].Open(); err != nil { + log.Error("could not prepare uploaded file", zap.Error(err)) + c.Error("could not prepare uploaded file", fasthttp.StatusBadRequest) return } @@ -107,6 +106,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { filtered := a.hdr.Filter(&c.Request.Header) attributes := make([]*object.Attribute, 0, len(filtered)) + // prepares attributes from filtered headers for key, val := range filtered { attribute := object.NewAttribute() attribute.SetKey(key) @@ -115,7 +115,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { attributes = append(attributes, attribute) } - // Attribute FileName wasn't set from header + // sets FileName attribute if it wasn't set from header if _, ok := filtered[object.AttributeFileName]; ok { filename := object.NewAttribute() filename.SetKey(object.AttributeFileName) @@ -124,7 +124,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { attributes = append(attributes, filename) } - // Attribute Timestamp wasn't set from header and enabled by settings + // sets Timestamp attribute if it wasn't set from header and enabled by settings if _, ok := filtered[object.AttributeTimestamp]; ok && a.enableDefaultTimestamp { timestamp := object.NewAttribute() timestamp.SetKey(object.AttributeTimestamp) @@ -133,23 +133,29 @@ func (a *app) upload(c *fasthttp.RequestCtx) { attributes = append(attributes, timestamp) } + // prepares new object and fill it raw := object.NewRaw() raw.SetContainerID(cid) raw.SetOwnerID(a.cli.Owner()) // should be various: from sdk / BearerToken raw.SetAttributes(attributes...) + // tries to put file into NeoFS or throw error if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(tmp)); err != nil { log.Error("could not store file in NeoFS", zap.Error(err)) c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) return - } else if err = newPutResponse(addr).Encode(c); err != nil { + } + + // tries to return response, otherwise, if something went wrong throw error + if err = newPutResponse(addr).Encode(c); err != nil { log.Error("could not prepare response", zap.Error(err)) c.Error("could not prepare response", fasthttp.StatusBadRequest) return } + // reports status code and content type c.Response.SetStatusCode(fasthttp.StatusOK) c.Response.Header.SetContentType(jsonHeader) } From 71999a796d675ed29a1497f17633d0c503594cb4 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Thu, 28 Jan 2021 16:48:45 +0300 Subject: [PATCH 06/10] Fix logic with set attributes if not set from header - if NOT set Filename from header - set from multipart/form - if NOT set Timestamp from header and enabled by settings - set current timestamp Signed-off-by: Evgeniy Kulikov --- upload.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/upload.go b/upload.go index 6d72a61..12111b0 100644 --- a/upload.go +++ b/upload.go @@ -116,7 +116,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { } // sets FileName attribute if it wasn't set from header - if _, ok := filtered[object.AttributeFileName]; ok { + if _, ok := filtered[object.AttributeFileName]; !ok { filename := object.NewAttribute() filename.SetKey(object.AttributeFileName) filename.SetValue(name) @@ -125,7 +125,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { } // sets Timestamp attribute if it wasn't set from header and enabled by settings - if _, ok := filtered[object.AttributeTimestamp]; ok && a.enableDefaultTimestamp { + if _, ok := filtered[object.AttributeTimestamp]; !ok && a.enableDefaultTimestamp { timestamp := object.NewAttribute() timestamp.SetKey(object.AttributeTimestamp) timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) From cbaf9e61428772fd53effd52e5a7bf7ed0abaa43 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Wed, 3 Feb 2021 16:01:30 +0300 Subject: [PATCH 07/10] Fixes after review After discussion, we decided to simplify attribute translation for now. Full translation requires support for UTF-8 values encoded according to RFC 8187/7230, this can be clarified and implemented in further PRs. 1. Remove translation tables 2. Use simple translation rules only, for now ignoring UTF-8 keys and values - `X-Attribute-NEOFS-` prefixed headers considered well-known system attributes - System Attribute key is uppercased - System Attribute key's `-` translates to `_` - `X-Attribute-` prefixed headers considered regular object attributes - Normal attribute key is a result of http header prefix trimming - Value string should be left as is, for now ``` HTTP: X-Attribute-NEOFS-Expiration-Epoch: 123 X-Attribute-FileName: cat.jpg NeoFS: __NEOFS__EXPIRATION_EPOCH: "123" FileName: "cat.jpg" ``` Signed-off-by: Evgeniy Kulikov --- README.md | 9 ------ app.go | 4 --- filter.go | 88 ++++++++++++++++++----------------------------------- settings.go | 10 ------ upload.go | 2 +- 5 files changed, 30 insertions(+), 83 deletions(-) diff --git a/README.md b/README.md index d9f875b..f4b30d4 100644 --- a/README.md +++ b/README.md @@ -77,13 +77,4 @@ Peers preset: HTTP_GW_PEERS_[N]_ADDRESS = string HTTP_GW_PEERS_[N]_WEIGHT = 0..1 (float) - -Upload Header Table: - -HTTP_GW_UPLOADER_HEADER_[N]_KEY = string - HTTP Header attribute name prefixed with `X-Attribute-` -HTTP_GW_UPLOADER_HEADER_[N]_VAL = string - NeoFS Object attribute mapping - -# By default we had next headers: -- FileName - to set object filename attribute -- Timestamp - to set object timestamp attribute ``` \ No newline at end of file diff --git a/app.go b/app.go index 6d666eb..ca3d249 100644 --- a/app.go +++ b/app.go @@ -26,8 +26,6 @@ type ( cfg *viper.Viper key *ecdsa.PrivateKey - hdr HeaderFilter - wlog logger.Logger web *fasthttp.Server @@ -78,8 +76,6 @@ func newApp(ctx context.Context, opt ...Option) App { opt[i](a) } - a.hdr = newHeaderFilter(a.log, a.cfg) - a.enableDefaultTimestamp = a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) a.wlog = logger.GRPC(a.log) diff --git a/filter.go b/filter.go index 7e13766..7d8b24d 100644 --- a/filter.go +++ b/filter.go @@ -2,63 +2,33 @@ package main import ( "bytes" - "strconv" - "strings" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.uber.org/zap" ) -type ( - HeaderFilter interface { - Filter(header *fasthttp.RequestHeader) map[string]string - } +const ( + userAttributeHeaderPrefix = "X-Attribute-" + neofsAttributeHeaderPrefix = "NEOFS-" - headerFilter struct { - logger *zap.Logger - mapping map[string]string - } + systemAttributePrefix = "__NEOFS__" ) -const userAttributeHeaderPrefix = "X-Attribute-" +func systemTranslator(key []byte) []byte { + // replace `NEOFS-` with `__NEOFS__` + key = bytes.Replace(key, []byte(neofsAttributeHeaderPrefix), []byte(systemAttributePrefix), 1) -func newHeaderFilter(l *zap.Logger, v *viper.Viper) HeaderFilter { - filter := &headerFilter{ - logger: l, - mapping: make(map[string]string), - } + // replace `-` with `_` + key = bytes.ReplaceAll(key, []byte("-"), []byte("_")) - for i := 0; ; i++ { - index := strconv.Itoa(i) - key := strings.Join([]string{cfgUploaderHeader, index, cfgUploaderHeaderKey}, ".") - rep := strings.Join([]string{cfgUploaderHeader, index, cfgUploaderHeaderVal}, ".") - - keyValue := v.GetString(key) - repValue := v.GetString(rep) - - if keyValue == "" || repValue == "" { - break - } - - filter.mapping[keyValue] = repValue - - l.Debug("load upload header table value", - zap.String("key", keyValue), - zap.String("val", repValue)) - } - - // Default values - filter.mapping[object.AttributeFileName] = object.AttributeFileName - filter.mapping[object.AttributeTimestamp] = object.AttributeTimestamp - - return filter + // replace with uppercase + return bytes.ToUpper(key) } -func (h *headerFilter) Filter(header *fasthttp.RequestHeader) map[string]string { +func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) map[string]string { result := make(map[string]string) prefix := []byte(userAttributeHeaderPrefix) + system := []byte(neofsAttributeHeaderPrefix) header.VisitAll(func(key, val []byte) { // checks that key and val not empty @@ -71,27 +41,27 @@ func (h *headerFilter) Filter(header *fasthttp.RequestHeader) map[string]string return } - // checks that after removing attribute prefix we had not empty key - if key = bytes.TrimPrefix(key, prefix); len(key) == 0 { + // removing attribute prefix + key = bytes.TrimPrefix(key, prefix) + + // checks that it's a system NeoFS header + if bytes.HasPrefix(key, system) { + key = systemTranslator(key) + } + + // checks that attribute key not empty + if len(key) == 0 { return } - // checks mapping table and if we found record store it - // at resulting hashmap - if name, ok := h.mapping[string(key)]; ok { - result[name] = string(val) + // make string representation of key / val + k, v := string(key), string(val) - h.logger.Debug("add attribute to result object", - zap.String("key", name), - zap.String("val", string(val))) + result[k] = v - return - } - - // otherwise inform that attribute will be ignored - h.logger.Debug("ignore attribute", - zap.String("key", string(key)), - zap.String("val", string(val))) + l.Debug("add attribute to result object", + zap.String("key", k), + zap.String("val", v)) }) return result diff --git a/settings.go b/settings.go index 1775569..02fcde8 100644 --- a/settings.go +++ b/settings.go @@ -54,9 +54,6 @@ const ( cfgLoggerSamplingThereafter = "logger.sampling.thereafter" // Uploader Header - cfgUploaderHeader = "uploader_header" - cfgUploaderHeaderKey = "key" - cfgUploaderHeaderVal = "val" cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp" // Peers @@ -187,13 +184,6 @@ func settings() *viper.Viper { fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers)) fmt.Printf("%s_%s_[N]_WEIGHT = 0..1 (float)\n", Prefix, strings.ToUpper(cfgPeers)) - fmt.Println() - fmt.Println("Upload Header Table:") - fmt.Println() - - fmt.Printf("%s_%s_[N]_%s = string\n", Prefix, strings.ToUpper(cfgUploaderHeader), strings.ToUpper(cfgUploaderHeaderKey)) - fmt.Printf("%s_%s_[N]_%s = string\n", Prefix, strings.ToUpper(cfgUploaderHeader), strings.ToUpper(cfgUploaderHeaderVal)) - os.Exit(0) case version != nil && *version: fmt.Printf("NeoFS HTTP Gateway %s (%s)\n", Version, Build) diff --git a/upload.go b/upload.go index 12111b0..767ee2e 100644 --- a/upload.go +++ b/upload.go @@ -103,7 +103,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { } } - filtered := a.hdr.Filter(&c.Request.Header) + filtered := filterHeaders(a.log, &c.Request.Header) attributes := make([]*object.Attribute, 0, len(filtered)) // prepares attributes from filtered headers From a428a0b1b3dd49f4bb5cd7f04d4b1b0017327c2d Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Fri, 12 Feb 2021 15:24:52 +0300 Subject: [PATCH 08/10] fix conflicts --- README.md | 9 +++++++++ app.go | 11 +++++++++++ settings.go | 37 +++++++++++++++++++++++++++++++------ upload.go | 2 +- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f4b30d4..73aaeda 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,15 @@ If false, when there are no active RPCs, Time and Timeout will be ignored and no HTTP_GW_UPLOAD_HEADER_USE_DEFAULT_TIMESTAMP=bool - enable/disable adding current timestamp attribute when object uploads +HTTP_GW_WEB_READ_BUFFER_SIZE=4096 - per-connection buffer size for requests' reading +HTTP_GW_WEB_READ_TIMEOUT=15s - an amount of time allowed to read the full request including body +HTTP_GW_WEB_WRITE_BUFFER_SIZE=4096 - per-connection buffer size for responses' writing +HTTP_GW_WEB_WRITE_TIMEOUT=1m0s - maximum duration before timing out writes of the response +HTTP_GW_WEB_STREAM_REQUEST_BODY=true - enables request body streaming, and calls the handler sooner when given + body is larger then the current limit +HTTP_GW_WEB_MAX_REQUEST_BODY_SIZE=4194304 - maximum request body size, server rejects requests with bodies exceeding + this limit + Peers preset: HTTP_GW_PEERS_[N]_ADDRESS = string diff --git a/app.go b/app.go index ca3d249..34a1492 100644 --- a/app.go +++ b/app.go @@ -97,6 +97,17 @@ func newApp(ctx context.Context, opt ...Option) App { a.web.DisableHeaderNamesNormalizing = true a.web.NoDefaultServerHeader = true a.web.NoDefaultContentType = true + a.web.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize) + + // FIXME don't work with StreamRequestBody, + // some bugs with readMultipartForm + // a.web.DisablePreParseMultipartForm = true + + // body streaming + // TODO should be replaced in future with + // + // a.web.StreamRequestBody = v.GetBool(cfgWebStreamRequestBody) + checkAndEnableStreaming(a.log, a.cfg, a.web) // -- -- -- -- -- -- -- -- -- -- connections := make(map[string]float64) diff --git a/settings.go b/settings.go index 02fcde8..b2dac09 100644 --- a/settings.go +++ b/settings.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "os" + "reflect" "sort" "strconv" "strings" @@ -11,6 +12,8 @@ import ( "github.com/spf13/pflag" "github.com/spf13/viper" + "github.com/valyala/fasthttp" + "go.uber.org/zap" ) type empty int @@ -33,11 +36,12 @@ const ( cfgKeepalivePermitWithoutStream = "keepalive.permit_without_stream" // Web - cfgWebReadBufferSize = "web.read_buffer_size" - cfgWebWriteBufferSize = "web.write_buffer_size" - cfgWebReadTimeout = "web.read_timeout" - cfgWebWriteTimeout = "web.write_timeout" - cfgWebConnectionPerHost = "web.connection_per_host" + cfgWebReadBufferSize = "web.read_buffer_size" + cfgWebWriteBufferSize = "web.write_buffer_size" + cfgWebReadTimeout = "web.read_timeout" + cfgWebWriteTimeout = "web.write_timeout" + cfgWebStreamRequestBody = "web.stream_request_body" + cfgWebMaxRequestBodySize = "web.max_request_body_size" // Timeouts cfgConTimeout = "connect_timeout" @@ -86,6 +90,26 @@ var ignore = map[string]struct{}{ func (empty) Read([]byte) (int, error) { return 0, io.EOF } +// checkAndEnableStreaming is temporary shim, should be used before +// `StreamRequestBody` is not merged in fasthttp master +// TODO should be removed in future +func checkAndEnableStreaming(l *zap.Logger, v *viper.Viper, i interface{}) { + vi := reflect.ValueOf(i) + + if vi.Type().Kind() != reflect.Ptr { + return + } + + field := vi.Elem().FieldByName("StreamRequestBody") + if !field.IsValid() || field.Kind() != reflect.Bool { + l.Warn("stream request body not supported") + + return + } + + field.SetBool(v.GetBool(cfgWebStreamRequestBody)) +} + func settings() *viper.Viper { v := viper.New() v.AutomaticEnv() @@ -140,7 +164,8 @@ func settings() *viper.Viper { v.SetDefault(cfgWebWriteBufferSize, 4096) v.SetDefault(cfgWebReadTimeout, time.Second*15) v.SetDefault(cfgWebWriteTimeout, time.Minute) - v.SetDefault(cfgWebConnectionPerHost, 10) + v.SetDefault(cfgWebStreamRequestBody, true) + v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize) // upload header v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) diff --git a/upload.go b/upload.go index 767ee2e..38d53e9 100644 --- a/upload.go +++ b/upload.go @@ -55,7 +55,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { defer func() { // if temporary reader can be closed - close it - if closer := tmp.(io.Closer); closer != nil { + if closer, ok := tmp.(io.Closer); ok && closer != nil { log.Debug("close temporary multipart/form file", zap.Error(closer.Close())) } From 3b8bf3017d823472f4dad2edf9e6e5c246a8081f Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Fri, 12 Feb 2021 18:48:52 +0300 Subject: [PATCH 09/10] Migrate to fasthttp v1.20.0 Signed-off-by: Evgeniy Kulikov --- app.go | 7 ++----- settings.go | 22 ---------------------- 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/app.go b/app.go index 34a1492..8e4a961 100644 --- a/app.go +++ b/app.go @@ -101,13 +101,10 @@ func newApp(ctx context.Context, opt ...Option) App { // FIXME don't work with StreamRequestBody, // some bugs with readMultipartForm + // https://github.com/valyala/fasthttp/issues/968 // a.web.DisablePreParseMultipartForm = true - // body streaming - // TODO should be replaced in future with - // - // a.web.StreamRequestBody = v.GetBool(cfgWebStreamRequestBody) - checkAndEnableStreaming(a.log, a.cfg, a.web) + a.web.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) // -- -- -- -- -- -- -- -- -- -- connections := make(map[string]float64) diff --git a/settings.go b/settings.go index b2dac09..206fd0a 100644 --- a/settings.go +++ b/settings.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "os" - "reflect" "sort" "strconv" "strings" @@ -13,7 +12,6 @@ import ( "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/valyala/fasthttp" - "go.uber.org/zap" ) type empty int @@ -90,26 +88,6 @@ var ignore = map[string]struct{}{ func (empty) Read([]byte) (int, error) { return 0, io.EOF } -// checkAndEnableStreaming is temporary shim, should be used before -// `StreamRequestBody` is not merged in fasthttp master -// TODO should be removed in future -func checkAndEnableStreaming(l *zap.Logger, v *viper.Viper, i interface{}) { - vi := reflect.ValueOf(i) - - if vi.Type().Kind() != reflect.Ptr { - return - } - - field := vi.Elem().FieldByName("StreamRequestBody") - if !field.IsValid() || field.Kind() != reflect.Bool { - l.Warn("stream request body not supported") - - return - } - - field.SetBool(v.GetBool(cfgWebStreamRequestBody)) -} - func settings() *viper.Viper { v := viper.New() v.AutomaticEnv() From 3f635a018aa043ba465a52b9c2a9a009b1285914 Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Sat, 13 Feb 2021 19:17:01 +0300 Subject: [PATCH 10/10] Refactoring uploading - DisablePreParseMultipartForm = true - used `fetchMultipartFile` method instead of `MultipartForm` - don't store temporary files, only streaming Signed-off-by: Evgeniy Kulikov --- app.go | 2 +- multipart.go | 41 ++++++++++++++++++++++++++++++++++++++++ upload.go | 53 ++++++++++------------------------------------------ 3 files changed, 52 insertions(+), 44 deletions(-) create mode 100644 multipart.go diff --git a/app.go b/app.go index 8e4a961..6c70b90 100644 --- a/app.go +++ b/app.go @@ -102,7 +102,7 @@ func newApp(ctx context.Context, opt ...Option) App { // FIXME don't work with StreamRequestBody, // some bugs with readMultipartForm // https://github.com/valyala/fasthttp/issues/968 - // a.web.DisablePreParseMultipartForm = true + a.web.DisablePreParseMultipartForm = true a.web.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) // -- -- -- -- -- -- -- -- -- -- diff --git a/multipart.go b/multipart.go new file mode 100644 index 0000000..f12442f --- /dev/null +++ b/multipart.go @@ -0,0 +1,41 @@ +package main + +import ( + "io" + "mime/multipart" + + "go.uber.org/zap" +) + +type MultipartFile interface { + io.ReadCloser + FileName() string +} + +func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartFile, error) { + reader := multipart.NewReader(r, boundary) + + for { + part, err := reader.NextPart() + if err != nil { + return nil, err + } + + name := part.FormName() + if name == "" { + l.Debug("ignore part, empty form name") + continue + } + + filename := part.FileName() + + // ignore multipart/form-data values + if filename == "" { + l.Debug("ignore part, empty filename", zap.String("form", name)) + + continue + } + + return part, nil + } +} diff --git a/upload.go b/upload.go index 38d53e9..0bf99b2 100644 --- a/upload.go +++ b/upload.go @@ -3,7 +3,6 @@ package main import ( "encoding/json" "io" - "mime/multipart" "strconv" "time" @@ -37,10 +36,8 @@ func (pr *putResponse) Encode(w io.Writer) error { func (a *app) upload(c *fasthttp.RequestCtx) { var ( err error - name string - tmp io.Reader + file MultipartFile addr *object.Address - form *multipart.Form cid = container.NewID() sCID, _ = c.UserValue("cid").(string) @@ -55,54 +52,24 @@ func (a *app) upload(c *fasthttp.RequestCtx) { defer func() { // if temporary reader can be closed - close it - if closer, ok := tmp.(io.Closer); ok && closer != nil { - log.Debug("close temporary multipart/form file", zap.Error(closer.Close())) - } - - if form == nil { + if file == nil { return } - log.Debug("cleanup multipart/form", zap.Error(form.RemoveAll())) + log.Debug("close temporary multipart/form file", + zap.Stringer("address", addr), + zap.String("filename", file.FileName()), + zap.Error(file.Close())) }() - // tries to receive multipart/form or throw error - if form, err = c.MultipartForm(); err != nil { + boundary := string(c.Request.Header.MultipartFormBoundary()) + if file, err = fetchMultipartFile(a.log, c.RequestBodyStream(), boundary); err != nil { log.Error("could not receive multipart/form", zap.Error(err)) c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) return } - // checks that received multipart/form contains only one `file` per request - if ln := len(form.File); ln != 1 { - log.Error("received multipart/form with more then one file", zap.Int("count", ln)) - c.Error("received multipart/form with more then one file", fasthttp.StatusBadRequest) - - return - } - - for _, file := range form.File { - // because multipart/form can contains multiple FileHeader records - // we should check that we have only one per request or throw error - if ln := len(file); ln != 1 { - log.Error("received multipart/form file should contains one record", zap.Int("count", ln)) - c.Error("received multipart/form file should contains one record", fasthttp.StatusBadRequest) - - return - } - - name = file[0].Filename - - // opens multipart/form file to work within or throw error - if tmp, err = file[0].Open(); err != nil { - log.Error("could not prepare uploaded file", zap.Error(err)) - c.Error("could not prepare uploaded file", fasthttp.StatusBadRequest) - - return - } - } - filtered := filterHeaders(a.log, &c.Request.Header) attributes := make([]*object.Attribute, 0, len(filtered)) @@ -119,7 +86,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { if _, ok := filtered[object.AttributeFileName]; !ok { filename := object.NewAttribute() filename.SetKey(object.AttributeFileName) - filename.SetValue(name) + filename.SetValue(file.FileName()) attributes = append(attributes, filename) } @@ -140,7 +107,7 @@ func (a *app) upload(c *fasthttp.RequestCtx) { raw.SetAttributes(attributes...) // tries to put file into NeoFS or throw error - if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(tmp)); err != nil { + if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil { log.Error("could not store file in NeoFS", zap.Error(err)) c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)