From 890a8ed2370ccc55fd986e3efe923ea95d2cff7a Mon Sep 17 00:00:00 2001 From: Marina Biryukova Date: Thu, 26 Oct 2023 11:33:51 +0300 Subject: [PATCH] [#227] Add versionID header after complete multipart Signed-off-by: Marina Biryukova --- CHANGELOG.md | 2 + api/handler/api.go | 1 - api/handler/multipart_upload.go | 86 ++++------------------------ api/handler/multipart_upload_test.go | 40 ------------- api/handler/util.go | 16 ------ api/middleware/response.go | 7 --- cmd/s3-gw/app.go | 28 ++++----- cmd/s3-gw/app_settings.go | 2 - config/config.env | 2 - config/config.yaml | 2 - docs/configuration.md | 2 - 11 files changed, 25 insertions(+), 163 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96dfa14..a5bbcde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ This document outlines major changes between releases. - Add new `kludge.bypass_content_encoding_check_in_chunks` config param (#146) - Add new `frostfs.client_cut` config param (#192) - Add new `frostfs.buffer_max_size_for_put` config param and sync TZ hash for PUT operations (#197) +- Add `X-Amz-Version-Id` header after complete multipart upload (#227) ### Changed - Update prometheus to v1.15.0 (#94) @@ -53,6 +54,7 @@ This document outlines major changes between releases. ### Removed - Drop `tree.service` param (now endpoints from `peers` section are used) (#133) +- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227) ## [0.27.0] - Karpinsky - 2023-07-12 diff --git a/api/handler/api.go b/api/handler/api.go index 5b254cf..34dbb0f 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -40,7 +40,6 @@ type ( NotificatorEnabled() bool ResolveZoneList() []string IsResolveListAllow() bool - CompleteMultipartKeepalive() time.Duration BypassContentEncodingInChunks() bool MD5Enabled() bool } diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 16ef167..56742d9 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -3,7 +3,6 @@ package handler import ( "encoding/xml" "fmt" - "io" "net/http" "net/url" "strconv" @@ -399,6 +398,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. return } + settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) + if err != nil { + h.logAndSendError(w, "could not get bucket settings", reqInfo, err) + return + } + var ( uploadID = r.URL.Query().Get(uploadIDHeaderName) uploadInfo = &layer.UploadInfoParams{ @@ -425,29 +430,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. Parts: reqBody.Parts, } - // Next operations might take some time, so we want to keep client's - // connection alive. To do so, gateway sends periodic white spaces - // back to the client the same way as Amazon S3 service does. - stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive()) - // Start complete multipart upload which may take some time to fetch object // and re-upload it part by part. objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo) - // Stop periodic writer as complete multipart upload is finished - // successfully or not. - headerIsWritten := stopPeriodicResponseWriter() - - responseWriter := middleware.EncodeToResponse - errLogger := h.logAndSendError - // Do not send XML and HTTP headers if periodic writer was invoked at this point. - if headerIsWritten { - responseWriter = middleware.EncodeToResponseNoHeader - errLogger = h.logAndSendErrorNoHeader - } - if err != nil { - errLogger(w, "complete multipart error", reqInfo, err, additional...) + h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...) return } @@ -457,12 +445,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. ETag: objInfo.ETag(h.cfg.MD5Enabled()), } - // Here we previously set api.AmzVersionID header for versioned bucket. - // It is not possible after #60, because of periodic white - // space XML writer to keep connection with the client. + if settings.VersioningEnabled() { + w.Header().Set(api.AmzVersionID, objInfo.VersionID()) + } - if err = responseWriter(w, response); err != nil { - errLogger(w, "something went wrong", reqInfo, err, additional...) + if err = middleware.EncodeToResponse(w, response); err != nil { + h.logAndSendError(w, "something went wrong", reqInfo, err, additional...) } } @@ -730,53 +718,3 @@ func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPart StorageClass: api.DefaultStorageClass, } } - -// periodicXMLWriter creates go routine to write xml header and whitespaces -// over time to avoid connection drop from the client. To work properly, -// pass `http.ResponseWriter` with implemented `http.Flusher` interface. -// Returns stop function which returns boolean if writer has been used -// during goroutine execution. To disable writer, pass 0 duration value. -func periodicXMLWriter(w io.Writer, dur time.Duration) (stop func() bool) { - if dur == 0 { // 0 duration disables periodic writer - return func() bool { return false } - } - - whitespaceChar := []byte(" ") - closer := make(chan struct{}) - done := make(chan struct{}) - headerWritten := false - - go func() { - defer close(done) - - tick := time.NewTicker(dur) - defer tick.Stop() - - for { - select { - case <-tick.C: - if !headerWritten { - _, err := w.Write([]byte(xml.Header)) - headerWritten = err == nil - } - _, err := w.Write(whitespaceChar) - if err != nil { - return // is there anything we can do better than ignore error? - } - if buffered, ok := w.(http.Flusher); ok { - buffered.Flush() - } - case <-closer: - return - } - } - }() - - stop = func() bool { - close(closer) - <-done // wait for goroutine to stop - return headerWritten - } - - return stop -} diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index f078ab2..dbf06b2 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -1,7 +1,6 @@ package handler import ( - "bytes" "crypto/md5" "encoding/hex" "encoding/xml" @@ -10,7 +9,6 @@ import ( "net/url" "strconv" "testing" - "time" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" @@ -22,44 +20,6 @@ const ( partNumberMarkerQuery = "part-number-marker" ) -func TestPeriodicWriter(t *testing.T) { - const dur = 100 * time.Millisecond - const whitespaces = 8 - expected := []byte(xml.Header) - for i := 0; i < whitespaces; i++ { - expected = append(expected, []byte(" ")...) - } - - t.Run("writes data", func(t *testing.T) { - buf := bytes.NewBuffer(nil) - stop := periodicXMLWriter(buf, dur) - - // N number of whitespaces + half durations to guarantee at least N writes in buffer - time.Sleep(whitespaces*dur + dur/2) - require.True(t, stop()) - require.Equal(t, expected, buf.Bytes()) - - t.Run("no additional data after stop", func(t *testing.T) { - time.Sleep(2 * dur) - require.Equal(t, expected, buf.Bytes()) - }) - }) - - t.Run("does not write data", func(t *testing.T) { - buf := bytes.NewBuffer(nil) - stop := periodicXMLWriter(buf, dur) - time.Sleep(dur / 2) - require.False(t, stop()) - require.Empty(t, buf.Bytes()) - - t.Run("disabled", func(t *testing.T) { - stop = periodicXMLWriter(buf, 0) - require.False(t, stop()) - require.Empty(t, buf.Bytes()) - }) - }) -} - func TestMultipartUploadInvalidPart(t *testing.T) { hc := prepareHandlerContext(t) diff --git a/api/handler/util.go b/api/handler/util.go index f6e2d90..3b352e8 100644 --- a/api/handler/util.go +++ b/api/handler/util.go @@ -56,22 +56,6 @@ func handleDeleteMarker(w http.ResponseWriter, err error) error { return fmt.Errorf("%w: %s", s3errors.GetAPIError(target.ErrorCode), err) } -func (h *handler) logAndSendErrorNoHeader(w http.ResponseWriter, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) { - middleware.WriteErrorResponseNoHeader(w, reqInfo, transformToS3Error(err)) - fields := []zap.Field{ - zap.String("request_id", reqInfo.RequestID), - zap.String("method", reqInfo.API), - zap.String("bucket", reqInfo.BucketName), - zap.String("object", reqInfo.ObjectName), - zap.String("description", logText), - zap.Error(err)} - fields = append(fields, additional...) - if traceID, err := trace.TraceIDFromHex(reqInfo.TraceID); err == nil && traceID.IsValid() { - fields = append(fields, zap.String("trace_id", reqInfo.TraceID)) - } - h.log.Error(logs.RequestFailed, fields...) // consider using h.reqLogger (it requires accept context.Context or http.Request) -} - func transformToS3Error(err error) error { err = frosterrors.UnwrapErr(err) // this wouldn't work with errors.Join if _, ok := err.(s3errors.Error); ok { diff --git a/api/middleware/response.go b/api/middleware/response.go index a9e0905..adec21a 100644 --- a/api/middleware/response.go +++ b/api/middleware/response.go @@ -141,13 +141,6 @@ func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) int return code } -// WriteErrorResponseNoHeader writes XML encoded error to the response body. -func WriteErrorResponseNoHeader(w http.ResponseWriter, reqInfo *ReqInfo, err error) { - errorResponse := getAPIErrorResponse(reqInfo, err) - encodedErrorResponse := EncodeResponse(errorResponse) - WriteResponseBody(w, encodedErrorResponse) -} - // Write http common headers. func setCommonHeaders(w http.ResponseWriter) { w.Header().Set(hdrServerInfo, version.Server) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 71e9868..a8b6f42 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -69,13 +69,12 @@ type ( } appSettings struct { - logLevel zap.AtomicLevel - maxClient maxClientsConfig - defaultMaxAge int - notificatorEnabled bool - resolveZoneList []string - isResolveListAllow bool // True if ResolveZoneList contains allowed zones - completeMultipartKeepalive time.Duration + logLevel zap.AtomicLevel + maxClient maxClientsConfig + defaultMaxAge int + notificatorEnabled bool + resolveZoneList []string + isResolveListAllow bool // True if ResolveZoneList contains allowed zones mu sync.RWMutex defaultPolicy netmap.PlacementPolicy @@ -174,12 +173,11 @@ func (a *App) initLayer(ctx context.Context) { func newAppSettings(log *Logger, v *viper.Viper) *appSettings { settings := &appSettings{ - logLevel: log.lvl, - maxClient: newMaxClients(v), - defaultXMLNS: v.GetBool(cfgKludgeUseDefaultXMLNS), - defaultMaxAge: fetchDefaultMaxAge(v, log.logger), - notificatorEnabled: v.GetBool(cfgEnableNATS), - completeMultipartKeepalive: v.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive), + logLevel: log.lvl, + maxClient: newMaxClients(v), + defaultXMLNS: v.GetBool(cfgKludgeUseDefaultXMLNS), + defaultMaxAge: fetchDefaultMaxAge(v, log.logger), + notificatorEnabled: v.GetBool(cfgEnableNATS), } settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow) @@ -310,10 +308,6 @@ func (s *appSettings) IsResolveListAllow() bool { return s.isResolveListAllow } -func (s *appSettings) CompleteMultipartKeepalive() time.Duration { - return s.completeMultipartKeepalive -} - func (s *appSettings) MD5Enabled() bool { s.mu.RLock() defer s.mu.RUnlock() diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 02114c7..0d28c7a 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -134,7 +134,6 @@ const ( // Settings. // Kludge. cfgKludgeUseDefaultXMLNS = "kludge.use_default_xmlns" - cfgKludgeCompleteMultipartUploadKeepalive = "kludge.complete_multipart_keepalive" cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks" // Web. @@ -547,7 +546,6 @@ func newSettings() *viper.Viper { // kludge v.SetDefault(cfgKludgeUseDefaultXMLNS, false) - v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second) v.SetDefault(cfgKludgeBypassContentEncodingCheckInChunks, false) // web diff --git a/config/config.env b/config/config.env index 1d4d500..3c0e53a 100644 --- a/config/config.env +++ b/config/config.env @@ -140,8 +140,6 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container # Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. S3_GW_KLUDGE_USE_DEFAULT_XMLNS=false -# Set timeout between whitespace transmissions during CompleteMultipartUpload processing. -S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s # Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header. S3_GW_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false diff --git a/config/config.yaml b/config/config.yaml index 20f5e3b..bac2103 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -169,8 +169,6 @@ resolve_bucket: kludge: # Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. use_default_xmlns: false - # Set timeout between whitespace transmissions during CompleteMultipartUpload processing. - complete_multipart_keepalive: 10s # Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header. bypass_content_encoding_check_in_chunks: false diff --git a/docs/configuration.md b/docs/configuration.md index 9f43b8b..2a11153 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -543,14 +543,12 @@ Workarounds for non-standard use cases. ```yaml kludge: use_default_xmlns: false - complete_multipart_keepalive: 10s bypass_content_encoding_check_in_chunks: false ``` | Parameter | Type | SIGHUP reload | Default value | Description | |-------------------------------------------|------------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------------| | `use_default_xmlns` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. | -| `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. | | `bypass_content_encoding_check_in_chunks` | `bool` | yes | false | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. | # `runtime` section