From 462de45e12c0b731e7cce4d031350cab1f0e26de Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Mon, 25 Jan 2021 22:36:46 +0300 Subject: [PATCH] Added POST method to upload files into NeoFS through HTTP Gate - Updated README - Added method to upload files into NeoFS - HTTP Upload Header Filter loaded from settings - Added `HeaderFilter` that filters headers (object attributes) Signed-off-by: Evgeniy Kulikov --- README.md | 5 ++ app.go | 8 ++- filter.go | 86 +++++++++++++++++++++++++++++++ settings.go | 15 +++++- upload.go | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 257 insertions(+), 3 deletions(-) create mode 100644 filter.go create mode 100644 upload.go diff --git a/README.md b/README.md index 9d704f3..c4652ed 100644 --- a/README.md +++ b/README.md @@ -68,4 +68,9 @@ Peers preset: HTTP_GW_PEERS_[N]_ADDRESS = string HTTP_GW_PEERS_[N]_WEIGHT = 0..1 (float) + +Upload Header Table: + +HTTP_GW_UPLOADER_HEADER_[N]_KEY = string - HTTP Header attribute name (except `X-Attribute-`) +HTTP_GW_UPLOADER_HEADER_[N]_VAL = string - NeoFS Object attribute mapping ``` \ No newline at end of file diff --git a/app.go b/app.go index 4b56ac3..57fff2e 100644 --- a/app.go +++ b/app.go @@ -26,6 +26,8 @@ type ( cfg *viper.Viper key *ecdsa.PrivateKey + hdr HeaderFilter + wlog logger.Logger web *fasthttp.Server @@ -74,6 +76,8 @@ func newApp(ctx context.Context, opt ...Option) App { opt[i](a) } + a.hdr = newHeaderFilter(a.log, a.cfg) + a.wlog = logger.GRPC(a.log) if a.cfg.GetBool(cmdVerbose) { @@ -90,7 +94,6 @@ func newApp(ctx context.Context, opt ...Option) App { a.web.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize) a.web.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout) a.web.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout) - a.web.GetOnly = true a.web.DisableHeaderNamesNormalizing = true a.web.NoDefaultServerHeader = true a.web.NoDefaultContentType = true @@ -174,6 +177,9 @@ func (a *app) Serve(ctx context.Context) { r := router.New() r.RedirectTrailingSlash = true + a.log.Info("enabled /put/{cid}") + r.POST("/put/{cid}", a.upload) + a.log.Info("enabled /get/{cid}/{oid}") r.GET("/get/{cid}/{oid}", a.byAddress) diff --git a/filter.go b/filter.go new file mode 100644 index 0000000..9c34015 --- /dev/null +++ b/filter.go @@ -0,0 +1,86 @@ +package main + +import ( + "bytes" + "strconv" + "strings" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/spf13/viper" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type ( + HeaderFilter interface { + Filter(header *fasthttp.RequestHeader) map[string]string + } + + headerFilter struct { + logger *zap.Logger + mapping map[string]string + } +) + +const userAttributeHeader = "X-Attribute-" + +func newHeaderFilter(l *zap.Logger, v *viper.Viper) HeaderFilter { + filter := &headerFilter{ + logger: l, + mapping: make(map[string]string), + } + + for i := 0; ; i++ { + index := strconv.Itoa(i) + key := strings.Join([]string{cfgUploaderHeader, index, cfgUploaderHeaderKey}, ".") + rep := strings.Join([]string{cfgUploaderHeader, index, cfgUploaderHeaderVal}, ".") + + keyValue := v.GetString(key) + repValue := v.GetString(rep) + + if keyValue == "" || repValue == "" { + break + } + + filter.mapping[keyValue] = repValue + + l.Debug("load upload header table value", + zap.String("key", keyValue), + zap.String("val", repValue)) + } + + // Default values + filter.mapping[object.AttributeFileName] = object.AttributeFileName + filter.mapping[object.AttributeTimestamp] = object.AttributeTimestamp + + return filter +} + +func (h *headerFilter) Filter(header *fasthttp.RequestHeader) map[string]string { + result := make(map[string]string) + prefix := []byte(userAttributeHeader) + + header.VisitAll(func(key, val []byte) { + if len(key) == 0 || len(val) == 0 { + return + } else if !bytes.HasPrefix(key, prefix) { + return + } else if key = bytes.TrimPrefix(key, prefix); len(key) == 0 { + return + } else if name, ok := h.mapping[string(key)]; ok { + result[name] = string(val) + + h.logger.Debug("add attribute to result object", + zap.String("key", name), + zap.String("val", string(val))) + + return + } + + h.logger.Debug("ignore attribute", + zap.String("key", string(key)), + zap.String("val", string(val))) + }) + + return result +} diff --git a/settings.go b/settings.go index c781056..2917a18 100644 --- a/settings.go +++ b/settings.go @@ -16,8 +16,7 @@ import ( type empty int const ( - devNull = empty(0) - generated = "generated" + devNull = empty(0) defaultRebalanceTimer = 15 * time.Second defaultRequestTimeout = 15 * time.Second @@ -54,6 +53,11 @@ const ( cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingThereafter = "logger.sampling.thereafter" + // Uploader Header + cfgUploaderHeader = "uploader_header" + cfgUploaderHeaderKey = "key" + cfgUploaderHeaderVal = "val" + // Peers cfgPeers = "peers" @@ -179,6 +183,13 @@ func settings() *viper.Viper { fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers)) fmt.Printf("%s_%s_[N]_WEIGHT = 0..1 (float)\n", Prefix, strings.ToUpper(cfgPeers)) + fmt.Println() + fmt.Println("Upload Header Table:") + fmt.Println() + + fmt.Printf("%s_%s_[N]_%s = string\n", Prefix, strings.ToUpper(cfgUploaderHeader), strings.ToUpper(cfgUploaderHeaderKey)) + fmt.Printf("%s_%s_[N]_%s = string\n", Prefix, strings.ToUpper(cfgUploaderHeader), strings.ToUpper(cfgUploaderHeaderVal)) + os.Exit(0) case version != nil && *version: fmt.Printf("NeoFS HTTP Gateway %s (%s)\n", Version, Build) diff --git a/upload.go b/upload.go new file mode 100644 index 0000000..128c3eb --- /dev/null +++ b/upload.go @@ -0,0 +1,146 @@ +package main + +import ( + "encoding/json" + "io" + "io/ioutil" + "mime/multipart" + "os" + "strconv" + "time" + + sdk "github.com/nspcc-dev/cdn-sdk" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type putResponse struct { + OID string `json:"object_id"` + CID string `json:"container_id"` +} + +func newPutResponse(addr *object.Address) *putResponse { + return &putResponse{ + OID: addr.ObjectID().String(), + CID: addr.ContainerID().String(), + } +} + +func (pr *putResponse) Encode(w io.Writer) error { + enc := json.NewEncoder(w) + enc.SetIndent("", "\t") + return enc.Encode(pr) +} + +func (a *app) upload(c *fasthttp.RequestCtx) { + var ( + err error + name string + tmp *os.File + addr *object.Address + form *multipart.Form + cid = container.NewID() + sCID, _ = c.UserValue("cid").(string) + + log = a.log.With(zap.String("cid", sCID)) + ) + + if err = cid.Parse(sCID); err != nil { + log.Error("wrong container id", zap.Error(err)) + c.Error("wrong container id", fasthttp.StatusBadRequest) + return + } + + if tmp, err = ioutil.TempFile("", "http-gate-upload-*"); err != nil { + log.Error("could not prepare temporary file", zap.Error(err)) + c.Error("could not prepare temporary file", fasthttp.StatusBadRequest) + return + } + + defer func() { + tmpName := tmp.Name() + + log.Debug("close temporary file", zap.Error(tmp.Close())) + log.Debug("remove temporary file", zap.Error(os.RemoveAll(tmpName))) + }() + + if form, err = c.MultipartForm(); err != nil { + log.Error("could not receive multipart form", zap.Error(err)) + c.Error("could not receive multipart form: "+err.Error(), fasthttp.StatusBadRequest) + + return + } else if ln := len(form.File); ln != 1 { + log.Error("received multipart form with more then one file", zap.Int("count", ln)) + c.Error("received multipart form with more then one file", fasthttp.StatusBadRequest) + + return + } + + for _, file := range form.File { + if ln := len(file); ln != 1 { + log.Error("received multipart form file should contains one record", zap.Int("count", ln)) + c.Error("received multipart form file should contains one record", fasthttp.StatusBadRequest) + + return + } + + name = file[0].Filename + + if err = fasthttp.SaveMultipartFile(file[0], tmp.Name()); err != nil { + log.Error("could not store uploaded file into temporary", zap.Error(err)) + c.Error("could not store uploaded file into temporary", fasthttp.StatusBadRequest) + + return + } + } + + filtered := a.hdr.Filter(&c.Request.Header) + attributes := make([]*object.Attribute, 0, len(filtered)) + + for key, val := range filtered { + attribute := object.NewAttribute() + attribute.SetKey(key) + attribute.SetValue(val) + + attributes = append(attributes, attribute) + } + + // Attribute FileName wasn't set from header + if _, ok := filtered[object.AttributeFileName]; ok { + filename := object.NewAttribute() + filename.SetKey(object.AttributeFileName) + filename.SetValue(name) + + attributes = append(attributes, filename) + } + + // Attribute Timestamp wasn't set from header + if _, ok := filtered[object.AttributeTimestamp]; ok { + timestamp := object.NewAttribute() + timestamp.SetKey(object.AttributeTimestamp) + timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) + + attributes = append(attributes, timestamp) + } + + raw := object.NewRaw() + raw.SetContainerID(cid) + raw.SetOwnerID(a.cli.Owner()) // should be various: from sdk / BearerToken + raw.SetAttributes(attributes...) + + if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(tmp)); err != nil { + log.Error("could not store file in NeoFS", zap.Error(err)) + c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) + + return + } else if err = newPutResponse(addr).Encode(c); err != nil { + log.Error("could not prepare response", zap.Error(err)) + c.Error("could not prepare response", fasthttp.StatusBadRequest) + + return + } + + c.Response.SetStatusCode(fasthttp.StatusOK) +}