1e3df95eed
It is meaningless to use RequestCtx as a context.Context for NeoFS operation, because context won't be closed until application shutdown. Moreover, it also triggers data race detection, because server's done channel, which is accessible for reading from RequestCtx, is set to `nil`. Using application context doesn't change gateway behavior, but it suppresses data race trigger at shutdown. It also allows possibility to set configurable timeouts for NeoFS networking if we will ever need them. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
232 lines
7.1 KiB
Go
232 lines
7.1 KiB
Go
package uploader
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neofs-http-gw/response"
|
|
"github.com/nspcc-dev/neofs-http-gw/tokens"
|
|
"github.com/nspcc-dev/neofs-http-gw/utils"
|
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
|
"github.com/nspcc-dev/neofs-sdk-go/netmap"
|
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
|
"github.com/nspcc-dev/neofs-sdk-go/object/address"
|
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
|
"github.com/nspcc-dev/neofs-sdk-go/pool"
|
|
"github.com/nspcc-dev/neofs-sdk-go/token"
|
|
"github.com/valyala/fasthttp"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
jsonHeader = "application/json; charset=UTF-8"
|
|
drainBufSize = 4096
|
|
)
|
|
|
|
// Uploader is an upload request handler.
|
|
type Uploader struct {
|
|
appCtx context.Context
|
|
log *zap.Logger
|
|
pool *pool.Pool
|
|
enableDefaultTimestamp bool
|
|
}
|
|
|
|
type epochDurations struct {
|
|
currentEpoch uint64
|
|
msPerBlock int64
|
|
blockPerEpoch uint64
|
|
}
|
|
|
|
// New creates a new Uploader using specified logger, connection pool and
|
|
// other options.
|
|
func New(ctx context.Context, log *zap.Logger, conns *pool.Pool, enableDefaultTimestamp bool) *Uploader {
|
|
return &Uploader{ctx, log, conns, enableDefaultTimestamp}
|
|
}
|
|
|
|
// Upload handles multipart upload request.
|
|
func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|
var (
|
|
err error
|
|
file MultipartFile
|
|
idObj *oid.ID
|
|
addr = address.NewAddress()
|
|
idCnr = cid.New()
|
|
scid, _ = c.UserValue("cid").(string)
|
|
log = u.log.With(zap.String("cid", scid))
|
|
bodyStream = c.RequestBodyStream()
|
|
drainBuf = make([]byte, drainBufSize)
|
|
)
|
|
if err = tokens.StoreBearerToken(c); err != nil {
|
|
log.Error("could not fetch bearer token", zap.Error(err))
|
|
response.Error(c, "could not fetch bearer token", fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
if err = idCnr.Parse(scid); err != nil {
|
|
log.Error("wrong container id", zap.Error(err))
|
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
defer func() {
|
|
// If the temporary reader can be closed - let's close it.
|
|
if file == nil {
|
|
return
|
|
}
|
|
err := file.Close()
|
|
log.Debug(
|
|
"close temporary multipart/form file",
|
|
zap.Stringer("address", addr),
|
|
zap.String("filename", file.FileName()),
|
|
zap.Error(err),
|
|
)
|
|
}()
|
|
boundary := string(c.Request.Header.MultipartFormBoundary())
|
|
if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil {
|
|
log.Error("could not receive multipart/form", zap.Error(err))
|
|
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
filtered := filterHeaders(u.log, &c.Request.Header)
|
|
if needParseExpiration(filtered) {
|
|
epochDuration, err := getEpochDurations(c, u.pool)
|
|
if err != nil {
|
|
log.Error("could not get epoch durations from network info", zap.Error(err))
|
|
response.Error(c, "could not get epoch durations from network info: "+err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
if err = prepareExpirationHeader(filtered, epochDuration); err != nil {
|
|
log.Error("could not parse expiration header", zap.Error(err))
|
|
response.Error(c, "could not parse expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
attributes := make([]object.Attribute, 0, len(filtered))
|
|
// prepares attributes from filtered headers
|
|
for key, val := range filtered {
|
|
attribute := object.NewAttribute()
|
|
attribute.SetKey(key)
|
|
attribute.SetValue(val)
|
|
attributes = append(attributes, *attribute)
|
|
}
|
|
// sets FileName attribute if it wasn't set from header
|
|
if _, ok := filtered[object.AttributeFileName]; !ok {
|
|
filename := object.NewAttribute()
|
|
filename.SetKey(object.AttributeFileName)
|
|
filename.SetValue(file.FileName())
|
|
attributes = append(attributes, *filename)
|
|
}
|
|
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
|
if _, ok := filtered[object.AttributeTimestamp]; !ok && u.enableDefaultTimestamp {
|
|
timestamp := object.NewAttribute()
|
|
timestamp.SetKey(object.AttributeTimestamp)
|
|
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
|
attributes = append(attributes, *timestamp)
|
|
}
|
|
id, bt := u.fetchOwnerAndBearerToken(c)
|
|
|
|
obj := object.New()
|
|
obj.SetContainerID(idCnr)
|
|
obj.SetOwnerID(id)
|
|
obj.SetAttributes(attributes...)
|
|
|
|
var prm pool.PrmObjectPut
|
|
prm.SetHeader(*obj)
|
|
prm.SetPayload(file)
|
|
prm.UseBearer(bt)
|
|
|
|
if idObj, err = u.pool.PutObject(u.appCtx, prm); err != nil {
|
|
log.Error("could not store file in neofs", zap.Error(err))
|
|
response.Error(c, "could not store file in neofs: "+err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
addr.SetObjectID(idObj)
|
|
addr.SetContainerID(idCnr)
|
|
|
|
// Try to return the response, otherwise, if something went wrong, throw an error.
|
|
if err = newPutResponse(addr).encode(c); err != nil {
|
|
log.Error("could not encode response", zap.Error(err))
|
|
response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
|
|
|
|
return
|
|
}
|
|
// Multipart is multipart and thus can contain more than one part which
|
|
// we ignore at the moment. Also, when dealing with chunked encoding
|
|
// the last zero-length chunk might be left unread (because multipart
|
|
// reader only cares about its boundary and doesn't look further) and
|
|
// it will be (erroneously) interpreted as the start of the next
|
|
// pipelined header. Thus we need to drain the body buffer.
|
|
for {
|
|
_, err = bodyStream.Read(drainBuf)
|
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
|
break
|
|
}
|
|
}
|
|
// Report status code and content type.
|
|
c.Response.SetStatusCode(fasthttp.StatusOK)
|
|
c.Response.Header.SetContentType(jsonHeader)
|
|
}
|
|
|
|
func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) {
|
|
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
|
|
return tkn.Issuer(), tkn
|
|
}
|
|
return u.pool.OwnerID(), nil
|
|
}
|
|
|
|
type putResponse struct {
|
|
ObjectID string `json:"object_id"`
|
|
ContainerID string `json:"container_id"`
|
|
}
|
|
|
|
func newPutResponse(addr *address.Address) *putResponse {
|
|
return &putResponse{
|
|
ObjectID: addr.ObjectID().String(),
|
|
ContainerID: addr.ContainerID().String(),
|
|
}
|
|
}
|
|
|
|
func (pr *putResponse) encode(w io.Writer) error {
|
|
enc := json.NewEncoder(w)
|
|
enc.SetIndent("", "\t")
|
|
return enc.Encode(pr)
|
|
}
|
|
|
|
func getEpochDurations(ctx context.Context, p *pool.Pool) (*epochDurations, error) {
|
|
networkInfo, err := p.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := &epochDurations{
|
|
currentEpoch: networkInfo.CurrentEpoch(),
|
|
msPerBlock: networkInfo.MsPerBlock(),
|
|
}
|
|
|
|
networkInfo.NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool {
|
|
if string(parameter.Key()) == "EpochDuration" {
|
|
data := make([]byte, 8)
|
|
copy(data, parameter.Value())
|
|
res.blockPerEpoch = binary.LittleEndian.Uint64(data)
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
if res.blockPerEpoch == 0 {
|
|
return nil, fmt.Errorf("not found param: EpochDuration")
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func needParseExpiration(headers map[string]string) bool {
|
|
_, ok1 := headers[utils.ExpirationDurationAttr]
|
|
_, ok2 := headers[utils.ExpirationRFC3339Attr]
|
|
_, ok3 := headers[utils.ExpirationTimestampAttr]
|
|
return ok1 || ok2 || ok3
|
|
}
|