diff --git a/api/handler/api.go b/api/handler/api.go index 45b4b39..f16e1ca 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -27,13 +27,14 @@ type ( // Config contains data which handler needs to keep. Config struct { - Policy PlacementPolicy - XMLDecoder XMLDecoderProvider - DefaultMaxAge int - NotificatorEnabled bool - CopiesNumber uint32 - ResolveZoneList []string - IsResolveListAllow bool // True if ResolveZoneList contains allowed zones + Policy PlacementPolicy + XMLDecoder XMLDecoderProvider + DefaultMaxAge int + NotificatorEnabled bool + CopiesNumber uint32 + ResolveZoneList []string + IsResolveListAllow bool // True if ResolveZoneList contains allowed zones + CompleteMultipartKeepalive time.Duration } PlacementPolicy interface { diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 258aabf..e33de97 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -400,9 +400,15 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. Parts: reqBody.Parts, } + // Next operations might take some time, so we want to keep client's + // connection alive. To do so, gateway sends periodic white spaces + // back to the client the same way as Amazon S3 service does. + stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive) + uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c) if err != nil { - h.logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...) return } objInfo := extendedObjInfo.ObjectInfo @@ -418,7 +424,8 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. NodeVersion: extendedObjInfo.NodeVersion, } if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil { - h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...) return } } @@ -426,12 +433,14 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. if len(uploadData.ACLHeaders) != 0 { key, err := h.bearerTokenIssuerKey(r.Context()) if err != nil { - h.logAndSendError(w, "couldn't get gate key", reqInfo, err) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "couldn't get gate key", reqInfo, err) return } acl, err := parseACLHeaders(r.Header, key) if err != nil { - h.logAndSendError(w, "could not parse acl", reqInfo, err) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not parse acl", reqInfo, err) return } @@ -441,11 +450,13 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. } astObject, err := aclToAst(acl, resInfo) if err != nil { - h.logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...) return } if _, err = h.updateBucketACL(r, astObject, bktInfo, sessionTokenSetEACL); err != nil { - h.logAndSendError(w, "could not update bucket acl while completing multipart upload", reqInfo, err, additional...) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(w, "could not update bucket acl while completing multipart upload", reqInfo, err, additional...) return } } @@ -460,23 +471,25 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. h.log.Error("couldn't send notification: %w", zap.Error(err)) } - bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) - if err != nil { - h.logAndSendError(w, "could not get bucket settings", reqInfo, err) - } - response := CompleteMultipartUploadResponse{ Bucket: objInfo.Bucket, ETag: objInfo.HashSum, Key: objInfo.Name, } - if bktSettings.VersioningEnabled() { - w.Header().Set(api.AmzVersionID, objInfo.VersionID()) - } + // Here we previously set api.AmzVersionID header for versioned bucket. + // It is not possible after #60, because we introduced periodic white + // space XML writer to keep connection with the client. - if err = api.EncodeToResponse(w, response); err != nil { - h.logAndSendError(w, "something went wrong", reqInfo, err) + headerIsWritten := stopPeriodicResponseWriter() + if headerIsWritten { + if err = api.EncodeToResponseNoHeader(w, response); err != nil { + h.logAndSendErrorNoHeader(w, "something went wrong", reqInfo, err) + } + } else { + if err = api.EncodeToResponse(w, response); err != nil { + h.logAndSendError(w, "something went wrong", reqInfo, err) + } } } @@ -732,3 +745,12 @@ func periodicXMLWriter(w io.Writer, dur time.Duration) (stop func() bool) { return stop } + +// periodicWriterErrorSender returns handler function to send error. If header is +// alreay written by periodic XML writer, do not send HTTP and XML headers. +func (h *handler) periodicWriterErrorSender(headerWritten bool) func(http.ResponseWriter, string, *api.ReqInfo, error, ...zap.Field) { + if headerWritten { + return h.logAndSendErrorNoHeader + } + return h.logAndSendError +} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index bcbb65d..7c04876 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -654,6 +654,8 @@ func (a *App) initHandler() { cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketDeny) } + cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive) + var err error a.api, err = handler.New(a.log, a.obj, a.nc, cfg) if err != nil { diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 470abfa..f97e397 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -115,6 +115,7 @@ const ( // Settings. // Kludge. cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload = "kludge.use_default_xmlns_for_complete_multipart" + cfgKludgeCompleteMultipartUploadKeepalive = "kludge.complete_multipart_keepalive" // Command line args. cmdHelp = "help" @@ -258,6 +259,7 @@ func newSettings() *viper.Viper { // kludge v.SetDefault(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload, false) + v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second) // Bind flags if err := bindFlags(v, flags); err != nil { diff --git a/config/config.env b/config/config.env index b62452c..23b5596 100644 --- a/config/config.env +++ b/config/config.env @@ -130,3 +130,5 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container # Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse`CompleteMultipartUpload` xml body. S3_GW_KLUDGE_USE_DEFAULT_XMLNS_FOR_COMPLETE_MULTIPART=false +# Set timeout between whitespace transmissions during CompleteMultipartUpload processing. +S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s diff --git a/config/config.yaml b/config/config.yaml index ccc631e..afd4950 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -153,3 +153,5 @@ resolve_bucket: kludge: # Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse`CompleteMultipartUpload` xml body. use_default_xmlns_for_complete_multipart: false + # Set timeout between whitespace transmissions during CompleteMultipartUpload processing. + complete_multipart_keepalive: 10s diff --git a/docs/configuration.md b/docs/configuration.md index 4bc4c46..1153827 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -504,8 +504,10 @@ Workarounds for non-standard use cases. ```yaml kludge: use_default_xmlns_for_complete_multipart: false + complete_multipart_keepalive: 10s ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|--------------------------------------------|--------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------| -| `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|--------------------------------------------|------------|---------------|---------------|-----------------------------------------------------------------------------------------------------------------------------| +| `use_default_xmlns_for_complete_multipart` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse `CompleteMultipartUpload` xml body. | +| `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. |