diff --git a/api/handler/api.go b/api/handler/api.go index 19e2c4e8..fa5c57b8 100644 --- a/api/handler/api.go +++ b/api/handler/api.go @@ -42,6 +42,7 @@ type ( RetryMaxBackoff() time.Duration RetryStrategy() RetryStrategy TLSTerminationHeader() string + ListingKeepaliveThrottle() time.Duration } FrostFSID interface { diff --git a/api/handler/object_list.go b/api/handler/object_list.go index 33183966..0b05d3e2 100644 --- a/api/handler/object_list.go +++ b/api/handler/object_list.go @@ -1,6 +1,9 @@ package handler import ( + "context" + "encoding/xml" + "io" "net/http" "net/url" "strconv" @@ -13,6 +16,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" ) const maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse. @@ -32,14 +36,28 @@ func (h *handler) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) { return } + ch := make(chan struct{}) + defer close(ch) + params.Chan = ch + + stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch) + list, err := h.obj.ListObjectsV1(ctx, params) if err != nil { - h.logAndSendError(ctx, w, "something went wrong", reqInfo, err) + logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter()) + logAndSendError(ctx, w, "could not list objects v1", reqInfo, err) return } - if err = middleware.EncodeToResponse(w, h.encodeV1(params, list)); err != nil { - h.logAndSendError(ctx, w, "something went wrong", reqInfo, err) + headerIsWritten := stopPeriodicResponseWriter() + if headerIsWritten { + if err = middleware.EncodeToResponseNoHeader(w, h.encodeV1(params, list)); err != nil { + h.logAndSendErrorNoHeader(ctx, w, "could not encode listing v1 response", reqInfo, err) + } + } else { + if err = middleware.EncodeToResponse(w, h.encodeV1(params, list)); err != nil { + 
h.logAndSendError(ctx, w, "could not encode listing v1 response", reqInfo, err)
+		}
 	}
 }
 
@@ -77,14 +95,27 @@ func (h *handler) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	ch := make(chan struct{})
+	defer close(ch)
+	params.Chan = ch
+
+	stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
+
 	list, err := h.obj.ListObjectsV2(ctx, params)
 	if err != nil {
-		h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
+		h.periodicWriterErrorSender(stopPeriodicResponseWriter())(ctx, w, "could not list objects v2", reqInfo, err) // stop the keepalive writer first; headers are skipped if it already sent xml.Header
 		return
 	}
 
-	if err = middleware.EncodeToResponse(w, h.encodeV2(params, list)); err != nil {
-		h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
+	headerIsWritten := stopPeriodicResponseWriter()
+	if headerIsWritten {
+		if err = middleware.EncodeToResponseNoHeader(w, h.encodeV2(params, list)); err != nil {
+			h.logAndSendErrorNoHeader(ctx, w, "could not encode listing v2 response", reqInfo, err)
+		}
+	} else {
+		if err = middleware.EncodeToResponse(w, h.encodeV2(params, list)); err != nil {
+			h.logAndSendError(ctx, w, "could not encode listing v2 response", reqInfo, err)
+		}
 	}
 }
 
@@ -236,15 +267,28 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
 		return
 	}
 
-	info, err := h.obj.ListObjectVersions(ctx, p)
+	ch := make(chan struct{})
+	defer close(ch)
+	p.Chan = ch
+
+	stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
+
+	list, err := h.obj.ListObjectVersions(ctx, p)
 	if err != nil {
-		h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
+		h.periodicWriterErrorSender(stopPeriodicResponseWriter())(ctx, w, "could not list objects versions", reqInfo, err) // stop the keepalive writer first; headers are skipped if it already sent xml.Header
 		return
 	}
 
-	response := encodeListObjectVersionsToResponse(p, info, p.BktInfo.Name, h.cfg.MD5Enabled())
-	if err = middleware.EncodeToResponse(w, response); err != nil {
-		h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
+	response := encodeListObjectVersionsToResponse(p, list, p.BktInfo.Name, 
h.cfg.MD5Enabled())
+	headerIsWritten := stopPeriodicResponseWriter()
+	if headerIsWritten {
+		if err = middleware.EncodeToResponseNoHeader(w, response); err != nil {
+			h.logAndSendErrorNoHeader(ctx, w, "could not encode listing versions response", reqInfo, err)
+		}
+	} else {
+		if err = middleware.EncodeToResponse(w, response); err != nil {
+			h.logAndSendError(ctx, w, "could not encode listing versions response", reqInfo, err)
+		}
 	}
 }
 
