[#60] Refactor start of periodic XML writer

Reduce code duplication for error handling

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2023-03-22 10:52:23 +03:00
parent 8151753eeb
commit 5104683f68

View file

@ -2,6 +2,7 @@ package handler
import ( import (
"encoding/xml" "encoding/xml"
"fmt"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
@ -405,11 +406,46 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
// back to the client the same way as Amazon S3 service does. // back to the client the same way as Amazon S3 service does.
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive) stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive)
// Start complete multipart upload which may take some time to fetch object
// and re-upload it part by part.
objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo, sessionTokenSetEACL)
// Stop periodic writer as complete multipart upload is finished
// successfully or not.
headerIsWritten := stopPeriodicResponseWriter()
responseWriter := api.EncodeToResponse
errLogger := h.logAndSendError
// Do not send XML and HTTP headers if periodic writer was invoked at this point.
if headerIsWritten {
responseWriter = api.EncodeToResponseNoHeader
errLogger = h.logAndSendErrorNoHeader
}
if err != nil {
errLogger(w, "complete multipart error", reqInfo, err, additional...)
return
}
response := CompleteMultipartUploadResponse{
Bucket: objInfo.Bucket,
ETag: objInfo.HashSum,
Key: objInfo.Name,
}
// Here we previously set api.AmzVersionID header for versioned bucket.
// It is not possible after #60, because of periodic white
// space XML writer to keep connection with the client.
if err = responseWriter(w, response); err != nil {
errLogger(w, "something went wrong", reqInfo, err)
}
}
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *api.ReqInfo, stoken *session.Container) (*data.ObjectInfo, error) {
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c) uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c)
if err != nil { if err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) return nil, fmt.Errorf("could not complete multipart upload: %w", err)
logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...)
return
} }
objInfo := extendedObjInfo.ObjectInfo objInfo := extendedObjInfo.ObjectInfo
@ -424,24 +460,18 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
NodeVersion: extendedObjInfo.NodeVersion, NodeVersion: extendedObjInfo.NodeVersion,
} }
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil { if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err)
logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...)
return
} }
} }
if len(uploadData.ACLHeaders) != 0 { if len(uploadData.ACLHeaders) != 0 {
key, err := h.bearerTokenIssuerKey(r.Context()) key, err := h.bearerTokenIssuerKey(r.Context())
if err != nil { if err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) return nil, fmt.Errorf("couldn't get gate key: %w", err)
logAndSendError(w, "couldn't get gate key", reqInfo, err)
return
} }
acl, err := parseACLHeaders(r.Header, key) acl, err := parseACLHeaders(r.Header, key)
if err != nil { if err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) return nil, fmt.Errorf("could not parse acl: %w", err)
logAndSendError(w, "could not parse acl", reqInfo, err)
return
} }
resInfo := &resourceInfo{ resInfo := &resourceInfo{
@ -450,14 +480,10 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
} }
astObject, err := aclToAst(acl, resInfo) astObject, err := aclToAst(acl, resInfo)
if err != nil { if err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) return nil, fmt.Errorf("could not translate acl of completed multipart upload to ast: %w", err)
logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...)
return
} }
if _, err = h.updateBucketACL(r, astObject, bktInfo, sessionTokenSetEACL); err != nil { if _, err = h.updateBucketACL(r, astObject, bktInfo, stoken); err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) return nil, fmt.Errorf("could not update bucket acl while completing multipart upload: %w", err)
logAndSendError(w, "could not update bucket acl while completing multipart upload", reqInfo, err, additional...)
return
} }
} }
@ -471,26 +497,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
h.log.Error("couldn't send notification: %w", zap.Error(err)) h.log.Error("couldn't send notification: %w", zap.Error(err))
} }
response := CompleteMultipartUploadResponse{ return objInfo, nil
Bucket: objInfo.Bucket,
ETag: objInfo.HashSum,
Key: objInfo.Name,
}
// Here we previously set api.AmzVersionID header for versioned bucket.
// It is not possible after #60, because we introduced periodic white
// space XML writer to keep connection with the client.
headerIsWritten := stopPeriodicResponseWriter()
if headerIsWritten {
if err = api.EncodeToResponseNoHeader(w, response); err != nil {
h.logAndSendErrorNoHeader(w, "something went wrong", reqInfo, err)
}
} else {
if err = api.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
}
}
} }
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) { func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {