82b2126bfd
Removed connections, logger, neofs because they were moved to sdk repo. Made changes in downloader, uploader, main.go and app.go via refactoring of neofs. Replaced dependencies to removed packages by sdk packages. Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
179 lines
5.6 KiB
Go
179 lines
5.6 KiB
Go
package uploader
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/token"
|
|
"github.com/nspcc-dev/neofs-http-gw/tokens"
|
|
"github.com/nspcc-dev/neofs-sdk-go/pkg/neofs"
|
|
"github.com/valyala/fasthttp"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
jsonHeader = "application/json; charset=UTF-8"
|
|
drainBufSize = 4096
|
|
)
|
|
|
|
// Uploader is an upload request handler.
|
|
type Uploader struct {
|
|
log *zap.Logger
|
|
plant neofs.ClientPlant
|
|
enableDefaultTimestamp bool
|
|
}
|
|
|
|
// New creates a new Uploader using specified logger, connection pool and
|
|
// other options.
|
|
func New(log *zap.Logger, plant neofs.ClientPlant, enableDefaultTimestamp bool) *Uploader {
|
|
return &Uploader{log, plant, enableDefaultTimestamp}
|
|
}
|
|
|
|
// Upload handles multipart upload request.
|
|
func (u *Uploader) Upload(c *fasthttp.RequestCtx) {
|
|
var (
|
|
err error
|
|
file MultipartFile
|
|
obj *object.ID
|
|
conn client.Client
|
|
tkn *token.SessionToken
|
|
addr = object.NewAddress()
|
|
cid = container.NewID()
|
|
scid, _ = c.UserValue("cid").(string)
|
|
log = u.log.With(zap.String("cid", scid))
|
|
bodyStream = c.RequestBodyStream()
|
|
drainBuf = make([]byte, drainBufSize)
|
|
)
|
|
if err = tokens.StoreBearerToken(c); err != nil {
|
|
log.Error("could not fetch bearer token", zap.Error(err))
|
|
c.Error("could not fetch bearer token", fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
if err = cid.Parse(scid); err != nil {
|
|
log.Error("wrong container id", zap.Error(err))
|
|
c.Error("wrong container id", fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
defer func() {
|
|
// If the temporary reader can be closed - let's close it.
|
|
if file == nil {
|
|
return
|
|
}
|
|
err := file.Close()
|
|
log.Debug(
|
|
"close temporary multipart/form file",
|
|
zap.Stringer("address", addr),
|
|
zap.String("filename", file.FileName()),
|
|
zap.Error(err),
|
|
)
|
|
}()
|
|
boundary := string(c.Request.Header.MultipartFormBoundary())
|
|
if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil {
|
|
log.Error("could not receive multipart/form", zap.Error(err))
|
|
c.Error("could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
filtered := filterHeaders(u.log, &c.Request.Header)
|
|
attributes := make([]*object.Attribute, 0, len(filtered))
|
|
// prepares attributes from filtered headers
|
|
for key, val := range filtered {
|
|
attribute := object.NewAttribute()
|
|
attribute.SetKey(key)
|
|
attribute.SetValue(val)
|
|
attributes = append(attributes, attribute)
|
|
}
|
|
// sets FileName attribute if it wasn't set from header
|
|
if _, ok := filtered[object.AttributeFileName]; !ok {
|
|
filename := object.NewAttribute()
|
|
filename.SetKey(object.AttributeFileName)
|
|
filename.SetValue(file.FileName())
|
|
attributes = append(attributes, filename)
|
|
}
|
|
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
|
if _, ok := filtered[object.AttributeTimestamp]; !ok && u.enableDefaultTimestamp {
|
|
timestamp := object.NewAttribute()
|
|
timestamp.SetKey(object.AttributeTimestamp)
|
|
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
|
attributes = append(attributes, timestamp)
|
|
}
|
|
oid, bt := u.fetchOwnerAndBearerToken(c)
|
|
|
|
// Try to put file into NeoFS or throw an error.
|
|
conn, tkn, err = u.plant.ConnectionArtifacts()
|
|
if err != nil {
|
|
log.Error("failed to get neofs connection artifacts", zap.Error(err))
|
|
c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
rawObject := object.NewRaw()
|
|
rawObject.SetContainerID(cid)
|
|
rawObject.SetOwnerID(oid)
|
|
rawObject.SetAttributes(attributes...)
|
|
|
|
ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(file)
|
|
|
|
if obj, err = conn.PutObject(c, ops, client.WithSession(tkn), client.WithBearer(bt)); err != nil {
|
|
log.Error("could not store file in neofs", zap.Error(err))
|
|
c.Error("could not store file in neofs", fasthttp.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
addr.SetObjectID(obj)
|
|
addr.SetContainerID(cid)
|
|
|
|
// Try to return the response, otherwise, if something went wrong, throw an error.
|
|
if err = newPutResponse(addr).encode(c); err != nil {
|
|
log.Error("could not prepare response", zap.Error(err))
|
|
c.Error("could not prepare response", fasthttp.StatusBadRequest)
|
|
|
|
return
|
|
}
|
|
// Multipart is multipart and thus can contain more than one part which
|
|
// we ignore at the moment. Also, when dealing with chunked encoding
|
|
// the last zero-length chunk might be left unread (because multipart
|
|
// reader only cares about its boundary and doesn't look further) and
|
|
// it will be (erroneously) interpreted as the start of the next
|
|
// pipelined header. Thus we need to drain the body buffer.
|
|
for {
|
|
_, err = bodyStream.Read(drainBuf)
|
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
|
break
|
|
}
|
|
}
|
|
// Report status code and content type.
|
|
c.Response.SetStatusCode(fasthttp.StatusOK)
|
|
c.Response.Header.SetContentType(jsonHeader)
|
|
}
|
|
|
|
func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) {
|
|
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
|
|
return tkn.Issuer(), tkn
|
|
}
|
|
return u.plant.OwnerID(), nil
|
|
}
|
|
|
|
type putResponse struct {
|
|
ObjectID string `json:"object_id"`
|
|
ContainerID string `json:"container_id"`
|
|
}
|
|
|
|
func newPutResponse(addr *object.Address) *putResponse {
|
|
return &putResponse{
|
|
ObjectID: addr.ObjectID().String(),
|
|
ContainerID: addr.ContainerID().String(),
|
|
}
|
|
}
|
|
|
|
func (pr *putResponse) encode(w io.Writer) error {
|
|
enc := json.NewEncoder(w)
|
|
enc.SetIndent("", "\t")
|
|
return enc.Encode(pr)
|
|
}
|