From 8151753eeb17aecba7df1d79b991aeb52c9a4cd6 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 20 Mar 2023 11:44:35 +0300 Subject: [PATCH] [#60] Use periodic white space XML writer in Complete Multipart Upload This mechanism is used by Amazon S3 to keep client's connection alive while object is being constructed from the upload parts. Signed-off-by: Alex Vanin --- api/handler/api.go | 15 ++++----- api/handler/multipart_upload.go | 54 +++++++++++++++++++++++---------- cmd/s3-gw/app.go | 2 ++ cmd/s3-gw/app_settings.go | 2 ++ config/config.env | 2 ++ config/config.yaml | 2 ++ docs/configuration.md | 8 +++-- 7 files changed, 59 insertions(+), 26 deletions(-) diff --git a/api/handler/api.go b/api/handler/api.go index 45b4b3949..f16e1ca8a 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -27,13 +27,14 @@ type ( // Config contains data which handler needs to keep. Config struct { - Policy PlacementPolicy - XMLDecoder XMLDecoderProvider - DefaultMaxAge int - NotificatorEnabled bool - CopiesNumber uint32 - ResolveZoneList []string - IsResolveListAllow bool // True if ResolveZoneList contains allowed zones + Policy PlacementPolicy + XMLDecoder XMLDecoderProvider + DefaultMaxAge int + NotificatorEnabled bool + CopiesNumber uint32 + ResolveZoneList []string + IsResolveListAllow bool // True if ResolveZoneList contains allowed zones + CompleteMultipartKeepalive time.Duration } PlacementPolicy interface { diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 258aabfe0..e33de971c 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -400,9 +400,15 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. Parts: reqBody.Parts, } + // Next operations might take some time, so we want to keep client's + // connection alive. To do so, gateway sends periodic white spaces + // back to the client the same way as Amazon S3 service does. + stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive) + uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c) if err != nil { - h.logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...) return } objInfo := extendedObjInfo.ObjectInfo @@ -418,7 +424,8 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. NodeVersion: extendedObjInfo.NodeVersion, } if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil { - h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...) return } } @@ -426,12 +433,14 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. if len(uploadData.ACLHeaders) != 0 { key, err := h.bearerTokenIssuerKey(r.Context()) if err != nil { - h.logAndSendError(w, "couldn't get gate key", reqInfo, err) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "couldn't get gate key", reqInfo, err) return } acl, err := parseACLHeaders(r.Header, key) if err != nil { - h.logAndSendError(w, "could not parse acl", reqInfo, err) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not parse acl", reqInfo, err) return } @@ -441,11 +450,13 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. } astObject, err := aclToAst(acl, resInfo) if err != nil { - h.logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...) return } if _, err = h.updateBucketACL(r, astObject, bktInfo, sessionTokenSetEACL); err != nil { - h.logAndSendError(w, "could not update bucket acl while completing multipart upload", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not update bucket acl while completing multipart upload", reqInfo, err, additional...) return } } @@ -460,23 +471,25 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. h.log.Error("couldn't send notification: %w", zap.Error(err)) } - bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) - if err != nil { - h.logAndSendError(w, "could not get bucket settings", reqInfo, err) - } - response := CompleteMultipartUploadResponse{ Bucket: objInfo.Bucket, ETag: objInfo.HashSum, Key: objInfo.Name, } - if bktSettings.VersioningEnabled() { - w.Header().Set(api.AmzVersionID, objInfo.VersionID()) - } + // Here we previously set api.AmzVersionID header for versioned bucket. + // It is not possible after #60, because we introduced periodic white + // space XML writer to keep connection with the client. - if err = api.EncodeToResponse(w, response); err != nil { - h.logAndSendError(w, "something went wrong", reqInfo, err) + headerIsWritten := stopPeriodicResponseWriter() + if headerIsWritten { + if err = api.EncodeToResponseNoHeader(w, response); err != nil { + h.logAndSendErrorNoHeader(w, "something went wrong", reqInfo, err) + } + } else { + if err = api.EncodeToResponse(w, response); err != nil { + h.logAndSendError(w, "something went wrong", reqInfo, err) + } } } @@ -732,3 +745,12 @@ func periodicXMLWriter(w io.Writer, dur time.Duration) (stop func() bool) { return stop } + +// periodicWriterErrorSender returns handler function to send error. If header is +// alreay written by periodic XML writer, do not send HTTP and XML headers. +func (h *handler) periodicWriterErrorSender(headerWritten bool) func(http.ResponseWriter, string, *api.ReqInfo, error, ...zap.Field) { + if headerWritten { + return h.logAndSendErrorNoHeader + } + return h.logAndSendError +} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index bcbb65d4b..7c04876c1 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -654,6 +654,8 @@ func (a *App) initHandler() { cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketDeny) } + cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive) + var err error a.api, err = handler.New(a.log, a.obj, a.nc, cfg) if err != nil { diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 470abfa04..f97e397bb 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -115,6 +115,7 @@ const ( // Settings. // Kludge. cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload = "kludge.use_default_xmlns_for_complete_multipart" + cfgKludgeCompleteMultipartUploadKeepalive = "kludge.complete_multipart_keepalive" // Command line args. cmdHelp = "help" @@ -258,6 +259,7 @@ func newSettings() *viper.Viper { // kludge v.SetDefault(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload, false) + v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second) // Bind flags if err := bindFlags(v, flags); err != nil { diff --git a/config/config.env b/config/config.env index b62452c2f..23b5596cf 100644 --- a/config/config.env +++ b/config/config.env @@ -130,3 +130,5 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container # Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse`CompleteMultipartUpload` xml body. S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false +# Set timeout between whitespace transmissions during CompleteMultipartUpload processing. +S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s diff --git a/config/config.yaml b/config/config.yaml index ccc631e2d..afd495046 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -153,3 +153,5 @@ resolve_bucket: kludge: # Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse`CompleteMultipartUpload` xml body. use_default_xmlns_for_complete_multipart: false + # Set timeout between whitespace transmissions during CompleteMultipartUpload processing. + complete_multipart_keepalive: 10s diff --git a/docs/configuration.md b/docs/configuration.md index 4bc4c463a..115382779 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -504,8 +504,10 @@ Workarounds for non-standard use cases. ```yaml kludge: use_default_xmlns_for_complete_multipart: false + complete_multipart_keepalive: 10s ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|--------------------------------------------|--------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------| -| `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|--------------------------------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------| +| `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. | +| `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. |