[#644] Support keepalive during listing
Some checks failed
/ DCO (pull_request) Successful in 55s
/ Vulncheck (pull_request) Successful in 1m4s
/ Builds (pull_request) Successful in 1m34s
/ OCI image (pull_request) Successful in 2m2s
/ Lint (pull_request) Failing after 1m48s
/ Tests (pull_request) Failing after 11m19s
Some checks failed
/ DCO (pull_request) Successful in 55s
/ Vulncheck (pull_request) Successful in 1m4s
/ Builds (pull_request) Successful in 1m34s
/ OCI image (pull_request) Successful in 2m2s
/ Lint (pull_request) Failing after 1m48s
/ Tests (pull_request) Failing after 11m19s
Send whitespaces every time as new object in list is ready to prevent client from context cancelling. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
bfec3e0a5e
commit
d7881c9f5e
12 changed files with 248 additions and 17 deletions
|
@ -42,6 +42,7 @@ type (
|
||||||
RetryMaxBackoff() time.Duration
|
RetryMaxBackoff() time.Duration
|
||||||
RetryStrategy() RetryStrategy
|
RetryStrategy() RetryStrategy
|
||||||
TLSTerminationHeader() string
|
TLSTerminationHeader() string
|
||||||
|
ListingKeepaliveThrottle() time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
FrostFSID interface {
|
FrostFSID interface {
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/xml"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -13,6 +16,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
|
const maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
|
||||||
|
@ -32,14 +36,28 @@ func (h *handler) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
defer close(ch)
|
||||||
|
params.Chan = ch
|
||||||
|
|
||||||
|
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
|
||||||
|
|
||||||
list, err := h.obj.ListObjectsV1(ctx, params)
|
list, err := h.obj.ListObjectsV1(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
|
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter())
|
||||||
|
logAndSendError(ctx, w, "could not list objects v1", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = middleware.EncodeToResponse(w, h.encodeV1(params, list)); err != nil {
|
headerIsWritten := stopPeriodicResponseWriter()
|
||||||
h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
|
if headerIsWritten {
|
||||||
|
if err = middleware.EncodeToResponseNoHeader(w, h.encodeV1(params, list)); err != nil {
|
||||||
|
h.logAndSendErrorNoHeader(ctx, w, "could not encode listing v1 response", reqInfo, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err = middleware.EncodeToResponse(w, h.encodeV1(params, list)); err != nil {
|
||||||
|
h.logAndSendError(ctx, w, "could not encode listing v1 response", reqInfo, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,14 +95,27 @@ func (h *handler) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ch := make(chan struct{})
|
||||||
|
defer close(ch)
|
||||||
|
params.Chan = ch
|
||||||
|
|
||||||
|
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
|
||||||
|
|
||||||
list, err := h.obj.ListObjectsV2(ctx, params)
|
list, err := h.obj.ListObjectsV2(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
|
h.logAndSendError(ctx, w, "could not list objects v2", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = middleware.EncodeToResponse(w, h.encodeV2(params, list)); err != nil {
|
headerIsWritten := stopPeriodicResponseWriter()
|
||||||
h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
|
if headerIsWritten {
|
||||||
|
if err = middleware.EncodeToResponseNoHeader(w, h.encodeV2(params, list)); err != nil {
|
||||||
|
h.logAndSendErrorNoHeader(ctx, w, "could not encode listing v2 response", reqInfo, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err = middleware.EncodeToResponse(w, h.encodeV2(params, list)); err != nil {
|
||||||
|
h.logAndSendError(ctx, w, "could not encode listing v2 response", reqInfo, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,15 +267,28 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := h.obj.ListObjectVersions(ctx, p)
|
ch := make(chan struct{})
|
||||||
|
defer close(ch)
|
||||||
|
p.Chan = ch
|
||||||
|
|
||||||
|
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
|
||||||
|
|
||||||
|
list, err := h.obj.ListObjectVersions(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
|
h.logAndSendError(ctx, w, "could not list objects versions", reqInfo, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
response := encodeListObjectVersionsToResponse(p, info, p.BktInfo.Name, h.cfg.MD5Enabled())
|
response := encodeListObjectVersionsToResponse(p, list, p.BktInfo.Name, h.cfg.MD5Enabled())
|
||||||
if err = middleware.EncodeToResponse(w, response); err != nil {
|
headerIsWritten := stopPeriodicResponseWriter()
|
||||||
h.logAndSendError(ctx, w, "something went wrong", reqInfo, err)
|
if headerIsWritten {
|
||||||
|
if err = middleware.EncodeToResponseNoHeader(w, response); err != nil {
|
||||||
|
h.logAndSendErrorNoHeader(ctx, w, "could not encode listing versions response", reqInfo, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err = middleware.EncodeToResponse(w, response); err != nil {
|
||||||
|
h.logAndSendError(ctx, w, "could not encode listing versions response", reqInfo, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,3 +371,67 @@ func encodeListObjectVersionsToResponse(p *layer.ListObjectVersionsParams, info
|
||||||
|
|
||||||
return &res
|
return &res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// periodicWriterErrorSender returns handler function to send error. If header is
|
||||||
|
// already written by periodic XML writer, do not send HTTP and XML headers.
|
||||||
|
func (h *handler) periodicWriterErrorSender(headerWritten bool) func(context.Context, http.ResponseWriter, string, *middleware.ReqInfo, error, ...zap.Field) {
|
||||||
|
if headerWritten {
|
||||||
|
return h.logAndSendErrorNoHeader
|
||||||
|
}
|
||||||
|
return h.logAndSendError
|
||||||
|
}
|
||||||
|
|
||||||
|
// periodicXMLWriter creates go routine to write xml header and whitespaces
|
||||||
|
// over time to avoid connection drop from the client. To work properly,
|
||||||
|
// pass `http.ResponseWriter` with implemented `http.Flusher` interface.
|
||||||
|
// Returns stop function which returns boolean if writer has been used
|
||||||
|
// during goroutine execution. To disable writer, pass 0 duration value.
|
||||||
|
func periodicXMLWriter(w io.Writer, dur time.Duration, ch <-chan struct{}) (stop func() bool) {
|
||||||
|
if dur == 0 { // 0 duration disables periodic writer
|
||||||
|
return func() bool { return false }
|
||||||
|
}
|
||||||
|
|
||||||
|
whitespaceChar := []byte(" ")
|
||||||
|
closer := make(chan struct{})
|
||||||
|
done := make(chan struct{})
|
||||||
|
headerWritten := false
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
lastEvent := time.Now()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
if time.Now().Sub(lastEvent) < dur {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lastEvent = time.Now()
|
||||||
|
|
||||||
|
if !headerWritten {
|
||||||
|
_, err := w.Write([]byte(xml.Header))
|
||||||
|
headerWritten = err == nil
|
||||||
|
}
|
||||||
|
_, err := w.Write(whitespaceChar)
|
||||||
|
if err != nil {
|
||||||
|
return // is there anything we can do better than ignore error?
|
||||||
|
}
|
||||||
|
if buffered, ok := w.(http.Flusher); ok {
|
||||||
|
buffered.Flush()
|
||||||
|
}
|
||||||
|
case <-closer:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
stop = func() bool {
|
||||||
|
close(closer)
|
||||||
|
<-done // wait for goroutine to stop
|
||||||
|
return headerWritten
|
||||||
|
}
|
||||||
|
|
||||||
|
return stop
|
||||||
|
}
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
@ -841,6 +843,44 @@ func TestListingsWithInvalidEncodingType(t *testing.T) {
|
||||||
listObjectsV1Err(hc, bktName, "invalid", apierr.GetAPIError(apierr.ErrInvalidEncodingMethod))
|
listObjectsV1Err(hc, bktName, "invalid", apierr.GetAPIError(apierr.ErrInvalidEncodingMethod))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeriodicWriter(t *testing.T) {
|
||||||
|
const dur = 100 * time.Millisecond
|
||||||
|
const whitespaces = 8
|
||||||
|
expected := []byte(xml.Header)
|
||||||
|
for i := 0; i < whitespaces; i++ {
|
||||||
|
expected = append(expected, []byte(" ")...)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("writes data", func(t *testing.T) {
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
stop := periodicXMLWriter(buf, dur)
|
||||||
|
|
||||||
|
// N number of whitespaces + half durations to guarantee at least N writes in buffer
|
||||||
|
time.Sleep(whitespaces*dur + dur/2)
|
||||||
|
require.True(t, stop())
|
||||||
|
require.Equal(t, expected, buf.Bytes())
|
||||||
|
|
||||||
|
t.Run("no additional data after stop", func(t *testing.T) {
|
||||||
|
time.Sleep(2 * dur)
|
||||||
|
require.Equal(t, expected, buf.Bytes())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("does not write data", func(t *testing.T) {
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
stop := periodicXMLWriter(buf, dur)
|
||||||
|
time.Sleep(dur / 2)
|
||||||
|
require.False(t, stop())
|
||||||
|
require.Empty(t, buf.Bytes())
|
||||||
|
|
||||||
|
t.Run("disabled", func(t *testing.T) {
|
||||||
|
stop = periodicXMLWriter(buf, 0)
|
||||||
|
require.False(t, stop())
|
||||||
|
require.Empty(t, buf.Bytes())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, names []string) {
|
func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, names []string) {
|
||||||
for i, v := range versions.Version {
|
for i, v := range versions.Version {
|
||||||
require.Equal(t, names[i], v.Key)
|
require.Equal(t, names[i], v.Key)
|
||||||
|
|
|
@ -45,6 +45,23 @@ func (h *handler) logAndSendError(ctx context.Context, w http.ResponseWriter, lo
|
||||||
h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...)
|
h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *handler) logAndSendErrorNoHeader(ctx context.Context, w http.ResponseWriter, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) {
|
||||||
|
err = handleDeleteMarker(w, err)
|
||||||
|
if wrErr := middleware.WriteErrorResponseNoHeader(w, reqInfo, apierr.TransformToS3Error(err)); wrErr != nil {
|
||||||
|
additional = append(additional, zap.NamedError("write_response_error", wrErr))
|
||||||
|
}
|
||||||
|
fields := []zap.Field{
|
||||||
|
zap.String("method", reqInfo.API),
|
||||||
|
zap.String("bucket", reqInfo.BucketName),
|
||||||
|
zap.String("object", reqInfo.ObjectName),
|
||||||
|
zap.String("description", logText),
|
||||||
|
zap.String("user", reqInfo.User),
|
||||||
|
zap.Error(err),
|
||||||
|
}
|
||||||
|
fields = append(fields, additional...)
|
||||||
|
h.reqLogger(ctx).Error(logs.RequestFailed, append(fields, logs.TagField(logs.TagDatapath))...)
|
||||||
|
}
|
||||||
|
|
||||||
func handleDeleteMarker(w http.ResponseWriter, err error) error {
|
func handleDeleteMarker(w http.ResponseWriter, err error) error {
|
||||||
var target layer.DeleteMarkerError
|
var target layer.DeleteMarkerError
|
||||||
if !errors.As(err, &target) {
|
if !errors.As(err, &target) {
|
||||||
|
|
|
@ -193,6 +193,7 @@ type (
|
||||||
Prefix string
|
Prefix string
|
||||||
VersionIDMarker string
|
VersionIDMarker string
|
||||||
Encode string
|
Encode string
|
||||||
|
Chan chan<- struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
ListBucketsParams struct {
|
ListBucketsParams struct {
|
||||||
|
|
|
@ -26,6 +26,7 @@ type (
|
||||||
Encode string
|
Encode string
|
||||||
MaxKeys int
|
MaxKeys int
|
||||||
Prefix string
|
Prefix string
|
||||||
|
Chan chan<- struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectsParamsV1 contains params for ListObjectsV1.
|
// ListObjectsParamsV1 contains params for ListObjectsV1.
|
||||||
|
@ -80,6 +81,8 @@ type (
|
||||||
MaxKeys int
|
MaxKeys int
|
||||||
Marker string
|
Marker string
|
||||||
Bookmark string
|
Bookmark string
|
||||||
|
// Chan is a channel to prevent client from context canceling during long listing.
|
||||||
|
Chan chan<- struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
commonLatestVersionsListingParams struct {
|
commonLatestVersionsListingParams struct {
|
||||||
|
@ -107,6 +110,7 @@ func (n *Layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis
|
||||||
MaxKeys: p.MaxKeys,
|
MaxKeys: p.MaxKeys,
|
||||||
Marker: p.Marker,
|
Marker: p.Marker,
|
||||||
Bookmark: p.Marker,
|
Bookmark: p.Marker,
|
||||||
|
Chan: p.Chan,
|
||||||
},
|
},
|
||||||
ListType: ListObjectsV1Type,
|
ListType: ListObjectsV1Type,
|
||||||
}
|
}
|
||||||
|
@ -138,6 +142,7 @@ func (n *Layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
||||||
MaxKeys: p.MaxKeys,
|
MaxKeys: p.MaxKeys,
|
||||||
Marker: p.StartAfter,
|
Marker: p.StartAfter,
|
||||||
Bookmark: p.ContinuationToken,
|
Bookmark: p.ContinuationToken,
|
||||||
|
Chan: p.Chan,
|
||||||
},
|
},
|
||||||
ListType: ListObjectsV2Type,
|
ListType: ListObjectsV2Type,
|
||||||
}
|
}
|
||||||
|
@ -165,6 +170,7 @@ func (n *Layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
|
||||||
MaxKeys: p.MaxKeys,
|
MaxKeys: p.MaxKeys,
|
||||||
Marker: p.KeyMarker,
|
Marker: p.KeyMarker,
|
||||||
Bookmark: p.VersionIDMarker,
|
Bookmark: p.VersionIDMarker,
|
||||||
|
Chan: p.Chan,
|
||||||
}
|
}
|
||||||
|
|
||||||
objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm)
|
objects, isTruncated, err := n.getAllObjectsVersions(ctx, prm)
|
||||||
|
@ -207,6 +213,7 @@ func (n *Layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
|
||||||
objects = make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1)
|
objects = make([]*data.ExtendedNodeVersion, 0, p.MaxKeys+1)
|
||||||
objects = append(objects, session.Next...)
|
objects = append(objects, session.Next...)
|
||||||
for obj := range objOutCh {
|
for obj := range objOutCh {
|
||||||
|
p.Chan <- struct{}{}
|
||||||
objects = append(objects, obj)
|
objects = append(objects, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,6 +271,7 @@ func handleGeneratedVersions(objOutCh <-chan *data.ExtendedNodeVersion, p common
|
||||||
var listRowStartIndex int
|
var listRowStartIndex int
|
||||||
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
|
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
|
||||||
for eoi := range objOutCh {
|
for eoi := range objOutCh {
|
||||||
|
p.Chan <- struct{}{}
|
||||||
name := eoi.NodeVersion.FilePath
|
name := eoi.NodeVersion.FilePath
|
||||||
if eoi.DirName != "" {
|
if eoi.DirName != "" {
|
||||||
name = eoi.DirName
|
name = eoi.DirName
|
||||||
|
@ -403,6 +411,7 @@ func nodesGeneratorStream(ctx context.Context, p commonVersionsListingParams, st
|
||||||
|
|
||||||
LOOP:
|
LOOP:
|
||||||
for err == nil {
|
for err == nil {
|
||||||
|
//time.Sleep(7 * time.Second)
|
||||||
node, err := stream.Stream.Next(ctx)
|
node, err := stream.Stream.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, io.EOF) {
|
if !errors.Is(err, io.EOF) {
|
||||||
|
|
|
@ -144,6 +144,17 @@ func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) (int
|
||||||
return code, nil
|
return code, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteErrorResponseNoHeader writes XML encoded error to the response body.
|
||||||
|
func WriteErrorResponseNoHeader(w http.ResponseWriter, reqInfo *ReqInfo, err error) error {
|
||||||
|
errorResponse := getAPIErrorResponse(reqInfo, err)
|
||||||
|
encodedErrorResponse, err := EncodeResponseNoHeader(errorResponse)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return WriteResponseBody(w, encodedErrorResponse)
|
||||||
|
}
|
||||||
|
|
||||||
// Write http common headers.
|
// Write http common headers.
|
||||||
func setCommonHeaders(w http.ResponseWriter) {
|
func setCommonHeaders(w http.ResponseWriter) {
|
||||||
w.Header().Set(hdrServerInfo, version.Server)
|
w.Header().Set(hdrServerInfo, version.Server)
|
||||||
|
@ -200,6 +211,18 @@ func EncodeResponse(response interface{}) ([]byte, error) {
|
||||||
return bytesBuffer.Bytes(), nil
|
return bytesBuffer.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EncodeResponseNoHeader encodes response without setting xml.Header.
|
||||||
|
// Should be used with periodicXMLWriter which sends xml.Header to the client
|
||||||
|
// with whitespaces to keep connection alive.
|
||||||
|
func EncodeResponseNoHeader(response interface{}) ([]byte, error) {
|
||||||
|
var bytesBuffer bytes.Buffer
|
||||||
|
if err := xml.NewEncoder(&bytesBuffer).Encode(response); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytesBuffer.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// EncodeToResponse encodes the response into ResponseWriter.
|
// EncodeToResponse encodes the response into ResponseWriter.
|
||||||
func EncodeToResponse(w http.ResponseWriter, response interface{}) error {
|
func EncodeToResponse(w http.ResponseWriter, response interface{}) error {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
|
|
|
@ -138,6 +138,7 @@ type (
|
||||||
tombstoneMembersSize int
|
tombstoneMembersSize int
|
||||||
tombstoneLifetime uint64
|
tombstoneLifetime uint64
|
||||||
tlsTerminationHeader string
|
tlsTerminationHeader string
|
||||||
|
listingKeepaliveThrottle time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
maxClientsConfig struct {
|
maxClientsConfig struct {
|
||||||
|
@ -373,6 +374,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
tombstoneMembersSize := fetchTombstoneMembersSize(v)
|
tombstoneMembersSize := fetchTombstoneMembersSize(v)
|
||||||
tombstoneLifetime := fetchTombstoneLifetime(v)
|
tombstoneLifetime := fetchTombstoneLifetime(v)
|
||||||
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
||||||
|
listingKeepaliveThrottle := fetchListingKeepaliveThrottle(v)
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
@ -406,6 +408,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
s.tombstoneMembersSize = tombstoneMembersSize
|
s.tombstoneMembersSize = tombstoneMembersSize
|
||||||
s.tombstoneLifetime = tombstoneLifetime
|
s.tombstoneLifetime = tombstoneLifetime
|
||||||
s.tlsTerminationHeader = tlsTerminationHeader
|
s.tlsTerminationHeader = tlsTerminationHeader
|
||||||
|
s.listingKeepaliveThrottle = listingKeepaliveThrottle
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
||||||
|
@ -643,6 +646,12 @@ func (s *appSettings) TombstoneLifetime() uint64 {
|
||||||
return s.tombstoneLifetime
|
return s.tombstoneLifetime
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) ListingKeepaliveThrottle() time.Duration {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.listingKeepaliveThrottle
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
|
|
@ -73,6 +73,8 @@ const (
|
||||||
defaultTombstoneMembersSize = 100
|
defaultTombstoneMembersSize = 100
|
||||||
defaultTombstoneWorkerPoolSize = 100
|
defaultTombstoneWorkerPoolSize = 100
|
||||||
|
|
||||||
|
defaultListingKeepaliveThrottle = 10 * time.Second
|
||||||
|
|
||||||
useDefaultXmlns = "use_default_xmlns"
|
useDefaultXmlns = "use_default_xmlns"
|
||||||
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
|
bypassContentEncodingCheckInChunks = "bypass_content_encoding_check_in_chunks"
|
||||||
)
|
)
|
||||||
|
@ -202,6 +204,8 @@ const (
|
||||||
cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks"
|
cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks"
|
||||||
cfgKludgeDefaultNamespaces = "kludge.default_namespaces"
|
cfgKludgeDefaultNamespaces = "kludge.default_namespaces"
|
||||||
cfgKludgeProfile = "kludge.profile"
|
cfgKludgeProfile = "kludge.profile"
|
||||||
|
cfgKludgeListingKeepAliveThrottle = "kludge.listing_keepalive_throttle"
|
||||||
|
|
||||||
// Web.
|
// Web.
|
||||||
cfgWebReadTimeout = "web.read_timeout"
|
cfgWebReadTimeout = "web.read_timeout"
|
||||||
cfgWebReadHeaderTimeout = "web.read_header_timeout"
|
cfgWebReadHeaderTimeout = "web.read_header_timeout"
|
||||||
|
@ -926,6 +930,15 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
|
||||||
return attributes, nil
|
return attributes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fetchListingKeepaliveThrottle(v *viper.Viper) time.Duration {
|
||||||
|
keepalive := v.GetDuration(cfgKludgeListingKeepAliveThrottle)
|
||||||
|
if keepalive <= 0 {
|
||||||
|
keepalive = defaultListingKeepaliveThrottle
|
||||||
|
}
|
||||||
|
|
||||||
|
return keepalive
|
||||||
|
}
|
||||||
|
|
||||||
func fetchTombstoneLifetime(v *viper.Viper) uint64 {
|
func fetchTombstoneLifetime(v *viper.Viper) uint64 {
|
||||||
tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime)
|
tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime)
|
||||||
if tombstoneLifetime <= 0 {
|
if tombstoneLifetime <= 0 {
|
||||||
|
|
|
@ -193,6 +193,10 @@ S3_GW_KLUDGE_USE_DEFAULT_XMLNS=false
|
||||||
S3_GW_KLUDGE_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false
|
S3_GW_KLUDGE_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false
|
||||||
# Namespaces that should be handled as default
|
# Namespaces that should be handled as default
|
||||||
S3_GW_KLUDGE_DEFAULT_NAMESPACES="" "root"
|
S3_GW_KLUDGE_DEFAULT_NAMESPACES="" "root"
|
||||||
|
# During listing the s3 gate sends whitespaces to client to prevent it from cancelling request.
|
||||||
|
# The gate do send every time when new object is handled.
|
||||||
|
# Use this parameter to limit such sends by one per provided duration.
|
||||||
|
S3_GW_KLUDGE_LISTING_KEEPALIVE_THROTTLE=10s
|
||||||
# Kludge profiles
|
# Kludge profiles
|
||||||
S3_GW_KLUDGE_PROFILE_0_USER_AGENT=aws-cli
|
S3_GW_KLUDGE_PROFILE_0_USER_AGENT=aws-cli
|
||||||
S3_GW_KLUDGE_PROFILE_0_USE_DEFAULT_XMLNS=true
|
S3_GW_KLUDGE_PROFILE_0_USE_DEFAULT_XMLNS=true
|
||||||
|
|
|
@ -234,6 +234,10 @@ kludge:
|
||||||
bypass_content_encoding_check_in_chunks: false
|
bypass_content_encoding_check_in_chunks: false
|
||||||
# Namespaces that should be handled as default
|
# Namespaces that should be handled as default
|
||||||
default_namespaces: [ "", "root" ]
|
default_namespaces: [ "", "root" ]
|
||||||
|
# During listing the s3 gate sends whitespaces to client to prevent it from cancelling request.
|
||||||
|
# The gate do send every time when new object is handled.
|
||||||
|
# Use this parameter to limit such sends by one per provided duration.
|
||||||
|
listing_keepalive_throttle: 10s
|
||||||
# new profile section override defaults based on user agent
|
# new profile section override defaults based on user agent
|
||||||
profile:
|
profile:
|
||||||
- user_agent: aws-cli
|
- user_agent: aws-cli
|
||||||
|
|
|
@ -670,6 +670,7 @@ kludge:
|
||||||
use_default_xmlns: false
|
use_default_xmlns: false
|
||||||
bypass_content_encoding_check_in_chunks: false
|
bypass_content_encoding_check_in_chunks: false
|
||||||
default_namespaces: [ "", "root" ]
|
default_namespaces: [ "", "root" ]
|
||||||
|
listing_keepalive_throttle: 10s
|
||||||
profile:
|
profile:
|
||||||
- user_agent: aws-cli
|
- user_agent: aws-cli
|
||||||
use_default_xmlns: false
|
use_default_xmlns: false
|
||||||
|
@ -678,12 +679,13 @@ kludge:
|
||||||
bypass_content_encoding_check_in_chunks: false
|
bypass_content_encoding_check_in_chunks: false
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|-------------------------------------------|----------------------------------|---------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
|-------------------------------------------|----------------------------------|---------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `use_default_xmlns` | `bool` | yes | `false` | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. |
|
| `use_default_xmlns` | `bool` | yes | `false` | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. |
|
||||||
| `bypass_content_encoding_check_in_chunks` | `bool` | yes | `false` | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. |
|
| `bypass_content_encoding_check_in_chunks` | `bool` | yes | `false` | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. |
|
||||||
| `default_namespaces` | `[]string` | yes | `["","root"]` | Namespaces that should be handled as default. |
|
| `default_namespaces` | `[]string` | yes | `["","root"]` | Namespaces that should be handled as default. |
|
||||||
| `profile` | [[]Profile](#profile-subsection) | yes | | An array of configurable profiles. |
|
| `listing_keepalive_throttle` | `duration` | yes | `10s` | During listing the s3 gate sends whitespaces to client to prevent it from cancelling request. The gate do send every time when new object is handled. Use this parameter to limit such sends by one per provided duration. |
|
||||||
|
| `profile` | [[]Profile](#profile-subsection) | yes | | An array of configurable profiles. |
|
||||||
|
|
||||||
#### `profile` subsection
|
#### `profile` subsection
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue