package handler

import (
	"archive/tar"
	"compress/gzip"
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"path/filepath"
	"strconv"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/valyala/fasthttp"
	"go.uber.org/zap"
)

const (
	// jsonHeader is the Content-Type set on a successful upload response.
	jsonHeader = "application/json; charset=UTF-8"
	// drainBufSize is the size of the scratch buffer used to drain the
	// request body after the upload has been processed.
	drainBufSize = 4096
	// explodeArchiveHeader is the header name (appended to
	// utils.UserAttributeHeaderPrefix) whose presence makes Upload treat the
	// uploaded file as a tar.gz archive and store every entry separately.
	explodeArchiveHeader = "Explode-Archive"
)

// putResponse is the JSON body returned for a successfully stored object.
type putResponse struct {
	ObjectID    string `json:"object_id"`
	ContainerID string `json:"container_id"`
}

// newPutResponse builds a putResponse from the string-encoded object and
// container IDs of addr.
func newPutResponse(addr oid.Address) *putResponse {
	return &putResponse{
		ObjectID:    addr.Object().EncodeToString(),
		ContainerID: addr.Container().EncodeToString(),
	}
}

// encode writes pr to w as tab-indented JSON.
func (pr *putResponse) encode(w io.Writer) error {
	enc := json.NewEncoder(w)
	enc.SetIndent("", "\t")
	return enc.Encode(pr)
}

// Upload handles multipart upload request.
//
// The first file part of the multipart body is either stored as a single
// object or, when the Explode-Archive attribute header is present, unpacked
// as a tar.gz archive with one object created per entry.
func (h *Handler) Upload(c *fasthttp.RequestCtx) {
	var file MultipartFile

	scid, _ := c.UserValue("cid").(string)
	bodyStream := c.RequestBodyStream()
	drainBuf := make([]byte, drainBufSize)

	ctx := utils.GetContextFromRequest(c)
	reqLog := utils.GetReqLogOrDefault(ctx, h.log)
	log := reqLog.With(zap.String("cid", scid))

	bktInfo, err := h.getBucketInfo(ctx, scid, log)
	if err != nil {
		logAndSendBucketError(c, log, err)
		return
	}

	defer func() {
		// If the temporary reader can be closed - let's close it.
		if file == nil {
			return
		}
		err := file.Close()
		log.Debug(
			logs.CloseTemporaryMultipartFormFile,
			zap.Stringer("container", bktInfo.CID),
			zap.String("filename", file.FileName()),
			zap.Error(err),
		)
	}()

	boundary := string(c.Request.Header.MultipartFormBoundary())
	if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
		log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
		response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
		return
	}

	if header := c.Request.Header.Peek(utils.UserAttributeHeaderPrefix + explodeArchiveHeader); header != nil {
		h.explodeGzip(c, log, bktInfo, file)
	} else {
		h.uploadSingleObject(c, log, bktInfo, file)
	}

	// Multipart is multipart and thus can contain more than one part which
	// we ignore at the moment. Also, when dealing with chunked encoding
	// the last zero-length chunk might be left unread (because multipart
	// reader only cares about its boundary and doesn't look further) and
	// it will be (erroneously) interpreted as the start of the next
	// pipelined header. Thus we need to drain the body buffer.
	//
	// Stop on ANY read error, not only EOF: a persistent failure (for
	// example a reset connection) would otherwise keep this loop spinning
	// forever.
	for {
		if _, err = bodyStream.Read(drainBuf); err != nil {
			break
		}
	}
}

// uploadSingleObject stores file as one object in the container described by
// bktInfo and, on success, writes the JSON-encoded address of the new object
// to the response.
func (h *Handler) uploadSingleObject(c *fasthttp.RequestCtx, log *zap.Logger, bktInfo *data.BucketInfo, file MultipartFile) {
	idObj, err := h.uploadObject(c, log, bktInfo, file.FileName(), file)
	if err != nil {
		// uploadObject has already written the error response.
		log.Error(logs.FailedToUploadObject, zap.Error(err))
		return
	}

	addr := newAddress(bktInfo.CID, idObj)

	// Try to return the response, otherwise, if something went wrong, throw an error.
	if err = newPutResponse(addr).encode(c); err != nil {
		log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
		response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
		return
	}

	c.Response.Header.SetContentType(jsonHeader)
}

