forked from TrueCloudLab/frostfs-http-gw
[#19] Extract uploading logic into a separate package
Signed-off-by: Pavel Korotkov <pavel@nspcc.ru>
This commit is contained in:
parent
4c96885a42
commit
eb92219e14
9 changed files with 153 additions and 188 deletions
4
app.go
4
app.go
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||||
"github.com/nspcc-dev/neofs-http-gate/logger"
|
"github.com/nspcc-dev/neofs-http-gate/logger"
|
||||||
"github.com/nspcc-dev/neofs-http-gate/neofs"
|
"github.com/nspcc-dev/neofs-http-gate/neofs"
|
||||||
|
"github.com/nspcc-dev/neofs-http-gate/uploader"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -151,10 +152,11 @@ func (a *app) Serve(ctx context.Context) {
|
||||||
a.log.Info("shutting down web server", zap.Error(a.web.Shutdown()))
|
a.log.Info("shutting down web server", zap.Error(a.web.Shutdown()))
|
||||||
close(a.webDone)
|
close(a.webDone)
|
||||||
}()
|
}()
|
||||||
|
uploader := uploader.New(a.log, a.plant, a.enableDefaultTimestamp)
|
||||||
// Configure router.
|
// Configure router.
|
||||||
r := router.New()
|
r := router.New()
|
||||||
r.RedirectTrailingSlash = true
|
r.RedirectTrailingSlash = true
|
||||||
r.POST("/upload/{cid}", a.upload)
|
r.POST("/upload/{cid}", uploader.Upload)
|
||||||
a.log.Info("added path /upload/{cid}")
|
a.log.Info("added path /upload/{cid}")
|
||||||
r.GET("/get/{cid}/{oid}", a.byAddress)
|
r.GET("/get/{cid}/{oid}", a.byAddress)
|
||||||
a.log.Info("added path /get/{cid}/{oid}")
|
a.log.Info("added path /get/{cid}/{oid}")
|
||||||
|
|
26
go_dev.mod
26
go_dev.mod
|
@ -1,26 +0,0 @@
|
||||||
module github.com/nspcc-dev/neofs-http-gate
|
|
||||||
|
|
||||||
go 1.16
|
|
||||||
|
|
||||||
require (
|
|
||||||
github.com/fasthttp/router v0.6.1
|
|
||||||
github.com/nspcc-dev/neofs-api v0.0.0-00000000000000-000000000000
|
|
||||||
github.com/nspcc-dev/neofs-crypto v0.2.3
|
|
||||||
github.com/prometheus/client_golang v1.4.1 // v1.2.1 => v1.4.1
|
|
||||||
github.com/prometheus/common v0.9.1
|
|
||||||
github.com/spf13/pflag v1.0.5
|
|
||||||
github.com/spf13/viper v1.6.2 // v1.6.1 => v1.6.2
|
|
||||||
github.com/valyala/fasthttp v1.9.0
|
|
||||||
go.uber.org/atomic v1.6.0
|
|
||||||
go.uber.org/zap v1.14.0
|
|
||||||
golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876 // indirect
|
|
||||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect
|
|
||||||
golang.org/x/text v0.3.2 // indirect
|
|
||||||
google.golang.org/grpc v1.27.1
|
|
||||||
)
|
|
||||||
|
|
||||||
// For debug reasons
|
|
||||||
replace (
|
|
||||||
github.com/nspcc-dev/neofs-api => ../neofs-api
|
|
||||||
google.golang.org/grpc => ../grpc-go
|
|
||||||
)
|
|
|
@ -21,14 +21,17 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
nodeConnectionTimeout = 10 * time.Second
|
nodeConnectionTimeout = 10 * time.Second
|
||||||
maxObjectSize = uint64(1 << (20 + 6)) // 64MB
|
maxObjectSize = uint64(1 << 26) // 64MiB
|
||||||
)
|
)
|
||||||
|
|
||||||
type PutOptions struct {
|
type BaseOptions struct {
|
||||||
Client client.Client
|
Client client.Client
|
||||||
SessionToken *token.SessionToken
|
SessionToken *token.SessionToken
|
||||||
BearerToken *token.BearerToken
|
BearerToken *token.BearerToken
|
||||||
// ...
|
}
|
||||||
|
|
||||||
|
type PutOptions struct {
|
||||||
|
BaseOptions
|
||||||
ContainerID *container.ID
|
ContainerID *container.ID
|
||||||
OwnerID *owner.ID
|
OwnerID *owner.ID
|
||||||
PrepareObjectOnsite bool
|
PrepareObjectOnsite bool
|
||||||
|
@ -36,19 +39,13 @@ type PutOptions struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetOptions struct {
|
type GetOptions struct {
|
||||||
Client client.Client
|
BaseOptions
|
||||||
SessionToken *token.SessionToken
|
|
||||||
BearerToken *token.BearerToken
|
|
||||||
// ...
|
|
||||||
ObjectAddress *object.Address
|
ObjectAddress *object.Address
|
||||||
Writer io.Writer
|
Writer io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
type SearchOptions struct {
|
type SearchOptions struct {
|
||||||
Client client.Client
|
BaseOptions
|
||||||
SessionToken *token.SessionToken
|
|
||||||
BearerToken *token.BearerToken
|
|
||||||
// ...
|
|
||||||
ContainerID *container.ID
|
ContainerID *container.ID
|
||||||
Attribute struct {
|
Attribute struct {
|
||||||
Key string
|
Key string
|
||||||
|
@ -57,10 +54,7 @@ type SearchOptions struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type DeleteOptions struct {
|
type DeleteOptions struct {
|
||||||
Client client.Client
|
BaseOptions
|
||||||
SessionToken *token.SessionToken
|
|
||||||
BearerToken *token.BearerToken
|
|
||||||
// ...
|
|
||||||
ObjectAddress *object.Address
|
ObjectAddress *object.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
18
receive.go
18
receive.go
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-http-gate/neofs"
|
"github.com/nspcc-dev/neofs-http-gate/neofs"
|
||||||
|
"github.com/nspcc-dev/neofs-http-gate/tokens"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -23,13 +24,11 @@ type (
|
||||||
detector struct {
|
detector struct {
|
||||||
io.Writer
|
io.Writer
|
||||||
sync.Once
|
sync.Once
|
||||||
|
|
||||||
contentType string
|
contentType string
|
||||||
}
|
}
|
||||||
|
|
||||||
request struct {
|
request struct {
|
||||||
*fasthttp.RequestCtx
|
*fasthttp.RequestCtx
|
||||||
|
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
obj neofs.ObjectClient
|
obj neofs.ObjectClient
|
||||||
}
|
}
|
||||||
|
@ -45,7 +44,6 @@ func (d *detector) Write(data []byte) (int, error) {
|
||||||
d.Once.Do(func() {
|
d.Once.Do(func() {
|
||||||
d.contentType = http.DetectContentType(data)
|
d.contentType = http.DetectContentType(data)
|
||||||
})
|
})
|
||||||
|
|
||||||
return d.Writer.Write(data)
|
return d.Writer.Write(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,13 +54,12 @@ func (r *request) receiveFile(options *neofs.GetOptions) {
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
filename string
|
filename string
|
||||||
)
|
)
|
||||||
if err = storeBearerToken(r.RequestCtx); err != nil {
|
if err = tokens.StoreBearerToken(r.RequestCtx); err != nil {
|
||||||
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
r.log.Error("could not fetch and store bearer token", zap.Error(err))
|
||||||
r.Error("could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
r.Error("could not fetch and store bearer token", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writer := newDetector(r.Response.BodyWriter())
|
writer := newDetector(r.Response.BodyWriter())
|
||||||
// obj, err := r.obj.Get(r, address, sdk.WithGetWriter(writer))
|
|
||||||
options.Writer = writer
|
options.Writer = writer
|
||||||
obj, err := r.obj.Get(r.RequestCtx, options)
|
obj, err := r.obj.Get(r.RequestCtx, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -84,22 +81,17 @@ func (r *request) receiveFile(options *neofs.GetOptions) {
|
||||||
r.Error(msg, code)
|
r.Error(msg, code)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Request.URI().QueryArgs().GetBool("download") {
|
if r.Request.URI().QueryArgs().GetBool("download") {
|
||||||
dis = "attachment"
|
dis = "attachment"
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Response.Header.Set("Content-Length", strconv.FormatUint(obj.PayloadSize(), 10))
|
r.Response.Header.Set("Content-Length", strconv.FormatUint(obj.PayloadSize(), 10))
|
||||||
r.Response.Header.Set("x-object-id", obj.ID().String())
|
r.Response.Header.Set("x-object-id", obj.ID().String())
|
||||||
r.Response.Header.Set("x-owner-id", obj.OwnerID().String())
|
r.Response.Header.Set("x-owner-id", obj.OwnerID().String())
|
||||||
r.Response.Header.Set("x-container-id", obj.ContainerID().String())
|
r.Response.Header.Set("x-container-id", obj.ContainerID().String())
|
||||||
|
|
||||||
for _, attr := range obj.Attributes() {
|
for _, attr := range obj.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
val := attr.Value()
|
val := attr.Value()
|
||||||
|
|
||||||
r.Response.Header.Set("x-"+key, val)
|
r.Response.Header.Set("x-"+key, val)
|
||||||
|
|
||||||
switch key {
|
switch key {
|
||||||
case object.AttributeFileName:
|
case object.AttributeFileName:
|
||||||
filename = val
|
filename = val
|
||||||
|
@ -112,13 +104,10 @@ func (r *request) receiveFile(options *neofs.GetOptions) {
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Response.Header.Set("Last-Modified",
|
r.Response.Header.Set("Last-Modified",
|
||||||
time.Unix(value, 0).Format(time.RFC1123))
|
time.Unix(value, 0).Format(time.RFC1123))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r.SetContentType(writer.contentType)
|
r.SetContentType(writer.contentType)
|
||||||
r.Response.Header.Set("Content-Disposition", dis+"; filename="+path.Base(filename))
|
r.Response.Header.Set("Content-Disposition", dis+"; filename="+path.Base(filename))
|
||||||
}
|
}
|
||||||
|
@ -128,14 +117,12 @@ func (o objectIDs) Slice() []string {
|
||||||
for _, oid := range o {
|
for _, oid := range o {
|
||||||
res = append(res, oid.String())
|
res = append(res, oid.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) request(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
func (a *app) request(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
||||||
return &request{
|
return &request{
|
||||||
RequestCtx: ctx,
|
RequestCtx: ctx,
|
||||||
|
|
||||||
log: log,
|
log: log,
|
||||||
obj: a.plant.Object(),
|
obj: a.plant.Object(),
|
||||||
}
|
}
|
||||||
|
@ -177,7 +164,6 @@ func (a *app) byAttribute(c *fasthttp.RequestCtx) {
|
||||||
log.Error("wrong container id", zap.Error(err))
|
log.Error("wrong container id", zap.Error(err))
|
||||||
c.Error("wrong container id", fasthttp.StatusBadRequest)
|
c.Error("wrong container id", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
// } else if ids, err = a.cli.Object().Search(c, cid, sdk.SearchRootObjects(), sdk.SearchByAttribute(key, val)); err != nil {
|
|
||||||
}
|
}
|
||||||
// TODO: Take this from a sync-pool.
|
// TODO: Take this from a sync-pool.
|
||||||
searchOpts := new(neofs.SearchOptions)
|
searchOpts := new(neofs.SearchOptions)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package tokens
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@ -25,20 +25,18 @@ const (
|
||||||
// return
|
// return
|
||||||
// }
|
// }
|
||||||
|
|
||||||
func fromHeader(h *fasthttp.RequestHeader) []byte {
|
func BearerTokenFromHeader(h *fasthttp.RequestHeader) []byte {
|
||||||
auth := h.Peek(fasthttp.HeaderAuthorization)
|
auth := h.Peek(fasthttp.HeaderAuthorization)
|
||||||
if auth == nil || !bytes.HasPrefix(auth, []byte(bearerTokenHdr)) {
|
if auth == nil || !bytes.HasPrefix(auth, []byte(bearerTokenHdr)) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if auth = bytes.TrimPrefix(auth, []byte(bearerTokenHdr+" ")); len(auth) == 0 {
|
if auth = bytes.TrimPrefix(auth, []byte(bearerTokenHdr+" ")); len(auth) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return auth
|
return auth
|
||||||
}
|
}
|
||||||
|
|
||||||
func fromCookie(h *fasthttp.RequestHeader) []byte {
|
func BearerTokenFromCookie(h *fasthttp.RequestHeader) []byte {
|
||||||
auth := h.Cookie(bearerTokenHdr)
|
auth := h.Cookie(bearerTokenHdr)
|
||||||
if len(auth) == 0 {
|
if len(auth) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -47,7 +45,7 @@ func fromCookie(h *fasthttp.RequestHeader) []byte {
|
||||||
return auth
|
return auth
|
||||||
}
|
}
|
||||||
|
|
||||||
func storeBearerToken(ctx *fasthttp.RequestCtx) error {
|
func StoreBearerToken(ctx *fasthttp.RequestCtx) error {
|
||||||
tkn, err := fetchBearerToken(ctx)
|
tkn, err := fetchBearerToken(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -57,6 +55,13 @@ func storeBearerToken(ctx *fasthttp.RequestCtx) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func LoadBearerToken(ctx context.Context) (*token.BearerToken, error) {
|
||||||
|
if tkn, ok := ctx.Value(bearerTokenKey).(*token.BearerToken); ok && tkn != nil {
|
||||||
|
return tkn, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("found empty bearer token")
|
||||||
|
}
|
||||||
|
|
||||||
func fetchBearerToken(ctx *fasthttp.RequestCtx) (*token.BearerToken, error) {
|
func fetchBearerToken(ctx *fasthttp.RequestCtx) (*token.BearerToken, error) {
|
||||||
// ignore empty value
|
// ignore empty value
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
|
@ -68,7 +73,7 @@ func fetchBearerToken(ctx *fasthttp.RequestCtx) (*token.BearerToken, error) {
|
||||||
buf []byte
|
buf []byte
|
||||||
tkn = new(token.BearerToken)
|
tkn = new(token.BearerToken)
|
||||||
)
|
)
|
||||||
for _, parse := range []fromHandler{fromHeader, fromCookie} {
|
for _, parse := range []fromHandler{BearerTokenFromHeader, BearerTokenFromCookie} {
|
||||||
if buf = parse(&ctx.Request.Header); buf == nil {
|
if buf = parse(&ctx.Request.Header); buf == nil {
|
||||||
continue
|
continue
|
||||||
} else if data, err := base64.StdEncoding.DecodeString(string(buf)); err != nil {
|
} else if data, err := base64.StdEncoding.DecodeString(string(buf)); err != nil {
|
||||||
|
@ -86,10 +91,3 @@ func fetchBearerToken(ctx *fasthttp.RequestCtx) (*token.BearerToken, error) {
|
||||||
|
|
||||||
return nil, lastErr
|
return nil, lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadBearerToken(ctx context.Context) (*token.BearerToken, error) {
|
|
||||||
if tkn, ok := ctx.Value(bearerTokenKey).(*token.BearerToken); ok && tkn != nil {
|
|
||||||
return tkn, nil
|
|
||||||
}
|
|
||||||
return nil, errors.New("found empty bearer token")
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package tokens
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
@ -36,7 +36,7 @@ func Test_fromCookie(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range cases {
|
for _, tt := range cases {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
require.Equal(t, tt.expect, fromCookie(makeTestCookie(tt.actual)))
|
require.Equal(t, tt.expect, BearerTokenFromCookie(makeTestCookie(tt.actual)))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ func Test_fromHeader(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range cases {
|
for _, tt := range cases {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
require.Equal(t, tt.expect, fromHeader(makeTestHeader(tt.actual)))
|
require.Equal(t, tt.expect, BearerTokenFromHeader(makeTestHeader(tt.actual)))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -151,10 +151,10 @@ func Test_checkAndPropagateBearerToken(t *testing.T) {
|
||||||
ctx := makeTestRequest(t64, "")
|
ctx := makeTestRequest(t64, "")
|
||||||
|
|
||||||
// Expect to see the token within the context.
|
// Expect to see the token within the context.
|
||||||
require.NoError(t, storeBearerToken(ctx))
|
require.NoError(t, StoreBearerToken(ctx))
|
||||||
|
|
||||||
// Expect to see the same token without errors.
|
// Expect to see the same token without errors.
|
||||||
actual, err := loadBearerToken(ctx)
|
actual, err := LoadBearerToken(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, tkn, actual)
|
require.Equal(t, tkn, actual)
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package uploader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package uploader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package uploader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -12,10 +12,127 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
||||||
"github.com/nspcc-dev/neofs-http-gate/neofs"
|
"github.com/nspcc-dev/neofs-http-gate/neofs"
|
||||||
|
"github.com/nspcc-dev/neofs-http-gate/tokens"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Uploader struct {
|
||||||
|
log *zap.Logger
|
||||||
|
plant neofs.ClientPlant
|
||||||
|
enableDefaultTimestamp bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(log *zap.Logger, plant neofs.ClientPlant, enableDefaultTimestamp bool) *Uploader {
|
||||||
|
return &Uploader{log, plant, enableDefaultTimestamp}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
file MultipartFile
|
||||||
|
addr *object.Address
|
||||||
|
cid = container.NewID()
|
||||||
|
scid, _ = c.UserValue("cid").(string)
|
||||||
|
log = u.log.With(zap.String("cid", scid))
|
||||||
|
)
|
||||||
|
if err = tokens.StoreBearerToken(c); err != nil {
|
||||||
|
log.Error("could not fetch bearer token", zap.Error(err))
|
||||||
|
c.Error("could not fetch bearer token", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = cid.Parse(scid); err != nil {
|
||||||
|
log.Error("wrong container id", zap.Error(err))
|
||||||
|
c.Error("wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
// if temporary reader can be closed - close it
|
||||||
|
if file == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := file.Close()
|
||||||
|
log.Debug(
|
||||||
|
"close temporary multipart/form file",
|
||||||
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("filename", file.FileName()),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
}()
|
||||||
|
boundary := string(c.Request.Header.MultipartFormBoundary())
|
||||||
|
if file, err = fetchMultipartFile(u.log, c.RequestBodyStream(), boundary); err != nil {
|
||||||
|
log.Error("could not receive multipart/form", zap.Error(err))
|
||||||
|
c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
filtered := filterHeaders(u.log, &c.Request.Header)
|
||||||
|
attributes := make([]*object.Attribute, 0, len(filtered))
|
||||||
|
// prepares attributes from filtered headers
|
||||||
|
for key, val := range filtered {
|
||||||
|
attribute := object.NewAttribute()
|
||||||
|
attribute.SetKey(key)
|
||||||
|
attribute.SetValue(val)
|
||||||
|
attributes = append(attributes, attribute)
|
||||||
|
}
|
||||||
|
// sets FileName attribute if it wasn't set from header
|
||||||
|
if _, ok := filtered[object.AttributeFileName]; !ok {
|
||||||
|
filename := object.NewAttribute()
|
||||||
|
filename.SetKey(object.AttributeFileName)
|
||||||
|
filename.SetValue(file.FileName())
|
||||||
|
attributes = append(attributes, filename)
|
||||||
|
}
|
||||||
|
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
||||||
|
if _, ok := filtered[object.AttributeTimestamp]; !ok && u.enableDefaultTimestamp {
|
||||||
|
timestamp := object.NewAttribute()
|
||||||
|
timestamp.SetKey(object.AttributeTimestamp)
|
||||||
|
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
|
attributes = append(attributes, timestamp)
|
||||||
|
}
|
||||||
|
oid, bt := u.fetchOwnerAndBearerToken(c)
|
||||||
|
// prepares new object and fill it
|
||||||
|
raw := object.NewRaw()
|
||||||
|
raw.SetContainerID(cid)
|
||||||
|
raw.SetOwnerID(oid)
|
||||||
|
raw.SetAttributes(attributes...)
|
||||||
|
// tries to put file into NeoFS or throw error
|
||||||
|
// if addr, err = a.plant.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil {
|
||||||
|
// TODO: Take this from a sync pool.
|
||||||
|
putOpts := new(neofs.PutOptions)
|
||||||
|
putOpts.Client, putOpts.SessionToken, err = u.plant.GetReusableArtifacts(c)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("failed to get neofs client's reusable artifacts", zap.Error(err))
|
||||||
|
c.Error("failed to get neofs client's reusable artifacts", fasthttp.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
putOpts.BearerToken = bt
|
||||||
|
putOpts.ContainerID = cid
|
||||||
|
putOpts.OwnerID = oid
|
||||||
|
putOpts.PrepareObjectOnsite = false
|
||||||
|
putOpts.Reader = file
|
||||||
|
if addr, err = u.plant.Object().Put(c, putOpts); err != nil {
|
||||||
|
log.Error("could not store file in NeoFS", zap.Error(err))
|
||||||
|
c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// tries to return response, otherwise, if something went wrong throw error
|
||||||
|
if err = newPutResponse(addr).encode(c); err != nil {
|
||||||
|
log.Error("could not prepare response", zap.Error(err))
|
||||||
|
c.Error("could not prepare response", fasthttp.StatusBadRequest)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// reports status code and content type
|
||||||
|
c.Response.SetStatusCode(fasthttp.StatusOK)
|
||||||
|
c.Response.Header.SetContentType(jsonHeader)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) {
|
||||||
|
if token, err := tokens.LoadBearerToken(ctx); err == nil && token != nil {
|
||||||
|
return token.Issuer(), token
|
||||||
|
}
|
||||||
|
return u.plant.OwnerID(), nil
|
||||||
|
}
|
||||||
|
|
||||||
type putResponse struct {
|
type putResponse struct {
|
||||||
OID string `json:"object_id"`
|
OID string `json:"object_id"`
|
||||||
CID string `json:"container_id"`
|
CID string `json:"container_id"`
|
||||||
|
@ -35,109 +152,3 @@ func (pr *putResponse) encode(w io.Writer) error {
|
||||||
enc.SetIndent("", "\t")
|
enc.SetIndent("", "\t")
|
||||||
return enc.Encode(pr)
|
return enc.Encode(pr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) {
|
|
||||||
if token, err := loadBearerToken(ctx); err == nil && token != nil {
|
|
||||||
return token.Issuer(), token
|
|
||||||
}
|
|
||||||
return a.plant.OwnerID(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *app) upload(c *fasthttp.RequestCtx) {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
file MultipartFile
|
|
||||||
addr *object.Address
|
|
||||||
cid = container.NewID()
|
|
||||||
sCID, _ = c.UserValue("cid").(string)
|
|
||||||
log = a.log.With(zap.String("cid", sCID))
|
|
||||||
)
|
|
||||||
if err = storeBearerToken(c); err != nil {
|
|
||||||
log.Error("could not fetch bearer token", zap.Error(err))
|
|
||||||
c.Error("could not fetch bearer token", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = cid.Parse(sCID); err != nil {
|
|
||||||
log.Error("wrong container id", zap.Error(err))
|
|
||||||
c.Error("wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
// if temporary reader can be closed - close it
|
|
||||||
if file == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err := file.Close()
|
|
||||||
log.Debug(
|
|
||||||
"close temporary multipart/form file",
|
|
||||||
zap.Stringer("address", addr),
|
|
||||||
zap.String("filename", file.FileName()),
|
|
||||||
zap.Error(err),
|
|
||||||
)
|
|
||||||
}()
|
|
||||||
boundary := string(c.Request.Header.MultipartFormBoundary())
|
|
||||||
if file, err = fetchMultipartFile(a.log, c.RequestBodyStream(), boundary); err != nil {
|
|
||||||
log.Error("could not receive multipart/form", zap.Error(err))
|
|
||||||
c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
filtered := filterHeaders(a.log, &c.Request.Header)
|
|
||||||
attributes := make([]*object.Attribute, 0, len(filtered))
|
|
||||||
// prepares attributes from filtered headers
|
|
||||||
for key, val := range filtered {
|
|
||||||
attribute := object.NewAttribute()
|
|
||||||
attribute.SetKey(key)
|
|
||||||
attribute.SetValue(val)
|
|
||||||
attributes = append(attributes, attribute)
|
|
||||||
}
|
|
||||||
// sets FileName attribute if it wasn't set from header
|
|
||||||
if _, ok := filtered[object.AttributeFileName]; !ok {
|
|
||||||
filename := object.NewAttribute()
|
|
||||||
filename.SetKey(object.AttributeFileName)
|
|
||||||
filename.SetValue(file.FileName())
|
|
||||||
attributes = append(attributes, filename)
|
|
||||||
}
|
|
||||||
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
|
||||||
if _, ok := filtered[object.AttributeTimestamp]; !ok && a.enableDefaultTimestamp {
|
|
||||||
timestamp := object.NewAttribute()
|
|
||||||
timestamp.SetKey(object.AttributeTimestamp)
|
|
||||||
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
|
||||||
attributes = append(attributes, timestamp)
|
|
||||||
}
|
|
||||||
oid, bt := a.fetchOwnerAndBearerToken(c)
|
|
||||||
// prepares new object and fill it
|
|
||||||
raw := object.NewRaw()
|
|
||||||
raw.SetContainerID(cid)
|
|
||||||
raw.SetOwnerID(oid)
|
|
||||||
raw.SetAttributes(attributes...)
|
|
||||||
// tries to put file into NeoFS or throw error
|
|
||||||
// if addr, err = a.plant.Object().Put(c, raw.Object(), sdk.WithPutReader(file)); err != nil {
|
|
||||||
// TODO: Take this from a sync pool.
|
|
||||||
putOpts := new(neofs.PutOptions)
|
|
||||||
putOpts.Client, putOpts.SessionToken, err = a.plant.GetReusableArtifacts(c)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("failed to get neofs client's reusable artifacts", zap.Error(err))
|
|
||||||
c.Error("failed to get neofs client's reusable artifacts", fasthttp.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
putOpts.BearerToken = bt
|
|
||||||
putOpts.ContainerID = cid
|
|
||||||
putOpts.OwnerID = oid
|
|
||||||
putOpts.PrepareObjectOnsite = false
|
|
||||||
putOpts.Reader = file
|
|
||||||
if addr, err = a.plant.Object().Put(c, putOpts); err != nil {
|
|
||||||
log.Error("could not store file in NeoFS", zap.Error(err))
|
|
||||||
c.Error("could not store file in NeoFS", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// tries to return response, otherwise, if something went wrong throw error
|
|
||||||
if err = newPutResponse(addr).encode(c); err != nil {
|
|
||||||
log.Error("could not prepare response", zap.Error(err))
|
|
||||||
c.Error("could not prepare response", fasthttp.StatusBadRequest)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// reports status code and content type
|
|
||||||
c.Response.SetStatusCode(fasthttp.StatusOK)
|
|
||||||
c.Response.Header.SetContentType(jsonHeader)
|
|
||||||
}
|
|
Loading…
Reference in a new issue