[#227] Add versionID header after complete multipart #251
11 changed files with 25 additions and 163 deletions
|
@ -34,6 +34,7 @@ This document outlines major changes between releases.
|
||||||
- Add new `kludge.bypass_content_encoding_check_in_chunks` config param (#146)
|
- Add new `kludge.bypass_content_encoding_check_in_chunks` config param (#146)
|
||||||
- Add new `frostfs.client_cut` config param (#192)
|
- Add new `frostfs.client_cut` config param (#192)
|
||||||
- Add new `frostfs.buffer_max_size_for_put` config param and sync TZ hash for PUT operations (#197)
|
- Add new `frostfs.buffer_max_size_for_put` config param and sync TZ hash for PUT operations (#197)
|
||||||
|
- Add `X-Amz-Version-Id` header after complete multipart upload (#227)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Update prometheus to v1.15.0 (#94)
|
- Update prometheus to v1.15.0 (#94)
|
||||||
|
@ -53,6 +54,7 @@ This document outlines major changes between releases.
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
- Drop `tree.service` param (now endpoints from `peers` section are used) (#133)
|
- Drop `tree.service` param (now endpoints from `peers` section are used) (#133)
|
||||||
|
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)
|
||||||
|
|
||||||
## [0.27.0] - Karpinsky - 2023-07-12
|
## [0.27.0] - Karpinsky - 2023-07-12
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,6 @@ type (
|
||||||
NotificatorEnabled() bool
|
NotificatorEnabled() bool
|
||||||
ResolveZoneList() []string
|
ResolveZoneList() []string
|
||||||
IsResolveListAllow() bool
|
IsResolveListAllow() bool
|
||||||
CompleteMultipartKeepalive() time.Duration
|
|
||||||
BypassContentEncodingInChunks() bool
|
BypassContentEncodingInChunks() bool
|
||||||
MD5Enabled() bool
|
MD5Enabled() bool
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package handler
|
||||||
import (
|
import (
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -399,6 +398,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
uploadID = r.URL.Query().Get(uploadIDHeaderName)
|
uploadID = r.URL.Query().Get(uploadIDHeaderName)
|
||||||
uploadInfo = &layer.UploadInfoParams{
|
uploadInfo = &layer.UploadInfoParams{
|
||||||
|
@ -425,29 +430,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
Parts: reqBody.Parts,
|
Parts: reqBody.Parts,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Next operations might take some time, so we want to keep client's
|
|
||||||
// connection alive. To do so, gateway sends periodic white spaces
|
|
||||||
// back to the client the same way as Amazon S3 service does.
|
|
||||||
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive())
|
|
||||||
|
|
||||||
// Start complete multipart upload which may take some time to fetch object
|
// Start complete multipart upload which may take some time to fetch object
|
||||||
// and re-upload it part by part.
|
// and re-upload it part by part.
|
||||||
objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo)
|
objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo)
|
||||||
|
|
||||||
// Stop periodic writer as complete multipart upload is finished
|
|
||||||
// successfully or not.
|
|
||||||
headerIsWritten := stopPeriodicResponseWriter()
|
|
||||||
|
|
||||||
responseWriter := middleware.EncodeToResponse
|
|
||||||
errLogger := h.logAndSendError
|
|
||||||
// Do not send XML and HTTP headers if periodic writer was invoked at this point.
|
|
||||||
if headerIsWritten {
|
|
||||||
responseWriter = middleware.EncodeToResponseNoHeader
|
|
||||||
errLogger = h.logAndSendErrorNoHeader
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errLogger(w, "complete multipart error", reqInfo, err, additional...)
|
h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,12 +445,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
|
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Here we previously set api.AmzVersionID header for versioned bucket.
|
if settings.VersioningEnabled() {
|
||||||
// It is not possible after #60, because of periodic white
|
w.Header().Set(api.AmzVersionID, objInfo.VersionID())
|
||||||
// space XML writer to keep connection with the client.
|
}
|
||||||
|
|
||||||
if err = responseWriter(w, response); err != nil {
|
if err = middleware.EncodeToResponse(w, response); err != nil {
|
||||||
errLogger(w, "something went wrong", reqInfo, err, additional...)
|
h.logAndSendError(w, "something went wrong", reqInfo, err, additional...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -730,53 +718,3 @@ func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPart
|
||||||
StorageClass: api.DefaultStorageClass,
|
StorageClass: api.DefaultStorageClass,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// periodicXMLWriter creates go routine to write xml header and whitespaces
|
|
||||||
// over time to avoid connection drop from the client. To work properly,
|
|
||||||
// pass `http.ResponseWriter` with implemented `http.Flusher` interface.
|
|
||||||
// Returns stop function which returns boolean if writer has been used
|
|
||||||
// during goroutine execution. To disable writer, pass 0 duration value.
|
|
||||||
func periodicXMLWriter(w io.Writer, dur time.Duration) (stop func() bool) {
|
|
||||||
if dur == 0 { // 0 duration disables periodic writer
|
|
||||||
return func() bool { return false }
|
|
||||||
}
|
|
||||||
|
|
||||||
whitespaceChar := []byte(" ")
|
|
||||||
closer := make(chan struct{})
|
|
||||||
done := make(chan struct{})
|
|
||||||
headerWritten := false
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer close(done)
|
|
||||||
|
|
||||||
tick := time.NewTicker(dur)
|
|
||||||
defer tick.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-tick.C:
|
|
||||||
if !headerWritten {
|
|
||||||
_, err := w.Write([]byte(xml.Header))
|
|
||||||
headerWritten = err == nil
|
|
||||||
}
|
|
||||||
_, err := w.Write(whitespaceChar)
|
|
||||||
if err != nil {
|
|
||||||
return // is there anything we can do better than ignore error?
|
|
||||||
}
|
|
||||||
if buffered, ok := w.(http.Flusher); ok {
|
|
||||||
buffered.Flush()
|
|
||||||
}
|
|
||||||
case <-closer:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
stop = func() bool {
|
|
||||||
close(closer)
|
|
||||||
<-done // wait for goroutine to stop
|
|
||||||
return headerWritten
|
|
||||||
}
|
|
||||||
|
|
||||||
return stop
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
@ -10,7 +9,6 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
|
@ -22,44 +20,6 @@ const (
|
||||||
partNumberMarkerQuery = "part-number-marker"
|
partNumberMarkerQuery = "part-number-marker"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPeriodicWriter(t *testing.T) {
|
|
||||||
const dur = 100 * time.Millisecond
|
|
||||||
const whitespaces = 8
|
|
||||||
expected := []byte(xml.Header)
|
|
||||||
for i := 0; i < whitespaces; i++ {
|
|
||||||
expected = append(expected, []byte(" ")...)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("writes data", func(t *testing.T) {
|
|
||||||
buf := bytes.NewBuffer(nil)
|
|
||||||
stop := periodicXMLWriter(buf, dur)
|
|
||||||
|
|
||||||
// N number of whitespaces + half durations to guarantee at least N writes in buffer
|
|
||||||
time.Sleep(whitespaces*dur + dur/2)
|
|
||||||
require.True(t, stop())
|
|
||||||
require.Equal(t, expected, buf.Bytes())
|
|
||||||
|
|
||||||
t.Run("no additional data after stop", func(t *testing.T) {
|
|
||||||
time.Sleep(2 * dur)
|
|
||||||
require.Equal(t, expected, buf.Bytes())
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("does not write data", func(t *testing.T) {
|
|
||||||
buf := bytes.NewBuffer(nil)
|
|
||||||
stop := periodicXMLWriter(buf, dur)
|
|
||||||
time.Sleep(dur / 2)
|
|
||||||
require.False(t, stop())
|
|
||||||
require.Empty(t, buf.Bytes())
|
|
||||||
|
|
||||||
t.Run("disabled", func(t *testing.T) {
|
|
||||||
stop = periodicXMLWriter(buf, 0)
|
|
||||||
require.False(t, stop())
|
|
||||||
require.Empty(t, buf.Bytes())
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMultipartUploadInvalidPart(t *testing.T) {
|
func TestMultipartUploadInvalidPart(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -56,22 +56,6 @@ func handleDeleteMarker(w http.ResponseWriter, err error) error {
|
||||||
return fmt.Errorf("%w: %s", s3errors.GetAPIError(target.ErrorCode), err)
|
return fmt.Errorf("%w: %s", s3errors.GetAPIError(target.ErrorCode), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) logAndSendErrorNoHeader(w http.ResponseWriter, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) {
|
|
||||||
middleware.WriteErrorResponseNoHeader(w, reqInfo, transformToS3Error(err))
|
|
||||||
fields := []zap.Field{
|
|
||||||
zap.String("request_id", reqInfo.RequestID),
|
|
||||||
zap.String("method", reqInfo.API),
|
|
||||||
zap.String("bucket", reqInfo.BucketName),
|
|
||||||
zap.String("object", reqInfo.ObjectName),
|
|
||||||
zap.String("description", logText),
|
|
||||||
zap.Error(err)}
|
|
||||||
fields = append(fields, additional...)
|
|
||||||
if traceID, err := trace.TraceIDFromHex(reqInfo.TraceID); err == nil && traceID.IsValid() {
|
|
||||||
fields = append(fields, zap.String("trace_id", reqInfo.TraceID))
|
|
||||||
}
|
|
||||||
h.log.Error(logs.RequestFailed, fields...) // consider using h.reqLogger (it requires accept context.Context or http.Request)
|
|
||||||
}
|
|
||||||
|
|
||||||
func transformToS3Error(err error) error {
|
func transformToS3Error(err error) error {
|
||||||
err = frosterrors.UnwrapErr(err) // this wouldn't work with errors.Join
|
err = frosterrors.UnwrapErr(err) // this wouldn't work with errors.Join
|
||||||
if _, ok := err.(s3errors.Error); ok {
|
if _, ok := err.(s3errors.Error); ok {
|
||||||
|
|
|
@ -141,13 +141,6 @@ func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) int
|
||||||
return code
|
return code
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteErrorResponseNoHeader writes XML encoded error to the response body.
|
|
||||||
func WriteErrorResponseNoHeader(w http.ResponseWriter, reqInfo *ReqInfo, err error) {
|
|
||||||
errorResponse := getAPIErrorResponse(reqInfo, err)
|
|
||||||
encodedErrorResponse := EncodeResponse(errorResponse)
|
|
||||||
WriteResponseBody(w, encodedErrorResponse)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write http common headers.
|
// Write http common headers.
|
||||||
func setCommonHeaders(w http.ResponseWriter) {
|
func setCommonHeaders(w http.ResponseWriter) {
|
||||||
w.Header().Set(hdrServerInfo, version.Server)
|
w.Header().Set(hdrServerInfo, version.Server)
|
||||||
|
|
|
@ -69,13 +69,12 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
appSettings struct {
|
appSettings struct {
|
||||||
logLevel zap.AtomicLevel
|
logLevel zap.AtomicLevel
|
||||||
maxClient maxClientsConfig
|
maxClient maxClientsConfig
|
||||||
defaultMaxAge int
|
defaultMaxAge int
|
||||||
notificatorEnabled bool
|
notificatorEnabled bool
|
||||||
resolveZoneList []string
|
resolveZoneList []string
|
||||||
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
|
||||||
completeMultipartKeepalive time.Duration
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
defaultPolicy netmap.PlacementPolicy
|
defaultPolicy netmap.PlacementPolicy
|
||||||
|
@ -174,12 +173,11 @@ func (a *App) initLayer(ctx context.Context) {
|
||||||
|
|
||||||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
settings := &appSettings{
|
settings := &appSettings{
|
||||||
logLevel: log.lvl,
|
logLevel: log.lvl,
|
||||||
maxClient: newMaxClients(v),
|
maxClient: newMaxClients(v),
|
||||||
defaultXMLNS: v.GetBool(cfgKludgeUseDefaultXMLNS),
|
defaultXMLNS: v.GetBool(cfgKludgeUseDefaultXMLNS),
|
||||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||||
notificatorEnabled: v.GetBool(cfgEnableNATS),
|
notificatorEnabled: v.GetBool(cfgEnableNATS),
|
||||||
completeMultipartKeepalive: v.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
||||||
|
@ -310,10 +308,6 @@ func (s *appSettings) IsResolveListAllow() bool {
|
||||||
return s.isResolveListAllow
|
return s.isResolveListAllow
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) CompleteMultipartKeepalive() time.Duration {
|
|
||||||
return s.completeMultipartKeepalive
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *appSettings) MD5Enabled() bool {
|
func (s *appSettings) MD5Enabled() bool {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
|
|
|
@ -134,7 +134,6 @@ const ( // Settings.
|
||||||
|
|
||||||
// Kludge.
|
// Kludge.
|
||||||
cfgKludgeUseDefaultXMLNS = "kludge.use_default_xmlns"
|
cfgKludgeUseDefaultXMLNS = "kludge.use_default_xmlns"
|
||||||
cfgKludgeCompleteMultipartUploadKeepalive = "kludge.complete_multipart_keepalive"
|
|
||||||
cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks"
|
cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks"
|
||||||
|
|
||||||
// Web.
|
// Web.
|
||||||
|
@ -547,7 +546,6 @@ func newSettings() *viper.Viper {
|
||||||
|
|
||||||
// kludge
|
// kludge
|
||||||
v.SetDefault(cfgKludgeUseDefaultXMLNS, false)
|
v.SetDefault(cfgKludgeUseDefaultXMLNS, false)
|
||||||
v.SetDefault(cfgKludgeCompleteMultipartUploadKeepalive, 10*time.Second)
|
|
||||||
v.SetDefault(cfgKludgeBypassContentEncodingCheckInChunks, false)
|
v.SetDefault(cfgKludgeBypassContentEncodingCheckInChunks, false)
|
||||||
|
|
||||||
// web
|
// web
|
||||||
|
|
|
@ -140,8 +140,6 @@ S3_GW_RESOLVE_BUCKET_ALLOW=container
|
||||||
|
|
||||||
# Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies.
|
# Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies.
|
||||||
S3_GW_KLUDGE_USE_DEFAULT_XMLNS=false
|
S3_GW_KLUDGE_USE_DEFAULT_XMLNS=false
|
||||||
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
|
|
||||||
S3_GW_KLUDGE_COMPLETE_MULTIPART_KEEPALIVE=10s
|
|
||||||
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
|
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
|
||||||
S3_GW_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false
|
S3_GW_BYPASS_CONTENT_ENCODING_CHECK_IN_CHUNKS=false
|
||||||
|
|
||||||
|
|
|
@ -169,8 +169,6 @@ resolve_bucket:
|
||||||
kludge:
|
kludge:
|
||||||
# Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies.
|
# Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies.
|
||||||
use_default_xmlns: false
|
use_default_xmlns: false
|
||||||
# Set timeout between whitespace transmissions during CompleteMultipartUpload processing.
|
|
||||||
complete_multipart_keepalive: 10s
|
|
||||||
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
|
# Use this flag to be able to use chunked upload approach without having `aws-chunked` value in `Content-Encoding` header.
|
||||||
bypass_content_encoding_check_in_chunks: false
|
bypass_content_encoding_check_in_chunks: false
|
||||||
|
|
||||||
|
|
|
@ -543,14 +543,12 @@ Workarounds for non-standard use cases.
|
||||||
```yaml
|
```yaml
|
||||||
kludge:
|
kludge:
|
||||||
use_default_xmlns: false
|
use_default_xmlns: false
|
||||||
complete_multipart_keepalive: 10s
|
|
||||||
bypass_content_encoding_check_in_chunks: false
|
bypass_content_encoding_check_in_chunks: false
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|-------------------------------------------|------------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------------|
|
|-------------------------------------------|------------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `use_default_xmlns` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. |
|
| `use_default_xmlns` | `bool` | yes | false | Enable using default xml namespace `http://s3.amazonaws.com/doc/2006-03-01/` when parse xml bodies. |
|
||||||
| `complete_multipart_keepalive` | `duration` | no | 10s | Set timeout between whitespace transmissions during CompleteMultipartUpload processing. |
|
|
||||||
| `bypass_content_encoding_check_in_chunks` | `bool` | yes | false | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. |
|
| `bypass_content_encoding_check_in_chunks` | `bool` | yes | false | Use this flag to be able to use [chunked upload approach](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html) without having `aws-chunked` value in `Content-Encoding` header. |
|
||||||
|
|
||||||
# `runtime` section
|
# `runtime` section
|
||||||
|
|
Loading…
Reference in a new issue