diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index e33de97..e5b9022 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -2,6 +2,7 @@ package handler import ( "encoding/xml" + "fmt" "io" "net/http" "net/url" @@ -405,11 +406,46 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. // back to the client the same way as Amazon S3 service does. stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive) + // Start complete multipart upload which may take some time to fetch object + // and re-upload it part by part. + objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo, sessionTokenSetEACL) + + // Stop periodic writer as complete multipart upload is finished + // successfully or not. + headerIsWritten := stopPeriodicResponseWriter() + + responseWriter := api.EncodeToResponse + errLogger := h.logAndSendError + // Do not send XML and HTTP headers if periodic writer was invoked at this point. + if headerIsWritten { + responseWriter = api.EncodeToResponseNoHeader + errLogger = h.logAndSendErrorNoHeader + } + + if err != nil { + errLogger(w, "complete multipart error", reqInfo, err, additional...) + return + } + + response := CompleteMultipartUploadResponse{ + Bucket: objInfo.Bucket, + ETag: objInfo.HashSum, + Key: objInfo.Name, + } + + // Here we previously set api.AmzVersionID header for versioned bucket. + // It is not possible after #60, because of periodic white + // space XML writer to keep connection with the client. + + if err = responseWriter(w, response); err != nil { + errLogger(w, "something went wrong", reqInfo, err) + } +} + +func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *api.ReqInfo, stoken *session.Container) (*data.ObjectInfo, error) { uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c) if err != nil { - logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) - logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...) - return + return nil, fmt.Errorf("could not complete multipart upload: %w", err) } objInfo := extendedObjInfo.ObjectInfo @@ -424,24 +460,18 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. NodeVersion: extendedObjInfo.NodeVersion, } if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil { - logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) - logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...) - return + return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err) } } if len(uploadData.ACLHeaders) != 0 { key, err := h.bearerTokenIssuerKey(r.Context()) if err != nil { - logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) - logAndSendError(w, "couldn't get gate key", reqInfo, err) - return + return nil, fmt.Errorf("couldn't get gate key: %w", err) } acl, err := parseACLHeaders(r.Header, key) if err != nil { - logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) - logAndSendError(w, "could not parse acl", reqInfo, err) - return + return nil, fmt.Errorf("could not parse acl: %w", err) } resInfo := &resourceInfo{ @@ -450,14 +480,10 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. } astObject, err := aclToAst(acl, resInfo) if err != nil { - logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) - logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...) - return + return nil, fmt.Errorf("could not translate acl of completed multipart upload to ast: %w", err) } - if _, err = h.updateBucketACL(r, astObject, bktInfo, sessionTokenSetEACL); err != nil { - logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) - logAndSendError(w, "could not update bucket acl while completing multipart upload", reqInfo, err, additional...) - return + if _, err = h.updateBucketACL(r, astObject, bktInfo, stoken); err != nil { + return nil, fmt.Errorf("could not update bucket acl while completing multipart upload: %w", err) } } @@ -471,26 +497,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. h.log.Error("couldn't send notification: %w", zap.Error(err)) } - response := CompleteMultipartUploadResponse{ - Bucket: objInfo.Bucket, - ETag: objInfo.HashSum, - Key: objInfo.Name, - } - - // Here we previously set api.AmzVersionID header for versioned bucket. - // It is not possible after #60, because we introduced periodic white - // space XML writer to keep connection with the client. - - headerIsWritten := stopPeriodicResponseWriter() - if headerIsWritten { - if err = api.EncodeToResponseNoHeader(w, response); err != nil { - h.logAndSendErrorNoHeader(w, "something went wrong", reqInfo, err) - } - } else { - if err = api.EncodeToResponse(w, response); err != nil { - h.logAndSendError(w, "something went wrong", reqInfo, err) - } - } + return objInfo, nil } func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {