Merge pull request #14 from nspcc-dev/add-post-method-to-upload-files

Added POST method to upload files into NeoFS through HTTP Gate
This commit is contained in:
Evgeniy Kulikov 2021-02-15 15:48:37 +03:00 committed by GitHub
commit 86991baa33
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 287 additions and 10 deletions

View file

@ -1,7 +1,9 @@
# NeoFS HTTP Gate # NeoFS HTTP Gate
NeoFS HTTP Gate is example of tool that provides basic interactions with NeoFS. NeoFS HTTP Gate is example of tool that provides basic interactions with NeoFS.
You can download files from NeoFS Network using NeoFS Gate.
- you can download one file per request from NeoFS Network using NeoFS Gate
- you can upload one file per request into NeoFS Network using NeoFS Gate
## Notable make targets ## Notable make targets
@ -24,6 +26,11 @@ You can download files from NeoFS Network using NeoFS Gate.
```go get -u github.com/nspcc-dev/neofs-http-gate``` ```go get -u github.com/nspcc-dev/neofs-http-gate```
## File uploading behaviors
- you can upload on file per request
- if `FileName` not provided by Header attributes, multipart/form filename will be used instead
## Configuration ## Configuration
``` ```
@ -64,6 +71,17 @@ of Timeout and if no activity is seen even after that the connection is closed
HTTP_GW_KEEPALIVE_PERMIT_WITHOUT_STREAM=Bool - if true, client sends keepalive pings even with no active RPCs. HTTP_GW_KEEPALIVE_PERMIT_WITHOUT_STREAM=Bool - if true, client sends keepalive pings even with no active RPCs.
If false, when there are no active RPCs, Time and Timeout will be ignored and no keepalive pings will be sent. If false, when there are no active RPCs, Time and Timeout will be ignored and no keepalive pings will be sent.
HTTP_GW_UPLOAD_HEADER_USE_DEFAULT_TIMESTAMP=bool - enable/disable adding current timestamp attribute when object uploads
HTTP_GW_WEB_READ_BUFFER_SIZE=4096 - per-connection buffer size for requests' reading
HTTP_GW_WEB_READ_TIMEOUT=15s - an amount of time allowed to read the full request including body
HTTP_GW_WEB_WRITE_BUFFER_SIZE=4096 - per-connection buffer size for responses' writing
HTTP_GW_WEB_WRITE_TIMEOUT=1m0s - maximum duration before timing out writes of the response
HTTP_GW_WEB_STREAM_REQUEST_BODY=true - enables request body streaming, and calls the handler sooner when given
body is larger then the current limit
HTTP_GW_WEB_MAX_REQUEST_BODY_SIZE=4194304 - maximum request body size, server rejects requests with bodies exceeding
this limit
Peers preset: Peers preset:
HTTP_GW_PEERS_[N]_ADDRESS = string HTTP_GW_PEERS_[N]_ADDRESS = string

16
app.go
View file

