From eb92219e1440cc953d9021d0b9750d921e237fcd Mon Sep 17 00:00:00 2001 From: Pavel Korotkov Date: Wed, 31 Mar 2021 19:58:42 +0300 Subject: [PATCH] [#19] Extract uploading logic into a separate package Signed-off-by: Pavel Korotkov --- app.go | 4 +- go_dev.mod | 26 -- neofs/client-plant.go | 24 +- receive.go | 22 +- bearer.go => tokens/bearer-token.go | 26 +- bearer_test.go => tokens/bearer-token_test.go | 10 +- filter.go => uploader/filter.go | 2 +- multipart.go => uploader/multipart.go | 2 +- upload.go => uploader/upload.go | 225 +++++++++--------- 9 files changed, 153 insertions(+), 188 deletions(-) delete mode 100644 go_dev.mod rename bearer.go => tokens/bearer-token.go (84%) rename bearer_test.go => tokens/bearer-token_test.go (92%) rename filter.go => uploader/filter.go (98%) rename multipart.go => uploader/multipart.go (97%) rename upload.go => uploader/upload.go (77%) diff --git a/app.go b/app.go index 8fe8960..f1f1336 100644 --- a/app.go +++ b/app.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-http-gate/logger" "github.com/nspcc-dev/neofs-http-gate/neofs" + "github.com/nspcc-dev/neofs-http-gate/uploader" "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -151,10 +152,11 @@ func (a *app) Serve(ctx context.Context) { a.log.Info("shutting down web server", zap.Error(a.web.Shutdown())) close(a.webDone) }() + uploader := uploader.New(a.log, a.plant, a.enableDefaultTimestamp) // Configure router. r := router.New() r.RedirectTrailingSlash = true - r.POST("/upload/{cid}", a.upload) + r.POST("/upload/{cid}", uploader.Upload) a.log.Info("added path /upload/{cid}") r.GET("/get/{cid}/{oid}", a.byAddress) a.log.Info("added path /get/{cid}/{oid}") diff --git a/go_dev.mod b/go_dev.mod deleted file mode 100644 index f45f6ae..0000000 --- a/go_dev.mod +++ /dev/null @@ -1,26 +0,0 @@ -module github.com/nspcc-dev/neofs-http-gate - -go 1.16 - -require ( - github.com/fasthttp/router v0.6.1 - github.com/nspcc-dev/neofs-api v0.0.0-00000000000000-000000000000 - github.com/nspcc-dev/neofs-crypto v0.2.3 - github.com/prometheus/client_golang v1.4.1 // v1.2.1 => v1.4.1 - github.com/prometheus/common v0.9.1 - github.com/spf13/pflag v1.0.5 - github.com/spf13/viper v1.6.2 // v1.6.1 => v1.6.2 - github.com/valyala/fasthttp v1.9.0 - go.uber.org/atomic v1.6.0 - go.uber.org/zap v1.14.0 - golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876 // indirect - golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect - golang.org/x/text v0.3.2 // indirect - google.golang.org/grpc v1.27.1 -) - -// For debug reasons -replace ( - github.com/nspcc-dev/neofs-api => ../neofs-api - google.golang.org/grpc => ../grpc-go -) diff --git a/neofs/client-plant.go b/neofs/client-plant.go index 5102c29..cf3aaa9 100644 --- a/neofs/client-plant.go +++ b/neofs/client-plant.go @@ -21,14 +21,17 @@ import ( const ( nodeConnectionTimeout = 10 * time.Second - maxObjectSize = uint64(1 << (20 + 6)) // 64MB + maxObjectSize = uint64(1 << 26) // 64MiB ) -type PutOptions struct { +type BaseOptions struct { Client client.Client SessionToken *token.SessionToken BearerToken *token.BearerToken - // ... +} + +type PutOptions struct { + BaseOptions ContainerID *container.ID OwnerID *owner.ID PrepareObjectOnsite bool @@ -36,19 +39,13 @@ type PutOptions struct { } type GetOptions struct { - Client client.Client - SessionToken *token.SessionToken - BearerToken *token.BearerToken - // ... + BaseOptions ObjectAddress *object.Address Writer io.Writer } type SearchOptions struct { - Client client.Client - SessionToken *token.SessionToken - BearerToken *token.BearerToken - // ... + BaseOptions ContainerID *container.ID Attribute struct { Key string @@ -57,10 +54,7 @@ type SearchOptions struct { } type DeleteOptions struct { - Client client.Client - SessionToken *token.SessionToken - BearerToken *token.BearerToken - // ... + BaseOptions ObjectAddress *object.Address } diff --git a/receive.go b/receive.go index 9728e7b..25199f8 100644 --- a/receive.go +++ b/receive.go @@ -12,6 +12,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-http-gate/neofs" + "github.com/nspcc-dev/neofs-http-gate/tokens" "github.com/pkg/errors" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -23,13 +24,11 @@ type ( detector struct { io.Writer sync.Once - contentType string } request struct { *fasthttp.RequestCtx - log *zap.Logger obj neofs.ObjectClient } @@ -45,7 +44,6 @@ func (d *detector) Write(data []byte) (int, error) { d.Once.Do(func() { d.contentType = http.DetectContentType(data) }) - return d.Writer.Write(data) } @@ -56,13 +54,12 @@ func (r *request) receiveFile(options *neofs.GetOptions) { start = time.Now() filename string ) - if err = storeBearerToken(r.RequestCtx); err != nil { + if err = tokens.StoreBearerToken(r.RequestCtx); err != nil { r.log.Error("could not fetch and store bearer token", zap.Error(err)) r.Error("could not fetch and store bearer token", fasthttp.StatusBadRequest) return } writer := newDetector(r.Response.BodyWriter()) - // obj, err := r.obj.Get(r, address, sdk.WithGetWriter(writer)) options.Writer = writer obj, err := r.obj.Get(r.RequestCtx, options) if err != nil { @@ -84,22 +81,17 @@ func (r *request) receiveFile(options *neofs.GetOptions) { r.Error(msg, code) return } - if r.Request.URI().QueryArgs().GetBool("download") { dis = "attachment" } - r.Response.Header.Set("Content-Length", strconv.FormatUint(obj.PayloadSize(), 10)) r.Response.Header.Set("x-object-id", obj.ID().String()) r.Response.Header.Set("x-owner-id", obj.OwnerID().String()) r.Response.Header.Set("x-container-id", obj.ContainerID().String()) - for _, attr := range obj.Attributes() { key := attr.Key() val := attr.Value() - r.Response.Header.Set("x-"+key, val) - switch key { case object.AttributeFileName: filename = val @@ -112,13 +104,10 @@ func (r *request) receiveFile(options *neofs.GetOptions) { zap.Error(err)) continue } - r.Response.Header.Set("Last-Modified", time.Unix(value, 0).Format(time.RFC1123)) } - } - r.SetContentType(writer.contentType) r.Response.Header.Set("Content-Disposition", dis+"; filename="+path.Base(filename)) } @@ -128,16 +117,14 @@ func (o objectIDs) Slice() []string { for _, oid := range o { res = append(res, oid.String()) } - return res } func (a *app) request(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { return &request{ RequestCtx: ctx, - - log: log, - obj: a.plant.Object(), + log: log, + obj: a.plant.Object(), } } @@ -177,7 +164,6 @@ func (a *app) byAttribute(c *fasthttp.RequestCtx) { log.Error("wrong container id", zap.Error(err)) c.Error("wrong container id", fasthttp.StatusBadRequest) return - // } else if ids, err = a.cli.Object().Search(c, cid, sdk.SearchRootObjects(), sdk.SearchByAttribute(key, val)); err != nil { } // TODO: Take this from a sync-pool. searchOpts := new(neofs.SearchOptions) diff --git a/bearer.go b/tokens/bearer-token.go similarity index 84% rename from bearer.go rename to tokens/bearer-token.go index e14c35c..337c604 100644 --- a/bearer.go +++ b/tokens/bearer-token.go @@ -1,4 +1,4 @@ -package main +package tokens import ( "bytes" @@ -25,20 +25,18 @@ const ( // return // } -func fromHeader(h *fasthttp.RequestHeader) []byte { +func BearerTokenFromHeader(h *fasthttp.RequestHeader) []byte { auth := h.Peek(fasthttp.HeaderAuthorization) if auth == nil || !bytes.HasPrefix(auth, []byte(bearerTokenHdr)) { return nil } - if auth = bytes.TrimPrefix(auth, []byte(bearerTokenHdr+" ")); len(auth) == 0 { return nil } - return auth } -func fromCookie(h *fasthttp.RequestHeader) []byte { +func BearerTokenFromCookie(h *fasthttp.RequestHeader) []byte { auth := h.Cookie(bearerTokenHdr) if len(auth) == 0 { return nil @@ -47,7 +45,7 @@ func fromCookie(h *fasthttp.RequestHeader) []byte { return auth } -func storeBearerToken(ctx *fasthttp.RequestCtx) error { +func StoreBearerToken(ctx *fasthttp.RequestCtx) error { tkn, err := fetchBearerToken(ctx) if err != nil { return err @@ -57,6 +55,13 @@ func storeBearerToken(ctx *fasthttp.RequestCtx) error { return nil } +func LoadBearerToken(ctx context.Context) (*token.BearerToken, error) { + if tkn, ok := ctx.Value(bearerTokenKey).(*token.BearerToken); ok && tkn != nil { + return tkn, nil + } + return nil, errors.New("found empty bearer token") +} + func fetchBearerToken(ctx *fasthttp.RequestCtx) (*token.BearerToken, error) { // ignore empty value if ctx == nil { @@ -68,7 +73,7 @@ func fetchBearerToken(ctx *fasthttp.RequestCtx) (*token.BearerToken, error) { buf []byte tkn = new(token.BearerToken) ) - for _, parse := range []fromHandler{fromHeader, fromCookie} { + for _, parse := range []fromHandler{BearerTokenFromHeader, BearerTokenFromCookie} { if buf = parse(&ctx.Request.Header); buf == nil { continue } else if data, err := base64.StdEncoding.DecodeString(string(buf)); err != nil { @@ -86,10 +91,3 @@ func fetchBearerToken(ctx *fasthttp.RequestCtx) (*token.BearerToken, error) { return nil, lastErr } - -func loadBearerToken(ctx context.Context) (*token.BearerToken, error) { - if tkn, ok := ctx.Value(bearerTokenKey).(*token.BearerToken); ok && tkn != nil { - return tkn, nil - } - return nil, errors.New("found empty bearer token") -} diff --git a/bearer_test.go b/tokens/bearer-token_test.go similarity index 92% rename from bearer_test.go rename to tokens/bearer-token_test.go index 4cfa31f..c6ee703 100644 --- a/bearer_test.go +++ b/tokens/bearer-token_test.go @@ -1,4 +1,4 @@ -package main +package tokens import ( "encoding/base64" @@ -36,7 +36,7 @@ func Test_fromCookie(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.expect, fromCookie(makeTestCookie(tt.actual))) + require.Equal(t, tt.expect, BearerTokenFromCookie(makeTestCookie(tt.actual))) }) } } @@ -53,7 +53,7 @@ func Test_fromHeader(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.expect, fromHeader(makeTestHeader(tt.actual))) + require.Equal(t, tt.expect, BearerTokenFromHeader(makeTestHeader(tt.actual))) }) } } @@ -151,10 +151,10 @@ func Test_checkAndPropagateBearerToken(t *testing.T) { ctx := makeTestRequest(t64, "") // Expect to see the token within the context. - require.NoError(t, storeBearerToken(ctx)) + require.NoError(t, StoreBearerToken(ctx)) // Expect to see the same token without errors. - actual, err := loadBearerToken(ctx) + actual, err := LoadBearerToken(ctx) require.NoError(t, err) require.Equal(t, tkn, actual) } diff --git a/filter.go b/uploader/filter.go similarity index 98% rename from filter.go rename to uploader/filter.go index 7d8b24d..47c789a 100644 --- a/filter.go +++ b/uploader/filter.go @@ -1,4 +1,4 @@ -package main +package uploader import ( "bytes" diff --git a/multipart.go b/uploader/multipart.go similarity index 97% rename from multipart.go rename to uploader/multipart.go index f12442f..8abc6d0 100644 --- a/multipart.go +++ b/uploader/multipart.go @@ -1,4 +1,4 @@ -package main +package uploader import ( "io" diff --git a/upload.go b/uploader/upload.go similarity index 77% rename from upload.go rename to uploader/upload.go index 20a65f5..ad03b0d 100644 --- a/upload.go +++ b/uploader/upload.go @@ -1,4 +1,4 @@ -package main +package uploader import ( "context" @@ -12,10 +12,127 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-http-gate/neofs" + "github.com/nspcc-dev/neofs-http-gate/tokens" "github.com/valyala/fasthttp" "go.uber.org/zap" ) +type Uploader struct { + log *zap.Logger + plant neofs.ClientPlant + enableDefaultTimestamp bool +} + +func New(log *zap.Logger, plant neofs.ClientPlant, enableDefaultTimestamp bool) *Uploader { + return &Uploader{log, plant, enableDefaultTimestamp} +} + +func (u *Uploader) Upload(c *fasthttp.RequestCtx) { + var ( + err error + file MultipartFile + addr *object.Address + cid = container.NewID() + scid, _ = c.UserValue("cid").(string) + log = u.log.With(zap.String("cid", scid)) + ) + if err = tokens.StoreBearerToken(c); err != nil { + log.Error("could not fetch bearer token", zap.Error(err)) + c.Error("could not fetch bearer token", fasthttp.StatusBadRequest) + return + } + if err = cid.Parse(scid); err != nil { + log.Error("wrong container id", zap.Error(err)) + c.Error("wrong container id", fasthttp.StatusBadRequest) + return + } + defer func() { + // if temporary reader can be closed - close it + if file == nil { + return + } + err := file.Close() + log.Debug( + "close temporary multipart/form file", + zap.Stringer("address", addr), + zap.String("filename", file.FileName()), + zap.Error(err), + ) + }() + boundary := string(c.Request.Header.MultipartFormBoundary()) + if file, err = fetchMultipartFile(u.log, c.RequestBodyStream(), boundary); err != nil { + log.Error("could not receive multipart/form", zap.Error(err)) + c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) + return + } + filtered := filterHeaders(u.log, &c.Request.Header) + attributes := make([]*object.Attribute, 0, len(filtered)) + // prepares attributes from filtered headers + for key, val := range filtered { + attribute := object.NewAttribute() + attribute.SetKey(key) + attribute.SetValue(val) + attributes = append(attributes, attribute) + } + // sets FileName attribute if it wasn't set from header + if _, ok := filtered[object.AttributeFileName]; !ok { + filename := object.NewAttribute() + filename.SetKey(object.AttributeFileName) + filename.SetValue(file.FileName()) + attributes = append(attributes, filename) + } + // sets Timestamp attribute if it wasn't set from header and enabled by settings + if _, ok := filtered[object.AttributeTimestamp]; !ok && u.enableDefaultTimestamp { + timestamp := object.NewAttribute() + timestamp.SetKey(object.AttributeTimestamp) + timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) + attributes = append(attributes, timestamp) + } + oid, bt := u.fetchOwnerAndBearerToken(c) + // prepares new object and fill it + raw := object.NewRaw() + raw.SetContainerID(cid) + raw.SetOwnerID(oid) + raw.SetAttributes(attributes...) + // tries to put file into NeoFS or throw error + // if addr, err = a.plant.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil { + // TODO: Take this from a sync pool. + putOpts := new(neofs.PutOptions) + putOpts.Client, putOpts.SessionToken, err = u.plant.GetReusableArtifacts(c) + if err != nil { + log.Error("failed to get neofs client's reusable artifacts", zap.Error(err)) + c.Error("failed to get neofs client's reusable artifacts", fasthttp.StatusInternalServerError) + return + } + putOpts.BearerToken = bt + putOpts.ContainerID = cid + putOpts.OwnerID = oid + putOpts.PrepareObjectOnsite = false + putOpts.Reader = file + if addr, err = u.plant.Object().Put(c, putOpts); err != nil { + log.Error("could not store file in NeoFS", zap.Error(err)) + c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) + return + } + // tries to return response, otherwise, if something went wrong throw error + if err = newPutResponse(addr).encode(c); err != nil { + log.Error("could not prepare response", zap.Error(err)) + c.Error("could not prepare response", fasthttp.StatusBadRequest) + + return + } + // reports status code and content type + c.Response.SetStatusCode(fasthttp.StatusOK) + c.Response.Header.SetContentType(jsonHeader) +} + +func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) { + if token, err := tokens.LoadBearerToken(ctx); err == nil && token != nil { + return token.Issuer(), token + } + return u.plant.OwnerID(), nil +} + type putResponse struct { OID string `json:"object_id"` CID string `json:"container_id"` @@ -35,109 +152,3 @@ func (pr *putResponse) encode(w io.Writer) error { enc.SetIndent("", "\t") return enc.Encode(pr) } - -func (a *app) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) { - if token, err := loadBearerToken(ctx); err == nil && token != nil { - return token.Issuer(), token - } - return a.plant.OwnerID(), nil -} - -func (a *app) upload(c *fasthttp.RequestCtx) { - var ( - err error - file MultipartFile - addr *object.Address - cid = container.NewID() - sCID, _ = c.UserValue("cid").(string) - log = a.log.With(zap.String("cid", sCID)) - ) - if err = storeBearerToken(c); err != nil { - log.Error("could not fetch bearer token", zap.Error(err)) - c.Error("could not fetch bearer token", fasthttp.StatusBadRequest) - return - } - if err = cid.Parse(sCID); err != nil { - log.Error("wrong container id", zap.Error(err)) - c.Error("wrong container id", fasthttp.StatusBadRequest) - return - } - defer func() { - // if temporary reader can be closed - close it - if file == nil { - return - } - err := file.Close() - log.Debug( - "close temporary multipart/form file", - zap.Stringer("address", addr), - zap.String("filename", file.FileName()), - zap.Error(err), - ) - }() - boundary := string(c.Request.Header.MultipartFormBoundary()) - if file, err = fetchMultipartFile(a.log, c.RequestBodyStream(), boundary); err != nil { - log.Error("could not receive multipart/form", zap.Error(err)) - c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) - return - } - filtered := filterHeaders(a.log, &c.Request.Header) - attributes := make([]*object.Attribute, 0, len(filtered)) - // prepares attributes from filtered headers - for key, val := range filtered { - attribute := object.NewAttribute() - attribute.SetKey(key) - attribute.SetValue(val) - attributes = append(attributes, attribute) - } - // sets FileName attribute if it wasn't set from header - if _, ok := filtered[object.AttributeFileName]; !ok { - filename := object.NewAttribute() - filename.SetKey(object.AttributeFileName) - filename.SetValue(file.FileName()) - attributes = append(attributes, filename) - } - // sets Timestamp attribute if it wasn't set from header and enabled by settings - if _, ok := filtered[object.AttributeTimestamp]; !ok && a.enableDefaultTimestamp { - timestamp := object.NewAttribute() - timestamp.SetKey(object.AttributeTimestamp) - timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) - attributes = append(attributes, timestamp) - } - oid, bt := a.fetchOwnerAndBearerToken(c) - // prepares new object and fill it - raw := object.NewRaw() - raw.SetContainerID(cid) - raw.SetOwnerID(oid) - raw.SetAttributes(attributes...) - // tries to put file into NeoFS or throw error - // if addr, err = a.plant.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil { - // TODO: Take this from a sync pool. - putOpts := new(neofs.PutOptions) - putOpts.Client, putOpts.SessionToken, err = a.plant.GetReusableArtifacts(c) - if err != nil { - log.Error("failed to get neofs client's reusable artifacts", zap.Error(err)) - c.Error("failed to get neofs client's reusable artifacts", fasthttp.StatusInternalServerError) - return - } - putOpts.BearerToken = bt - putOpts.ContainerID = cid - putOpts.OwnerID = oid - putOpts.PrepareObjectOnsite = false - putOpts.Reader = file - if addr, err = a.plant.Object().Put(c, putOpts); err != nil { - log.Error("could not store file in NeoFS", zap.Error(err)) - c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest) - return - } - // tries to return response, otherwise, if something went wrong throw error - if err = newPutResponse(addr).encode(c); err != nil { - log.Error("could not prepare response", zap.Error(err)) - c.Error("could not prepare response", fasthttp.StatusBadRequest) - - return - } - // reports status code and content type - c.Response.SetStatusCode(fasthttp.StatusOK) - c.Response.Header.SetContentType(jsonHeader) -}