// uploadObject creates a FrostFS object in the container described by bktInfo
// with payload read from file.
//
// Object attributes are taken from the filtered request headers. A FileName
// attribute (set to fileName) is added when the headers do not provide one,
// and a Timestamp attribute is added when the headers do not provide one and
// the default timestamp setting is enabled.
//
// On failure the error response is written to c before the error is
// returned, and the returned ID is the zero value.
func (h *Handler) uploadObject(c *fasthttp.RequestCtx, log *zap.Logger, bktInfo *data.BucketInfo, fileName string, file io.Reader) (oid.ID, error) {
	ctx := utils.GetContextFromRequest(c)

	filtered, err := filterHeaders(log, &c.Request.Header)
	if err != nil {
		log.Error(logs.FailedToFilterHeaders, zap.Error(err))
		response.Error(c, "could not filter headers", fasthttp.StatusBadRequest)
		return oid.ID{}, err
	}

	// Prefer the client-supplied Date header as the reference time for the
	// expiration header; fall back to the local clock when it is absent or
	// cannot be parsed.
	now := time.Now()
	if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
		if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
			log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err))
		} else {
			now = parsed
		}
	}

	if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
		log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
		response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
		return oid.ID{}, err
	}

	attributes := make([]object.Attribute, 0, len(filtered))
	// prepares attributes from filtered headers
	for key, val := range filtered {
		attribute := object.NewAttribute()
		attribute.SetKey(key)
		attribute.SetValue(val)
		attributes = append(attributes, *attribute)
	}
	// sets FileName attribute if it wasn't set from header
	if _, ok := filtered[object.AttributeFileName]; !ok {
		fileNameAttr := object.NewAttribute()
		fileNameAttr.SetKey(object.AttributeFileName)
		fileNameAttr.SetValue(fileName)
		attributes = append(attributes, *fileNameAttr)
	}
	// sets Timestamp attribute if it wasn't set from header and enabled by settings
	if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() {
		timestamp := object.NewAttribute()
		timestamp.SetKey(object.AttributeTimestamp)
		timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
		attributes = append(attributes, *timestamp)
	}

	obj := object.New()
	obj.SetContainerID(bktInfo.CID)
	obj.SetOwnerID(*h.ownerID)
	obj.SetAttributes(attributes...)

	prm := PrmObjectCreate{
		PrmAuth: PrmAuth{
			BearerToken: h.fetchBearerToken(ctx),
		},
		Object:                 obj,
		Payload:                file,
		ClientCut:              h.config.ClientCut(),
		WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
		BufferMaxSize:          h.config.BufferMaxSizeForPut(),
	}

	idObj, err := h.frostfs.CreateObject(ctx, prm)
	if err != nil {
		h.handlePutFrostFSErr(c, err, log)
		return oid.ID{}, err
	}

	return idObj, nil
}

// explodeGzip read files from tar.gz archive and creates objects for each of them.
// Sets FilePath attribute with name from tar.Header.
//
// Directory entries are skipped. A failure to upload one entry is reported in
// the response and the remaining entries are still processed; a failure to
// read the archive itself aborts processing.
func (h *Handler) explodeGzip(c *fasthttp.RequestCtx, log *zap.Logger, bktInfo *data.BucketInfo, file io.Reader) {
	gzipReader, err := gzip.NewReader(file)
	if err != nil {
		log.Error(logs.FailedToCreateReader, zap.Error(err))
		response.Error(c, "could not create gzip reader: "+err.Error(), fasthttp.StatusBadRequest)
		return
	}
	defer func() {
		if err := gzipReader.Close(); err != nil {
			log.Error(logs.FailedToCloseReader, zap.Error(err))
		}
	}()

	tarReader := tar.NewReader(gzipReader)
	for {
		hdr, err := tarReader.Next()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			// tar.Reader keeps returning the same error from every later
			// Next call, so retrying would loop forever. Report and stop.
			log.Error(logs.FailedToReadFileFromTar, zap.Error(err))
			response.Error(c, "could not read file from tar: "+err.Error(), fasthttp.StatusBadRequest)
			return
		}

		if isDir(hdr.Name) {
			continue
		}

		// The FilePath attribute is passed to uploadObject through the
		// request headers, which it filters into object attributes.
		c.Request.Header.Set(utils.UserAttributeHeaderPrefix+object.AttributeFilePath, hdr.Name)
		idObj, err := h.uploadObject(c, log, bktInfo, filepath.Base(hdr.Name), tarReader)
		if err != nil {
			log.Error(logs.FailedToUploadObject, zap.Error(err))
			response.Error(c, "could not upload object: "+err.Error(), fasthttp.StatusBadRequest)
			// Do not report a failed entry as uploaded; move on to the next one.
			continue
		}

		log.Debug(logs.ObjectUploaded, zap.String("object ID", idObj.EncodeToString()))
	}
}

// handlePutFrostFSErr logs an object-creation error and writes the status code
// and message produced by response.FormErrorResponse to r.
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
	statusCode, msg, additionalFields := response.FormErrorResponse("could not store file in frostfs", err)
	logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)

	log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
	response.Error(r, msg, statusCode)
}

// fetchBearerToken returns the bearer token loaded from ctx, or nil when
// loading fails or no token is present.
func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
	if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
		return tkn
	}
	return nil
}