@ -31,6 +31,8 @@ type (
jobDone chan struct{} jobDone chan struct{}
webDone chan struct{} webDone chan struct{}
enableDefaultTimestamp bool
} }
App interface { App interface {
@ -74,6 +76,8 @@ func newApp(ctx context.Context, opt ...Option) App {
opt[i](a) opt[i](a)
} }
a.enableDefaultTimestamp = a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
a.wlog = logger.GRPC(a.log) a.wlog = logger.GRPC(a.log)
if a.cfg.GetBool(cmdVerbose) { if a.cfg.GetBool(cmdVerbose) {
@ -90,10 +94,17 @@ func newApp(ctx context.Context, opt ...Option) App {
a.web.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize) a.web.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize)
a.web.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout) a.web.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
a.web.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout) a.web.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
a.web.GetOnly = true
a.web.DisableHeaderNamesNormalizing = true a.web.DisableHeaderNamesNormalizing = true
a.web.NoDefaultServerHeader = true a.web.NoDefaultServerHeader = true
a.web.NoDefaultContentType = true a.web.NoDefaultContentType = true
a.web.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize)
// FIXME don't work with StreamRequestBody,
// some bugs with readMultipartForm
// https://github.com/valyala/fasthttp/issues/968
a.web.DisablePreParseMultipartForm = true
a.web.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
// -- -- -- -- -- -- -- -- -- -- // -- -- -- -- -- -- -- -- -- --
connections := make(map[string]float64) connections := make(map[string]float64)
@ -174,6 +185,9 @@ func (a *app) Serve(ctx context.Context) {
r := router.New() r := router.New()
r.RedirectTrailingSlash = true r.RedirectTrailingSlash = true
a.log.Info("enabled /upload/{cid}")
r.POST("/upload/{cid}", a.upload)
a.log.Info("enabled /get/{cid}/{oid}") a.log.Info("enabled /get/{cid}/{oid}")
r.GET("/get/{cid}/{oid}", a.byAddress) r.GET("/get/{cid}/{oid}", a.byAddress)

68
filter.go Normal file
View file

@ -0,0 +1,68 @@
package main
import (
"bytes"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
const (
userAttributeHeaderPrefix = "X-Attribute-"
neofsAttributeHeaderPrefix = "NEOFS-"
systemAttributePrefix = "__NEOFS__"
)
func systemTranslator(key []byte) []byte {
// replace `NEOFS-` with `__NEOFS__`
key = bytes.Replace(key, []byte(neofsAttributeHeaderPrefix), []byte(systemAttributePrefix), 1)
// replace `-` with `_`
key = bytes.ReplaceAll(key, []byte("-"), []byte("_"))
// replace with uppercase
return bytes.ToUpper(key)
}
func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) map[string]string {
result := make(map[string]string)
prefix := []byte(userAttributeHeaderPrefix)
system := []byte(neofsAttributeHeaderPrefix)
header.VisitAll(func(key, val []byte) {
// checks that key and val not empty
if len(key) == 0 || len(val) == 0 {
return
}
// checks that key has attribute prefix
if !bytes.HasPrefix(key, prefix) {
return
}
// removing attribute prefix
key = bytes.TrimPrefix(key, prefix)
// checks that it's a system NeoFS header
if bytes.HasPrefix(key, system) {
key = systemTranslator(key)
}
// checks that attribute key not empty
if len(key) == 0 {
return
}
// make string representation of key / val
k, v := string(key), string(val)
result[k] = v
l.Debug("add attribute to result object",
zap.String("key", k),
zap.String("val", v))
})
return result
}

41
multipart.go Normal file
View file

@ -0,0 +1,41 @@
package main
import (
"io"
"mime/multipart"
"go.uber.org/zap"
)
type MultipartFile interface {
io.ReadCloser
FileName() string
}
func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartFile, error) {
reader := multipart.NewReader(r, boundary)
for {
part, err := reader.NextPart()
if err != nil {
return nil, err
}
name := part.FormName()
if name == "" {
l.Debug("ignore part, empty form name")
continue
}
filename := part.FileName()
// ignore multipart/form-data values
if filename == "" {
l.Debug("ignore part, empty filename", zap.String("form", name))
continue
}
return part, nil
}
}

View file