@@ -327,3 +371,81 @@ func encodeListObjectVersionsToResponse(p *layer.ListObjectVersionsParams, info
 
 	return &res
 }
+
+// periodicWriterErrorSender returns handler function to send error. If header is
+// already written by periodic XML writer, do not send HTTP and XML headers.
+func (h *handler) periodicWriterErrorSender(headerWritten bool) func(context.Context, http.ResponseWriter, string, *middleware.ReqInfo, error, ...zap.Field) {
+	if headerWritten {
+		return h.logAndSendErrorNoHeader
+	}
+	return h.logAndSendError
+}
+
+// periodicXMLWriter starts a goroutine that keeps the client connection alive
+// during a long listing. Every event received from ch triggers a write of a
+// single whitespace to w (preceded by xml.Header on the first write), but at
+// most once per dur. If w implements http.Flusher, it is flushed after each
+// write so the bytes are not held back in a buffer.
+//
+// Events are always drained from ch, even when dur is not positive (the writer
+// is disabled) or after a write has failed, so the listing is never blocked on
+// sending to ch. A closed ch is tolerated.
+//
+// The returned stop function must be called exactly once. It stops the
+// goroutine, waits for it to finish and reports whether xml.Header has been
+// written to w, in which case the caller must not send HTTP or XML headers again.
+func periodicXMLWriter(w io.Writer, dur time.Duration, ch <-chan struct{}) (stop func() bool) {
+	enabled := dur > 0 // non-positive duration disables writing
+	whitespaceChar := []byte(" ")
+	closer := make(chan struct{})
+	done := make(chan struct{})
+	headerWritten := false
+
+	go func() {
+		defer close(done)
+
+		lastEvent := time.Now()
+
+		for {
+			select {
+			case _, ok := <-ch:
+				if !ok {
+					// A closed channel is always ready to receive: stop selecting
+					// on it, otherwise the loop spins until stop is called.
+					ch = nil
+					continue
+				}
+				if !enabled || time.Since(lastEvent) < dur {
+					continue
+				}
+
+				lastEvent = time.Now()
+
+				if !headerWritten {
+					if _, err := w.Write([]byte(xml.Header)); err != nil {
+						enabled = false // the response is broken, keep draining only
+						continue
+					}
+					headerWritten = true
+				}
+				if _, err := w.Write(whitespaceChar); err != nil {
+					enabled = false // the response is broken, keep draining only
+					continue
+				}
+				if buffered, ok := w.(http.Flusher); ok {
+					buffered.Flush()
+				}
+			case <-closer:
+				return
+			}
+		}
+	}()
+
+	stop = func() bool {
+		close(closer)
+		<-done // wait for goroutine to stop
+		return headerWritten
+	}
+
+	return stop
+}
diff --git a/api/handler/object_list_test.go b/api/handler/object_list_test.go
index e0b3c30c..86d908dc 100644
--- a/api/handler/object_list_test.go
+++ b/api/handler/object_list_test.go
@@ -1,7 +1,9 @@
 package handler
 
 import (
+	"bytes"
 	"context"
+	"encoding/xml"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -841,6 +843,60 @@ func TestListingsWithInvalidEncodingType(t *testing.T) {
 	listObjectsV1Err(hc, bktName, "invalid", apierr.GetAPIError(apierr.ErrInvalidEncodingMethod))
 }
 
+func TestPeriodicWriter(t *testing.T) {
+	const dur = 100 * time.Millisecond
+	const whitespaces = 8
+	expected := []byte(xml.Header)
+	for i := 0; i < whitespaces; i++ {
+		expected = append(expected, []byte(" ")...)
+	}
+
+	t.Run("writes data", func(t *testing.T) {
+		buf := bytes.NewBuffer(nil)
+		ch := make(chan struct{})
+		stop := periodicXMLWriter(buf, dur, ch)
+
+		// Space the events by more than the throttle so each one produces a write.
+		for i := 0; i < whitespaces; i++ {
+			time.Sleep(dur + dur/2)
+			ch <- struct{}{}
+		}
+		require.True(t, stop())
+		require.Equal(t, expected, buf.Bytes())
+
+		t.Run("no additional data after stop", func(t *testing.T) {
+			time.Sleep(2 * dur)
+			require.Equal(t, expected, buf.Bytes())
+		})
+	})
+
+	t.Run("does not write data", func(t *testing.T) {
+		buf := bytes.NewBuffer(nil)
+		ch := make(chan struct{})
+		stop := periodicXMLWriter(buf, dur, ch)
+		ch <- struct{}{} // arrives before the throttle interval has elapsed
+		require.False(t, stop())
+		require.Empty(t, buf.Bytes())
+
+		t.Run("disabled", func(t *testing.T) {
+			stop = periodicXMLWriter(buf, 0, ch)
+			ch <- struct{}{} // events are drained even when the writer is disabled
+			require.False(t, stop())
+			require.Empty(t, buf.Bytes())
+		})
+	})
+
+	t.Run("closed channel", func(t *testing.T) {
+		buf := bytes.NewBuffer(nil)
+		ch := make(chan struct{})
+		stop := periodicXMLWriter(buf, dur, ch)
+		close(ch)
+		time.Sleep(2 * dur)
+		require.False(t, stop())
+		require.Empty(t, buf.Bytes())
+	})
+}
+
 func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, names []string) {
 	for i, v := range versions.Version {
 		require.Equal(t, names[i], v.Key)
diff --git a/api/handler/util.go b/api/handler/util.go
index 7d12af0b..4aa4ae3a 100644
--- a/api/handler/util.go
+++ b/api/handler/util.go
@@ -45,6 +45,23 @@ func (h *handler) logAndSendError(ctx context.Context, w http.ResponseWriter, lo
 	h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...)
 }
 
+func (h *handler) logAndSendErrorNoHeader(ctx context.Context, w http.ResponseWriter, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) {
+	err = handleDeleteMarker(w, err)
+	if wrErr := middleware.WriteErrorResponseNoHeader(w, reqInfo, apierr.TransformToS3Error(err)); wrErr != nil {
+		additional = append(additional, zap.NamedError("write_response_error", wrErr))
+	}
+	fields := []zap.Field{
+		zap.String("method", reqInfo.API),
+		zap.String("bucket", reqInfo.BucketName),
+		zap.String("object", reqInfo.ObjectName),
+		zap.String("description", logText),
+		zap.String("user", reqInfo.User),
+		zap.Error(err),
+	}
+	fields = append(fields, additional...)
+	h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...)
+}
+
 func handleDeleteMarker(w http.ResponseWriter, err error) error {
 	var target layer.DeleteMarkerError
 	if !errors.As(err, &target) {
diff --git a/api/layer/layer.go b/api/layer/layer.go
index 16796ebd..1766b04d 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -193,6 +193,7 @@ type (
 		Prefix string
 		VersionIDMarker string
 		Encode string
+		Chan chan<- struct{}
 	}
 
 	ListBucketsParams struct {
diff --git a/api/layer/listing.go b/api/layer/listing.go
index fb884942..140d3185 100644
--- a/api/layer/listing.go
+++ b/api/layer/listing.go
@@ -26,6 +26,7 @@ type (
 		Encode string
 		MaxKeys int
 		Prefix string
+		Chan chan<- struct{}
 	}
 
 	// ListObjectsParamsV1 contains params for ListObjectsV1.
@@ -80,6 +81,8 @@ type (
 		MaxKeys int
 		Marker string
 		Bookmark string
+		// Chan is a channel to prevent client from context canceling during long listing.
+		Chan chan<- struct{}
 	}
 
 	commonLatestVersionsListingParams struct {
@@ -107,6 +110,7 @@ func (n *Layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis
 			MaxKeys: p.MaxKeys,
 			Marker: p.Marker,
 			Bookmark: p.Marker,
+			Chan: p.Chan,
 		},
 		ListType: ListObjectsV1Type,
 	}
@@ -138,6 +142,7 @@ func (n *Layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
 			MaxKeys: p.MaxKeys,
 			Marker: p.StartAfter,
 			Bookmark: p.ContinuationToken,
+			Chan: p.Chan,
 		},
 		ListType: ListObjectsV2Type,
 	}
@@ -165,6 +170,7 @@ func (n *Layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
 		MaxKeys: p.MaxKeys,
 		Marker: p.KeyMarker,
 		Bookmark: p.VersionIDMarker,
+		Chan: p.Chan,
 	}
 
 	objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm)
@@ -207,6 +213,11 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
 	objects = make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1)
 	objects = append(objects, session.Next...)
 	for obj := range objOutCh {
+		// Keepalive notification is best-effort: a nil channel or an absent receiver must not stall the listing.
+		select {
+		case p.Chan <- struct{}{}:
+		default:
+		}
 		objects = append(objects, obj)
 	}
 
