package handler

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"path/filepath"
	"strconv"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/valyala/fasthttp"
	"go.uber.org/zap"
)

const (
	jsonHeader           = "application/json; charset=UTF-8"
	drainBufSize         = 4096
	explodeArchiveHeader = "X-Explode-Archive"
)

// putResponse is the JSON body returned after a successful upload.
type putResponse struct {
	ObjectID    string `json:"object_id"`
	ContainerID string `json:"container_id"`
}

func newPutResponse(addr oid.Address) *putResponse {
	return &putResponse{
		ObjectID:    addr.Object().EncodeToString(),
		ContainerID: addr.Container().EncodeToString(),
	}
}

func (pr *putResponse) encode(w io.Writer) error {
	enc := json.NewEncoder(w)
	enc.SetIndent("", "\t")
	return enc.Encode(pr)
}

// Upload handles a multipart upload request.
func (h *Handler) Upload(req *fasthttp.RequestCtx) {
	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.Upload")
	defer span.End()

	var file MultipartFile

	scid, _ := req.UserValue("cid").(string)
	bodyStream := req.RequestBodyStream()
	drainBuf := make([]byte, drainBufSize)

	log := h.reqLogger(ctx)
	ctx = utils.SetReqLog(ctx, log.With(zap.String("cid", scid)))

	bktInfo, err := h.getBucketInfo(ctx, scid)
	if err != nil {
		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
		return
	}

	boundary := string(req.Request.Header.MultipartFormBoundary())
	if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
		h.logAndSendError(ctx, req, logs.CouldNotReceiveMultipartForm, err)
		return
	}

	filtered, err := filterHeaders(log, &req.Request.Header)
	if err != nil {
		h.logAndSendError(ctx, req, logs.FailedToFilterHeaders, err)
		return
	}

	if req.Request.Header.Peek(explodeArchiveHeader) != nil {
		h.explodeArchive(ctx, req, bktInfo, file, filtered)
	} else {
		h.uploadSingleObject(ctx, req, bktInfo, file, filtered)
	}

	// A multipart body can contain more than one part, which we ignore at
	// the moment. Also, with chunked encoding the last zero-length chunk
	// might be left unread (the multipart reader only cares about its
	// boundary and doesn't look further) and would be erroneously
	// interpreted as the start of the next pipelined header. Thus, we need
	// to drain the body buffer.
	for {
		_, err = bodyStream.Read(drainBuf)
		if err != nil {
			// Stop on EOF or any other read error; retrying a
			// persistent error would loop forever.
			break
		}
	}
}
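
// uploadSingleObject stores the multipart payload as a single object in the
// bucket's container and writes the resulting address to the response as JSON.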
func (h *Handler) uploadSingleObject(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
	ctx, span := tracing.StartSpanFromContext(ctx, "handler.uploadSingleObject")
	defer span.End()

	setIfNotExist(filtered, object.AttributeFileName, file.FileName())

	attributes, err := h.extractAttributes(ctx, req, filtered)
	if err != nil {
		h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
		return
	}

	idObj, err := h.uploadObject(ctx, bkt, attributes, file)
	if err != nil {
		h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
		return
	}
	h.reqLogger(ctx).Debug(logs.ObjectUploaded,
		zap.String("oid", idObj.EncodeToString()),
		zap.String("FileName", file.FileName()),
		logs.TagField(logs.TagExternalStorage),
	)

	addr := newAddress(bkt.CID, idObj)
	req.Response.Header.SetContentType(jsonHeader)
	// Encode the response; if encoding fails, report the error.
	if err = newPutResponse(addr).encode(req); err != nil {
		h.logAndSendError(ctx, req, logs.CouldNotEncodeResponse, err)
		return
	}
}

// uploadObject creates a new object in the bucket's container with the given
// attributes and payload, and returns its ID.
func (h *Handler) uploadObject(ctx context.Context, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
	obj := object.New()
	obj.SetContainerID(bkt.CID)
	obj.SetOwnerID(*h.ownerID)
	obj.SetAttributes(attrs...)

	prm := PrmObjectCreate{
		PrmAuth: PrmAuth{
			BearerToken: h.fetchBearerToken(ctx),
		},
		Object:                 obj,
		Payload:                file,
		ClientCut:              h.config.ClientCut(),
		WithoutHomomorphicHash: bkt.HomomorphicHashDisabled,
		BufferMaxSize:          h.config.BufferMaxSizeForPut(),
	}

	idObj, err := h.frostfs.CreateObject(ctx, prm)
	if err != nil {
		return oid.ID{}, err
	}

	return idObj, nil
}

// extractAttributes converts the filtered headers into object attributes. It
// resolves the expiration header against the client-supplied Date header when
// one is present, and may add a default Timestamp attribute.
func (h *Handler) extractAttributes(ctx context.Context, req *fasthttp.RequestCtx, filtered map[string]string) ([]object.Attribute, error) {
	now := time.Now()
	if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
		if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
			h.reqLogger(ctx).Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)),
				zap.Error(err), logs.TagField(logs.TagDatapath))
		} else {
			now = parsed
		}
	}

	if err := utils.PrepareExpirationHeader(ctx, h.frostfs, filtered, now); err != nil {
		h.reqLogger(ctx).Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
		return nil, err
	}

	attributes := make([]object.Attribute, 0, len(filtered))
	// prepare attributes from the filtered headers
	for key, val := range filtered {
		attribute := newAttribute(key, val)
		attributes = append(attributes, attribute)
	}
	// set the Timestamp attribute if it wasn't supplied via headers and
	// default timestamps are enabled in the settings
	if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() {
		timestamp := newAttribute(object.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10))
		attributes = append(attributes, timestamp)
	}

	return attributes, nil
}
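
// newAttribute constructs an object.Attribute from a key/value pair.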
func newAttribute(key string, val string) object.Attribute {
	attr := object.NewAttribute()
	attr.SetKey(key)
	attr.SetValue(val)

	return *attr
}

// explodeArchive reads files from the archive and creates an object for each
// of them. It sets the FilePath attribute from the name in the tar.Header.
func (h *Handler) explodeArchive(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
	ctx, span := tracing.StartSpanFromContext(ctx, "handler.explodeArchive")
	defer span.End()

	// remove user attributes that vary for each file in the archive,
	// to guarantee that they won't appear twice
	delete(filtered, object.AttributeFileName)
	delete(filtered, object.AttributeFilePath)

	commonAttributes, err := h.extractAttributes(ctx, req, filtered)
	if err != nil {
		h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
		return
	}
	attributes := commonAttributes

	reader := file
	if bytes.EqualFold(req.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
		h.reqLogger(ctx).Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
		gzipReader, err := gzip.NewReader(file)
		if err != nil {
			h.logAndSendError(ctx, req, logs.FailedToCreateGzipReader, err)
			return
		}
		defer func() {
			if err := gzipReader.Close(); err != nil {
				h.reqLogger(ctx).Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
			}
		}()
		reader = gzipReader
	}

	tarReader := tar.NewReader(reader)
	for {
		obj, err := tarReader.Next()
		if errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			h.logAndSendError(ctx, req, logs.FailedToReadFileFromTar, err)
			return
		}

		if isDir(obj.Name) {
			continue
		}

		// set the varying attributes, reusing the common prefix of the slice
		attributes = attributes[:len(commonAttributes)]
		fileName := filepath.Base(obj.Name)
		attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
		attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))

		idObj, err := h.uploadObject(ctx, bkt, attributes, tarReader)
		if err != nil {
			h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
			return
		}
		h.reqLogger(ctx).Debug(logs.ObjectUploaded,
			zap.String("oid", idObj.EncodeToString()),
			zap.String("FileName", fileName),
			logs.TagField(logs.TagExternalStorage),
		)
	}
}

// fetchBearerToken returns the bearer token attached to the request context,
// or nil if there is none.
func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
	if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
		return tkn
	}
	return nil
}