@ -11,13 +11,13 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/valyala/fasthttp"
) )
type empty int type empty int
const ( const (
devNull = empty(0) devNull = empty(0)
generated = "generated"
defaultRebalanceTimer = 15 * time.Second defaultRebalanceTimer = 15 * time.Second
defaultRequestTimeout = 15 * time.Second defaultRequestTimeout = 15 * time.Second
@ -34,11 +34,12 @@ const (
cfgKeepalivePermitWithoutStream = "keepalive.permit_without_stream" cfgKeepalivePermitWithoutStream = "keepalive.permit_without_stream"
// Web // Web
cfgWebReadBufferSize = "web.read_buffer_size" cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size" cfgWebWriteBufferSize = "web.write_buffer_size"
cfgWebReadTimeout = "web.read_timeout" cfgWebReadTimeout = "web.read_timeout"
cfgWebWriteTimeout = "web.write_timeout" cfgWebWriteTimeout = "web.write_timeout"
cfgWebConnectionPerHost = "web.connection_per_host" cfgWebStreamRequestBody = "web.stream_request_body"
cfgWebMaxRequestBodySize = "web.max_request_body_size"
// Timeouts // Timeouts
cfgConTimeout = "connect_timeout" cfgConTimeout = "connect_timeout"
@ -54,6 +55,9 @@ const (
cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingInitial = "logger.sampling.initial"
cfgLoggerSamplingThereafter = "logger.sampling.thereafter" cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
// Uploader Header
cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp"
// Peers // Peers
cfgPeers = "peers" cfgPeers = "peers"
@ -138,7 +142,11 @@ func settings() *viper.Viper {
v.SetDefault(cfgWebWriteBufferSize, 4096) v.SetDefault(cfgWebWriteBufferSize, 4096)
v.SetDefault(cfgWebReadTimeout, time.Second*15) v.SetDefault(cfgWebReadTimeout, time.Second*15)
v.SetDefault(cfgWebWriteTimeout, time.Minute) v.SetDefault(cfgWebWriteTimeout, time.Minute)
v.SetDefault(cfgWebConnectionPerHost, 10) v.SetDefault(cfgWebStreamRequestBody, true)
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
// upload header
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
if err := v.BindPFlags(flags); err != nil { if err := v.BindPFlags(flags); err != nil {
panic(err) panic(err)

128
upload.go Normal file
View file

@ -0,0 +1,128 @@
package main
import (
"encoding/json"
"io"
"strconv"
"time"
sdk "github.com/nspcc-dev/cdn-sdk"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
type putResponse struct {
OID string `json:"object_id"`
CID string `json:"container_id"`
}
const jsonHeader = "application/json; charset=UTF-8"
func newPutResponse(addr *object.Address) *putResponse {
return &putResponse{
OID: addr.ObjectID().String(),
CID: addr.ContainerID().String(),
}
}
func (pr *putResponse) Encode(w io.Writer) error {
enc := json.NewEncoder(w)
enc.SetIndent("", "\t")
return enc.Encode(pr)
}
func (a *app) upload(c *fasthttp.RequestCtx) {
var (
err error
file MultipartFile
addr *object.Address
cid = container.NewID()
sCID, _ = c.UserValue("cid").(string)
log = a.log.With(zap.String("cid", sCID))
)
if err = cid.Parse(sCID); err != nil {
log.Error("wrong container id", zap.Error(err))
c.Error("wrong container id", fasthttp.StatusBadRequest)
return
}
defer func() {
// if temporary reader can be closed - close it
if file == nil {
return
}
log.Debug("close temporary multipart/form file",
zap.Stringer("address", addr),
zap.String("filename", file.FileName()),
zap.Error(file.Close()))
}()
boundary := string(c.Request.Header.MultipartFormBoundary())
if file, err = fetchMultipartFile(a.log, c.RequestBodyStream(), boundary); err != nil {
log.Error("could not receive multipart/form", zap.Error(err))
c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
return
}
filtered := filterHeaders(a.log, &c.Request.Header)
attributes := make([]*object.Attribute, 0, len(filtered))
// prepares attributes from filtered headers
for key, val := range filtered {
attribute := object.NewAttribute()
attribute.SetKey(key)
attribute.SetValue(val)
attributes = append(attributes, attribute)
}
// sets FileName attribute if it wasn't set from header
if _, ok := filtered[object.AttributeFileName]; !ok {
filename := object.NewAttribute()
filename.SetKey(object.AttributeFileName)
filename.SetValue(file.FileName())
attributes = append(attributes, filename)
}
// sets Timestamp attribute if it wasn't set from header and enabled by settings
if _, ok := filtered[object.AttributeTimestamp]; !ok && a.enableDefaultTimestamp {
timestamp := object.NewAttribute()
timestamp.SetKey(object.AttributeTimestamp)
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
attributes = append(attributes, timestamp)
}
// prepares new object and fill it
raw := object.NewRaw()
raw.SetContainerID(cid)
raw.SetOwnerID(a.cli.Owner()) // should be various: from sdk / BearerToken
raw.SetAttributes(attributes...)
// tries to put file into NeoFS or throw error
if addr, err = a.cli.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil {
log.Error("could not store file in NeoFS", zap.Error(err))
c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)
return
}
// tries to return response, otherwise, if something went wrong throw error
if err = newPutResponse(addr).Encode(c); err != nil {
log.Error("could not prepare response", zap.Error(err))
c.Error("could not prepare response", fasthttp.StatusBadRequest)
return
}
// reports status code and content type
c.Response.SetStatusCode(fasthttp.StatusOK)
c.Response.Header.SetContentType(jsonHeader)
}