[#170] tar.gz extraction during upload #176

Closed
nzinkevich wants to merge 1 commit from nzinkevich/frostfs-http-gw:zip_explode into master
7 changed files with 149 additions and 59 deletions
Showing only changes of commit 6e5189d6a9 - Show all commits

View file

@ -36,7 +36,7 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
} }
} }
func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
return &request{ return &request{
RequestCtx: ctx, RequestCtx: ctx,
log: log, log: log,

View file

@ -39,7 +39,7 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) (map[string]st
// check if key gets duplicated // check if key gets duplicated
// return error containing full key name (with prefix) // return error containing full key name (with prefix)
if _, ok := result[string(clearKey)]; ok { if _, ok := result[string(clearKey)]; ok {
err = fmt.Errorf("key duplication error: %s", string(key)) err = fmt.Errorf("header key duplication error: %s", string(key))
return return
} }

View file

@ -215,7 +215,7 @@ func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context
addr := newAddress(bktInfo.CID, *objID) addr := newAddress(bktInfo.CID, *objID)
f(ctx, *h.newRequest(c, log), addr) f(ctx, *newRequest(c, log), addr)
} }
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that // byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
@ -257,7 +257,7 @@ func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, reque
} }
addr := newAddress(bktInfo.CID, foundOid.OID) addr := newAddress(bktInfo.CID, foundOid.OID)
f(ctx, *h.newRequest(c, log), addr) f(ctx, *newRequest(c, log), addr)
} }
// byAttribute is a wrapper similar to byNativeAddress. // byAttribute is a wrapper similar to byNativeAddress.
@ -319,7 +319,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
addrObj.SetContainer(bktInfo.CID) addrObj.SetContainer(bktInfo.CID)
addrObj.SetObject(buf[0]) addrObj.SetObject(buf[0])
f(ctx, *h.newRequest(c, log), addrObj) f(ctx, *newRequest(c, log), addrObj)
} }
// resolveContainer decode container id, if it's not a valid container id // resolveContainer decode container id, if it's not a valid container id

View file

@ -42,7 +42,9 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
// ignore multipart/form-data values // ignore multipart/form-data values
if filename == "" { if filename == "" {
l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name)) l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name))
if err = part.Close(); err != nil {
l.Warn(logs.FailedToCloseReader, zap.Error(err))
}
continue continue
} }

View file