@@ -264,6 +275,11 @@ func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p common
 	var listRowStartIndex int
 	allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
 	for eoi := range objOutCh {
+		// Keepalive notification is best-effort: a nil channel or an absent receiver must not stall the listing.
+		select {
+		case p.Chan <- struct{}{}:
+		default:
+		}
 		name := eoi.NodeVersion.FilePath
 		if eoi.DirName != "" {
 			name = eoi.DirName
diff --git a/api/middleware/response.go b/api/middleware/response.go
index fded5b33..02497699 100644
--- a/api/middleware/response.go
+++ b/api/middleware/response.go
@@ -144,6 +144,17 @@ func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) (int
 	return code, nil
 }
 
+// WriteErrorResponseNoHeader writes XML encoded error to the response body.
+func WriteErrorResponseNoHeader(w http.ResponseWriter, reqInfo *ReqInfo, err error) error {
+	errorResponse := getAPIErrorResponse(reqInfo, err)
+	encodedErrorResponse, err := EncodeResponseNoHeader(errorResponse)
+	if err != nil {
+		return err
+	}
+
+	return WriteResponseBody(w, encodedErrorResponse)
+}
+
 // Write http common headers.
 func setCommonHeaders(w http.ResponseWriter) {
 	w.Header().Set(hdrServerInfo, version.Server)
@@ -200,6 +211,18 @@ func EncodeResponse(response interface{}) ([]byte, error) {
 	return bytesBuffer.Bytes(), nil
 }
 
+// EncodeResponseNoHeader encodes response without setting xml.Header.
+// Should be used with periodicXMLWriter which sends xml.Header to the client
+// with whitespaces to keep connection alive.
+func EncodeResponseNoHeader(response interface{}) ([]byte, error) { + var bytesBuffer bytes.Buffer + if err := xml.NewEncoder(&bytesBuffer).Encode(response); err != nil { + return nil, err + } + + return bytesBuffer.Bytes(), nil +} + // EncodeToResponse encodes the response into ResponseWriter. func EncodeToResponse(w http.ResponseWriter, response interface{}) error { w.WriteHeader(http.StatusOK) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 6323115f..313330d6 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -138,6 +138,7 @@ type ( tombstoneMembersSize int tombstoneLifetime uint64 tlsTerminationHeader string + listingKeepaliveThrottle time.Duration } maxClientsConfig struct { @@ -373,6 +374,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { tombstoneMembersSize := fetchTombstoneMembersSize(v) tombstoneLifetime := fetchTombstoneLifetime(v) tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader) + listingKeepaliveThrottle := fetchListingKeepaliveThrottle(v) s.mu.Lock() defer s.mu.Unlock() @@ -406,6 +408,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) { s.tombstoneMembersSize = tombstoneMembersSize s.tombstoneLifetime = tombstoneLifetime s.tlsTerminationHeader = tlsTerminationHeader + s.listingKeepaliveThrottle = listingKeepaliveThrottle } func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool { @@ -643,6 +646,12 @@ func (s *appSettings) TombstoneLifetime() uint64 { return s.tombstoneLifetime } +func (s *appSettings) ListingKeepaliveThrottle() time.Duration { + s.mu.RLock() + defer s.mu.RUnlock() + return s.listingKeepaliveThrottle +} + func (a *App) initAPI(ctx context.Context) { a.initLayer(ctx) a.initHandler() diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index 884a116a..d7853205 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -73,6 +73,8 @@ const ( defaultTombstoneMembersSize = 100 
defaultTombstoneWorkerPoolSize = 100 + defaultListingKeepaliveThrottle = 10 * time.Second + useDefaultXmlns = "use_default_xmlns" bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks" ) @@ -202,6 +204,8 @@ const ( cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks" cfgKludgeDefaultNamespaces = "kludge.default_namespaces" cfgKludgeProfile = "kludge.profile" + cfgKludgeListingKeepAliveThrottle = "kludge.listing_keepalive_throttle" + // Web. cfgWebReadTimeout = "web.read_timeout" cfgWebReadHeaderTimeout = "web.read_header_timeout" @@ -926,6 +930,15 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) { return attributes, nil } +func fetchListingKeepaliveThrottle(v *viper.Viper) time.Duration { + keepalive := v.GetDuration(cfgKludgeListingKeepAliveThrottle) + if keepalive <= 0 { + keepalive = defaultListingKeepaliveThrottle + } + + return keepalive +} + func fetchTombstoneLifetime(v *viper.Viper) uint64 { tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime) if tombstoneLifetime <= 0 { diff --git a/config/config.env b/config/config.env index 28b1c7b7..335f5ab7 100644 --- a/config/config.env +++ b/config/config.env @@ -193,6 +193,10 @@ S3_GW_KLUDGE_USE_DEFAULT_XMLNS=false S3_GW_KLUDGE_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false # Namespaces that should be handled as default S3_GW_KLUDGE_DEFAULT_NAMESPACES="" "root" +# During listing the s3 gate sends whitespaces to the client to prevent it from cancelling the request. +# The gate sends one every time a new object is handled. +# Use this parameter to limit such sends to one per provided duration. 
+S3_GW_KLUDGE_LISTING_KEEPALIVE_THROTTLE=10s # Kludge profiles S3_GW_KLUDGE_PROFILE_0_USER_AGENT=aws-cli S3_GW_KLUDGE_PROFILE_0_USE_DEFAULT_XMLNS=true diff --git a/config/config.yaml b/config/config.yaml index d9630e09..e9569a77 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -234,6 +234,10 @@ kludge: bypass_content_encoding_check_in_chunks: false # Namespaces that should be handled as default default_namespaces: [ "", "root" ] + # During listing the s3 gate sends whitespaces to client to prevent it from cancelling request. + # The gate do send every time when new object is handled. + # Use this parameter to limit such sends by one per provided duration. + listing_keepalive_throttle: 10s # new profile section override defaults based on user agent profile: - user_agent: aws-cli diff --git a/docs/configuration.md b/docs/configuration.md index 42d5e021..9ee9c3e5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -670,6 +670,7 @@ kludge: use_default_xmlns: false bypass_content_encoding_check_in_chunks: false default_namespaces: [ "", "root" ] + listing_keepalive_throttle: 10s profile: - user_agent: aws-cli use_default_xmlns: false @@ -678,12 +679,13 @@ kludge: bypass_content_encoding_check_in_chunks: false ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|-------------------------------------------|----------------------------------|---------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `use_default_xmlns` | `bool` | yes | `false` | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. 
| -| `bypass_content_encoding_check_in_chunks` | `bool` | yes | `false` | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. | -| `default_namespaces` | `[]string` | yes | `["","root"]` | Namespaces that should be handled as default. | -| `profile` | [[]Profile](#profile-subsection) | yes | | An array of configurable profiles. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|-------------------------------------------|----------------------------------|---------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `use_default_xmlns` | `bool` | yes | `false` | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. | +| `bypass_content_encoding_check_in_chunks` | `bool` | yes | `false` | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. | +| `default_namespaces` | `[]string` | yes | `["","root"]` | Namespaces that should be handled as default. | +| `listing_keepalive_throttle` | `duration` | yes | `10s` | During listing the s3 gate sends whitespaces to client to prevent it from cancelling request. The gate do send every time when new object is handled. Use this parameter to limit such sends by one per provided duration. | +| `profile` | [[]Profile](#profile-subsection) | yes | | An array of configurable profiles. | #### `profile` subsection