package handler

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"path/filepath"
	"strconv"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/valyala/fasthttp"
	"go.uber.org/zap"
)

const (
	// jsonHeader is the Content-Type set on a successful single-object upload response.
	jsonHeader = "application/json; charset=UTF-8"
	// drainBufSize is the size in bytes of the scratch buffer used to drain the request body.
	drainBufSize = 4096
	// explodeArchiveHeader is the request header whose presence switches Upload
	// from storing a single object to unpacking the payload as a tar archive.
	explodeArchiveHeader = "X-Explode-Archive"
)

// putResponse is the JSON payload returned after an object has been stored.
type putResponse struct {
	ObjectID    string `json:"object_id"`
	ContainerID string `json:"container_id"`
}

// newPutResponse builds a putResponse from the string-encoded object and
// container identifiers of addr.
func newPutResponse(addr oid.Address) *putResponse {
	objID := addr.Object()
	cnrID := addr.Container()

	resp := new(putResponse)
	resp.ObjectID = objID.EncodeToString()
	resp.ContainerID = cnrID.EncodeToString()

	return resp
}

// encode writes pr to w as tab-indented JSON and returns the encoder's error, if any.
func (pr *putResponse) encode(w io.Writer) error {
	encoder := json.NewEncoder(w)
	encoder.SetIndent("", "\t")

	return encoder.Encode(pr)
}

// Upload handles multipart upload request.
func (h *Handler) Upload(c *fasthttp.RequestCtx) { var file MultipartFile scid, _ := c.UserValue("cid").(string) bodyStream := c.RequestBodyStream() drainBuf := make([]byte, drainBufSize) ctx := utils.GetContextFromRequest(c) reqLog := utils.GetReqLogOrDefault(ctx, h.log) log := reqLog.With(zap.String("cid", scid)) bktInfo, err := h.getBucketInfo(ctx, scid, log) if err != nil { logAndSendBucketError(c, log, err) return } boundary := string(c.Request.Header.MultipartFormBoundary()) if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil { log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err)) ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) return } filtered, err := filterHeaders(log, &c.Request.Header) if err != nil { log.Error(logs.FailedToFilterHeaders, zap.Error(err)) ResponseError(c, err.Error(), fasthttp.StatusBadRequest) return } if c.Request.Header.Peek(explodeArchiveHeader) != nil { h.explodeArchive(request{c, log}, bktInfo, file, filtered) } else { h.uploadSingleObject(request{c, log}, bktInfo, file, filtered) } // Multipart is multipart and thus can contain more than one part which // we ignore at the moment. Also, when dealing with chunked encoding // the last zero-length chunk might be left unread (because multipart // reader only cares about its boundary and doesn't look further) and // it will be (erroneously) interpreted as the start of the next // pipelined header. Thus, we need to drain the body buffer. 
for { _, err = bodyStream.Read(drainBuf) if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) { break } } } func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) { c, log := req.RequestCtx, req.log setIfNotExist(filtered, object.AttributeFileName, file.FileName()) attributes, err := h.extractAttributes(c, log, filtered) if err != nil { log.Error(logs.FailedToGetAttributes, zap.Error(err)) ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest) return } idObj, err := h.uploadObject(c, bkt, attributes, file) if err != nil { h.handlePutFrostFSErr(c, err, log) return } log.Debug(logs.ObjectUploaded, zap.String("oid", idObj.EncodeToString()), zap.String("FileName", file.FileName()), ) addr := newAddress(bkt.CID, idObj) c.Response.Header.SetContentType(jsonHeader) // Try to return the response, otherwise, if something went wrong, throw an error. if err = newPutResponse(addr).encode(c); err != nil { log.Error(logs.CouldNotEncodeResponse, zap.Error(err)) ResponseError(c, "could not encode response", fasthttp.StatusBadRequest) return } } func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) { ctx := utils.GetContextFromRequest(c) obj := object.New() obj.SetContainerID(bkt.CID) obj.SetOwnerID(*h.ownerID) obj.SetAttributes(attrs...) 
prm := PrmObjectCreate{ PrmAuth: PrmAuth{ BearerToken: h.fetchBearerToken(ctx), }, Object: obj, Payload: file, ClientCut: h.config.ClientCut(), WithoutHomomorphicHash: bkt.HomomorphicHashDisabled, BufferMaxSize: h.config.BufferMaxSizeForPut(), } idObj, err := h.frostfs.CreateObject(ctx, prm) if err != nil { return oid.ID{}, err } return idObj, nil } func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) { now := time.Now() if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil { if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil { log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err)) } else { now = parsed } } if err := utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil { log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err)) return nil, err } attributes := make([]object.Attribute, 0, len(filtered)) // prepares attributes from filtered headers for key, val := range filtered { attribute := newAttribute(key, val) attributes = append(attributes, attribute) } // sets Timestamp attribute if it wasn't set from header and enabled by settings if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() { timestamp := newAttribute(object.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10)) attributes = append(attributes, timestamp) } return attributes, nil } func newAttribute(key string, val string) object.Attribute { attr := object.NewAttribute() attr.SetKey(key) attr.SetValue(val) return *attr } // explodeArchive read files from archive and creates objects for each of them. // Sets FilePath attribute with name from tar.Header. 
func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) { c, log := req.RequestCtx, req.log // remove user attributes which vary for each file in archive // to guarantee that they won't appear twice delete(filtered, object.AttributeFileName) delete(filtered, object.AttributeFilePath) commonAttributes, err := h.extractAttributes(c, log, filtered) if err != nil { log.Error(logs.FailedToGetAttributes, zap.Error(err)) ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest) return } attributes := commonAttributes reader := file if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) { log.Debug(logs.GzipReaderSelected) gzipReader, err := gzip.NewReader(file) if err != nil { log.Error(logs.FailedToCreateGzipReader, zap.Error(err)) ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest) return } defer func() { if err := gzipReader.Close(); err != nil { log.Warn(logs.FailedToCloseReader, zap.Error(err)) } }() reader = gzipReader } tarReader := tar.NewReader(reader) for { obj, err := tarReader.Next() if errors.Is(err, io.EOF) { break } else if err != nil { log.Error(logs.FailedToReadFileFromTar, zap.Error(err)) ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest) return } if isDir(obj.Name) { continue } // set varying attributes attributes = attributes[:len(commonAttributes)] fileName := filepath.Base(obj.Name) attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name)) attributes = append(attributes, newAttribute(object.AttributeFileName, fileName)) idObj, err := h.uploadObject(c, bkt, attributes, tarReader) if err != nil { h.handlePutFrostFSErr(c, err, log) return } log.Debug(logs.ObjectUploaded, zap.String("oid", idObj.EncodeToString()), zap.String("FileName", fileName), ) } } func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) 
{ statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err) logFields := append([]zap.Field{zap.Error(err)}, additionalFields...) log.Error(logs.CouldNotStoreFileInFrostfs, logFields...) ResponseError(r, msg, statusCode) } func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token { if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil { return tkn } return nil }