@ -1,13 +1,19 @@
package handler package handler
import ( import (
"archive/tar"
"bytes"
"compress/gzip"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"io" "io"
"net/http" "net/http"
"path/filepath"
"strconv" "strconv"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
@ -22,8 +28,35 @@ import (
const ( const (
jsonHeader = "application/json; charset=UTF-8" jsonHeader = "application/json; charset=UTF-8"
drainBufSize = 4096 drainBufSize = 4096
dkirillov marked this conversation as resolved Outdated

Header should be X-Explode-Archive

Header should be `X-Explode-Archive`
explodeArchiveHeader = "X-Explode-Archive"
) )
type accumulatedReader struct {
reader io.Reader
buffer *bytes.Buffer
}
func newAccumulatedReader(rc io.Reader) *accumulatedReader {
return &accumulatedReader{
reader: rc,
buffer: &bytes.Buffer{},
}
}
// Read reads data from the underlying io.Reader and accumulates it into the buffer.
func (r *accumulatedReader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
if n > 0 {
r.buffer.Write(p[:n])
}
return n, err
}
func (r *accumulatedReader) Restore() {
r.reader = io.MultiReader(r.buffer, r.reader)
r.buffer = &bytes.Buffer{}
}
type putResponse struct { type putResponse struct {
ObjectID string `json:"object_id"` ObjectID string `json:"object_id"`
ContainerID string `json:"container_id"` ContainerID string `json:"container_id"`
@ -44,11 +77,7 @@ func (pr *putResponse) encode(w io.Writer) error {
dkirillov marked this conversation as resolved Outdated

Actually, as I remembered this log was aimed to print address of uploaded objects. It's better to keep such behavior. Maybe using some different approach but still

Actually, as I remembered this log was aimed to print address of uploaded objects. It's better to keep such behavior. Maybe using some different approach but still

Also, I don't think we should add attribute Explode-Archive to result objects

c.Request.Header.Peek(utils.UserAttributeHeaderPrefix + explodeArchiveHeader)
Also, I don't think we should add attribute `Explode-Archive` to result objects ```golang c.Request.Header.Peek(utils.UserAttributeHeaderPrefix + explodeArchiveHeader) ```
// Upload handles multipart upload request. // Upload handles multipart upload request.
func (h *Handler) Upload(c *fasthttp.RequestCtx) { func (h *Handler) Upload(c *fasthttp.RequestCtx) {
var ( var file MultipartFile
file MultipartFile
idObj oid.ID
addr oid.Address
)
scid, _ := c.UserValue("cid").(string) scid, _ := c.UserValue("cid").(string)
bodyStream := c.RequestBodyStream() bodyStream := c.RequestBodyStream()
@ -64,32 +93,62 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
return return
} }
defer func() {
// If the temporary reader can be closed - let's close it.
if file == nil {
return
}
err := file.Close()
log.Debug(
logs.CloseTemporaryMultipartFormFile,
zap.Stringer("address", addr),
zap.String("filename", file.FileName()),
zap.Error(err),
)
}()
boundary := string(c.Request.Header.MultipartFormBoundary()) boundary := string(c.Request.Header.MultipartFormBoundary())
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil { if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err)) log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
defer func() {
if err := file.Close(); err != nil {
log.Warn(logs.FailedToCloseTemporaryMultipartFormFile, zap.Error(err))
}
}()
if c.Request.Header.Peek(explodeArchiveHeader) != nil {
h.explodeArchive(c, log, bktInfo, file)
} else {
h.uploadSingleObject(c, log, bktInfo, file)
}
// Multipart is multipart and thus can contain more than one part which
// we ignore at the moment. Also, when dealing with chunked encoding
// the last zero-length chunk might be left unread (because multipart
// reader only cares about its boundary and doesn't look further) and
// it will be (erroneously) interpreted as the start of the next
// pipelined header. Thus we need to drain the body buffer.
for {
_, err = bodyStream.Read(drainBuf)
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
break
}
dkirillov marked this conversation as resolved Outdated

Despite this won't affect behavior, can we set header before setting body?

Despite this won't affect behavior, can we set header before setting body?
}
}
// uploadSingleObject stores the multipart file as a single object in the given
// bucket and, on success, writes a JSON putResponse with the new object's
// address to the client.
func (h *Handler) uploadSingleObject(c *fasthttp.RequestCtx, log *zap.Logger, bktInfo *data.BucketInfo, file MultipartFile) {
	objID, uploadErr := h.uploadObject(c, log, bktInfo, file.FileName(), file)
	if uploadErr != nil {
		log.Error(logs.FailedToUploadObject, zap.Error(uploadErr))
		return
	}

	c.Response.Header.SetContentType(jsonHeader)
	// Encode the stored object's address into the response body; report an
	// error to the client if encoding fails.
	resp := newPutResponse(newAddress(bktInfo.CID, objID))
	if encodeErr := resp.encode(c); encodeErr != nil {
		log.Error(logs.CouldNotEncodeResponse, zap.Error(encodeErr))
		response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
	}
}
func (h *Handler) uploadObject(c *fasthttp.RequestCtx, log *zap.Logger, bktInfo *data.BucketInfo, fileName string, file io.Reader) (oid.ID, error) {
ctx := utils.GetContextFromRequest(c)
filtered, err := filterHeaders(log, &c.Request.Header) filtered, err := filterHeaders(log, &c.Request.Header)
if err != nil { if err != nil {
log.Error(logs.CouldNotProcessHeaders, zap.Error(err)) log.Error(logs.FailedToFilterHeaders, zap.Error(err))
response.Error(c, err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not filter headers", fasthttp.StatusBadRequest)
return return oid.ID{}, err
} }
now := time.Now() now := time.Now()
@ -104,7 +163,7 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil { if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err)) log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
return return oid.ID{}, err
} }
attributes := make([]object.Attribute, 0, len(filtered)) attributes := make([]object.Attribute, 0, len(filtered))
@ -117,10 +176,10 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
} }
// sets FileName attribute if it wasn't set from header // sets FileName attribute if it wasn't set from header
if _, ok := filtered[object.AttributeFileName]; !ok { if _, ok := filtered[object.AttributeFileName]; !ok {
filename := object.NewAttribute() fileNameAttr := object.NewAttribute()
filename.SetKey(object.AttributeFileName) fileNameAttr.SetKey(object.AttributeFileName)
filename.SetValue(file.FileName()) fileNameAttr.SetValue(fileName)
attributes = append(attributes, *filename) attributes = append(attributes, *fileNameAttr)
} }
// sets Timestamp attribute if it wasn't set from header and enabled by settings // sets Timestamp attribute if it wasn't set from header and enabled by settings
if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() { if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() {
@ -146,36 +205,59 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
BufferMaxSize: h.config.BufferMaxSizeForPut(), BufferMaxSize: h.config.BufferMaxSizeForPut(),
} }
var idObj oid.ID
if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil { if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil {
h.handlePutFrostFSErr(c, err, log) h.handlePutFrostFSErr(c, err, log)
return return oid.ID{}, err
}
log.Debug(logs.ObjectUploaded,
zap.String("oid", idObj.EncodeToString()),
zap.String("FileName", fileName),
)
return idObj, nil
}
// explodeArchive read files from archive and creates objects for each of them.
// Sets FilePath attribute with name from tar.Header.
func (h *Handler) explodeArchive(c *fasthttp.RequestCtx, log *zap.Logger, bktInfo *data.BucketInfo, formFile io.ReadCloser) {
// default reader - without gzip decompression
accReader := newAccumulatedReader(formFile)
var reader io.Reader
if gzipReader, err := gzip.NewReader(accReader); err == nil {
reader = gzipReader
defer func() {
if err := gzipReader.Close(); err != nil {
log.Warn(logs.FailedToCloseReader, zap.Error(err))
}
dkirillov marked this conversation as resolved Outdated

Let's log also object name

Let's log also object name
}()
} else {
log.Info(logs.CompressionCheckFailed, zap.Error(err))
accReader.Restore()
reader = accReader
} }
addr.SetObject(idObj) tarReader := tar.NewReader(reader)
addr.SetContainer(bktInfo.CID)
// Try to return the response, otherwise, if something went wrong, throw an error.
if err = newPutResponse(addr).encode(c); err != nil {
log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
return
}
// Multipart is multipart and thus can contain more than one part which
// we ignore at the moment. Also, when dealing with chunked encoding
// the last zero-length chunk might be left unread (because multipart
// reader only cares about its boundary and doesn't look further) and
// it will be (erroneously) interpreted as the start of the next
// pipelined header. Thus we need to drain the body buffer.
for { for {
_, err = bodyStream.Read(drainBuf) obj, err := tarReader.Next()
if err == io.EOF || err == io.ErrUnexpectedEOF { if errors.Is(err, io.EOF) {
break break
} else if err != nil {
log.Error(logs.FailedToReadFileFromTar, zap.Error(err))
response.Error(c, "could not read tar header: "+err.Error(), fasthttp.StatusBadRequest)
return
}
if isDir(obj.Name) {
continue
}
c.Request.Header.Set(utils.UserAttributeHeaderPrefix+object.AttributeFilePath, obj.Name)
_, err = h.uploadObject(c, log, bktInfo, filepath.Base(obj.Name), tarReader)
if err != nil {
log.Error(logs.FailedToUploadObject, zap.Error(err))
response.Error(c, "could not upload object: "+err.Error(), fasthttp.StatusBadRequest)
return
} }
} }
// Report status code and content type.
c.Response.SetStatusCode(fasthttp.StatusOK)
c.Response.Header.SetContentType(jsonHeader)
} }
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) { func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {

View file

@ -0,0 +1 @@
package handler

View file

@ -22,9 +22,8 @@ const (
CantGracefullyShutDownService = "can't gracefully shut down service, force stop" // Error in ../../metrics/service.go CantGracefullyShutDownService = "can't gracefully shut down service, force stop" // Error in ../../metrics/service.go
IgnorePartEmptyFormName = "ignore part, empty form name" // Debug in ../../uploader/upload.go IgnorePartEmptyFormName = "ignore part, empty form name" // Debug in ../../uploader/upload.go
IgnorePartEmptyFilename = "ignore part, empty filename" // Debug in ../../uploader/upload.go IgnorePartEmptyFilename = "ignore part, empty filename" // Debug in ../../uploader/upload.go
CloseTemporaryMultipartFormFile = "close temporary multipart/form file" // Debug in ../../uploader/upload.go FailedToCloseTemporaryMultipartFormFile = "failed to close temporary multipart/form file" // Warn in ../../uploader/upload.go
CouldNotReceiveMultipartForm = "could not receive multipart/form" // Error in ../../uploader/upload.go CouldNotReceiveMultipartForm = "could not receive multipart/form" // Error in ../../uploader/upload.go
CouldNotProcessHeaders = "could not process headers" // Error in ../../uploader/upload.go
CouldNotParseClientTime = "could not parse client time" // Warn in ../../uploader/upload.go CouldNotParseClientTime = "could not parse client time" // Warn in ../../uploader/upload.go
CouldNotPrepareExpirationHeader = "could not prepare expiration header" // Error in ../../uploader/upload.go CouldNotPrepareExpirationHeader = "could not prepare expiration header" // Error in ../../uploader/upload.go
CouldNotEncodeResponse = "could not encode response" // Error in ../../uploader/upload.go CouldNotEncodeResponse = "could not encode response" // Error in ../../uploader/upload.go
@ -87,4 +86,10 @@ const (
MultinetDialFail = "multinet dial failed" MultinetDialFail = "multinet dial failed"
FailedToLoadMultinetConfig = "failed to load multinet config" FailedToLoadMultinetConfig = "failed to load multinet config"
MultinetConfigWontBeUpdated = "multinet config won't be updated" MultinetConfigWontBeUpdated = "multinet config won't be updated"
FailedToFilterHeaders = "failed to filter headers"
FailedToReadFileFromTar = "failed to read file from tar"
FailedToUploadObject = "failed to upload object"
ObjectUploaded = "object uploaded"
FailedToCloseReader = "failed to close reader"
CompressionCheckFailed = "compression check failed"
) )