[#607] Support unsigned payload streaming with trailers
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
bec63026bd
commit
a4d9658fbb
6 changed files with 216 additions and 52 deletions
|
@ -311,10 +311,23 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
|
type BodyReader interface {
|
||||||
|
io.ReadCloser
|
||||||
|
TrailerHeaders() map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
type noTrailerBodyReader struct {
|
||||||
|
io.ReadCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *noTrailerBodyReader) TrailerHeaders() map[string]string {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handler) getBodyReader(r *http.Request) (BodyReader, error) {
|
||||||
shaType, streaming := api.IsSignedStreamingV4(r)
|
shaType, streaming := api.IsSignedStreamingV4(r)
|
||||||
if !streaming {
|
if !streaming {
|
||||||
return r.Body, nil
|
return &noTrailerBodyReader{r.Body}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
encodings := r.Header.Values(api.ContentEncoding)
|
encodings := r.Header.Values(api.ContentEncoding)
|
||||||
|
@ -350,7 +363,7 @@ func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
chunkReader io.ReadCloser
|
chunkReader BodyReader
|
||||||
)
|
)
|
||||||
switch shaType {
|
switch shaType {
|
||||||
case api.StreamingContentSHA256, api.StreamingContentSHA256Trailer:
|
case api.StreamingContentSHA256, api.StreamingContentSHA256Trailer:
|
||||||
|
@ -358,7 +371,7 @@ func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
|
||||||
case api.StreamingContentV4aSHA256, api.StreamingContentV4aSHA256Trailer:
|
case api.StreamingContentV4aSHA256, api.StreamingContentV4aSHA256Trailer:
|
||||||
chunkReader, err = newSignV4aChunkedReader(r)
|
chunkReader, err = newSignV4aChunkedReader(r)
|
||||||
default:
|
default:
|
||||||
chunkReader, err = newUnsignedChunkedReader(r)
|
chunkReader, err = newUnsignedChunkedReader(r.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -377,13 +377,34 @@ func TestPutObjectCheckContentSHA256(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutObjectWithStreamUnsignedBodySmall(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "test2", "tmp.txt"
|
||||||
|
createTestBucket(hc, bktName)
|
||||||
|
|
||||||
|
w, req, chunk := getChunkedRequestUnsignedTrailingSmall(hc.context, t, bktName, objName)
|
||||||
|
hc.Handler().PutObjectHandler(w, req)
|
||||||
|
assertStatus(t, w, http.StatusOK)
|
||||||
|
|
||||||
|
w, req = prepareTestRequest(hc, bktName, objName, nil)
|
||||||
|
hc.Handler().HeadObjectHandler(w, req)
|
||||||
|
assertStatus(t, w, http.StatusOK)
|
||||||
|
require.Equal(t, "5", w.Header().Get(api.ContentLength))
|
||||||
|
|
||||||
|
data := getObjectRange(t, hc, bktName, objName, 0, 5)
|
||||||
|
for i := range chunk {
|
||||||
|
require.Equal(t, chunk[i], data[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPutObjectWithStreamUnsignedBody(t *testing.T) {
|
func TestPutObjectWithStreamUnsignedBody(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
bktName, objName := "examplebucket", "chunkObject.txt"
|
bktName, objName := "examplebucket", "chunkObject.txt"
|
||||||
createTestBucket(hc, bktName)
|
createTestBucket(hc, bktName)
|
||||||
|
|
||||||
w, req, chunk := getChunkedRequestTrailing(hc.context, t, bktName, objName)
|
w, req, chunk := getChunkedRequestUnsignedTrailing(hc.context, t, bktName, objName)
|
||||||
hc.Handler().PutObjectHandler(w, req)
|
hc.Handler().PutObjectHandler(w, req)
|
||||||
assertStatus(t, w, http.StatusOK)
|
assertStatus(t, w, http.StatusOK)
|
||||||
|
|
||||||
|
@ -404,7 +425,7 @@ func TestPutObjectWithStreamBodyAWSExampleTrailing(t *testing.T) {
|
||||||
bktName, objName := "examplebucket", "chunkObject.txt"
|
bktName, objName := "examplebucket", "chunkObject.txt"
|
||||||
createTestBucket(hc, bktName)
|
createTestBucket(hc, bktName)
|
||||||
|
|
||||||
w, req, chunk := getChunkedRequestUnsignedTrailing(hc.context, t, bktName, objName)
|
w, req, chunk := getChunkedRequestTrailing(hc.context, t, bktName, objName)
|
||||||
hc.Handler().PutObjectHandler(w, req)
|
hc.Handler().PutObjectHandler(w, req)
|
||||||
assertStatus(t, w, http.StatusOK)
|
assertStatus(t, w, http.StatusOK)
|
||||||
|
|
||||||
|
@ -677,6 +698,60 @@ func getChunkedRequestUnsignedTrailing(ctx context.Context, t *testing.T, bktNam
|
||||||
return w, req, chunk
|
return w, req, chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getChunkedRequestUnsignedTrailingSmall(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request, []byte) {
|
||||||
|
AWSAccessKeyID := "9uEm8zMrGWsEDWiPCnVuQLKTiGtCEXpYXt8eBG7agupw0JDySJZMFuej7PTcPzRqBUyPtFowNu1RtvHULU8XHjie6"
|
||||||
|
AWSSecretAccessKey := "9f546428957ed7e189b7be928906ce7d1d9cb3042dd4d2d5194e28ce8c4c3b8e"
|
||||||
|
|
||||||
|
awsCreds := aws.Credentials{AccessKeyID: AWSAccessKeyID, SecretAccessKey: AWSSecretAccessKey}
|
||||||
|
signer := v4.NewSigner()
|
||||||
|
|
||||||
|
chunk := "tmp2\n"
|
||||||
|
|
||||||
|
reqBody := bytes.NewBufferString("5\r\n")
|
||||||
|
_, err := reqBody.WriteString(chunk)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = reqBody.WriteString("\r\n0\r\n")
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = reqBody.WriteString("x-amz-checksum-crc64nvme:q1EYl4rI0TU=\r\n\r\n")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("PUT", "https://localhost:8184/"+bktName+"/"+objName, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set("x-amz-sdk-checksum-algorithm", "CRC64NVME")
|
||||||
|
req.Header.Set("content-encoding", api.AwsChunked)
|
||||||
|
req.Header.Set("x-amz-trailer", "x-amz-checksum-crc64nvme")
|
||||||
|
req.Header.Set("x-amz-content-sha256", api.StreamingUnsignedPayloadTrailer)
|
||||||
|
req.Header.Set("x-amz-decoded-content-length", "5")
|
||||||
|
req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY")
|
||||||
|
|
||||||
|
signTime, err := time.Parse("20060102T150405Z", "20250203T063745Z")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = signer.SignHTTP(ctx, awsCreds, req, api.StreamingContentSHA256Trailer, "s3", "ru", signTime)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
req.Body = io.NopCloser(reqBody)
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
reqInfo := middleware.NewReqInfo(w, req, middleware.ObjectRequest{Bucket: bktName, Object: objName}, "")
|
||||||
|
req = req.WithContext(middleware.SetReqInfo(ctx, reqInfo))
|
||||||
|
req = req.WithContext(middleware.SetBox(req.Context(), &middleware.Box{
|
||||||
|
ClientTime: signTime,
|
||||||
|
AuthHeaders: &middleware.AuthHeader{
|
||||||
|
AccessKeyID: AWSAccessKeyID,
|
||||||
|
SignatureV4: "a075c83779d1c3c02254fbe4c9eff0a21556d15556fc6a25db69147c4838226b",
|
||||||
|
Region: "ru",
|
||||||
|
},
|
||||||
|
AccessBox: &accessbox.Box{
|
||||||
|
Gate: &accessbox.GateData{
|
||||||
|
SecretKey: AWSSecretAccessKey,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
|
||||||
|
return w, req, []byte(chunk)
|
||||||
|
}
|
||||||
|
|
||||||
func getEmptyChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request) {
|
func getEmptyChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request) {
|
||||||
AWSAccessKeyID := "48c1K4PLVb7SvmV3PjDKEuXaMh8yZMXZ8Wx9msrkKcYw06dZeaxeiPe8vyFm2WsoeVaNt7UWEjNsVkagDs8oX4XXh"
|
AWSAccessKeyID := "48c1K4PLVb7SvmV3PjDKEuXaMh8yZMXZ8Wx9msrkKcYw06dZeaxeiPe8vyFm2WsoeVaNt7UWEjNsVkagDs8oX4XXh"
|
||||||
AWSSecretAccessKey := "09260955b4eb0279dc017ba20a1ddac909cbd226c86cbb2d868e55534c8e64b0"
|
AWSSecretAccessKey := "09260955b4eb0279dc017ba20a1ddac909cbd226c86cbb2d868e55534c8e64b0"
|
||||||
|
|
|
@ -37,6 +37,7 @@ type (
|
||||||
var (
|
var (
|
||||||
errGiantChunk = errors.New("chunk too big: choose chunk size <= 16MiB")
|
errGiantChunk = errors.New("chunk too big: choose chunk size <= 16MiB")
|
||||||
errMalformedChunkedEncoding = errors.New("malformed chunked encoding")
|
errMalformedChunkedEncoding = errors.New("malformed chunked encoding")
|
||||||
|
errMalformedTrailerHeaders = errors.New("malformed trailer headers")
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *s3ChunkReader) Close() (err error) {
|
func (c *s3ChunkReader) Close() (err error) {
|
||||||
|
@ -190,7 +191,11 @@ func (c *s3ChunkReader) Read(buf []byte) (num int, err error) {
|
||||||
return num, err
|
return num, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSignV4ChunkedReader(req *http.Request) (io.ReadCloser, error) {
|
func (c *s3ChunkReader) TrailerHeaders() map[string]string {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSignV4ChunkedReader(req *http.Request) (*s3ChunkReader, error) {
|
||||||
ctx := req.Context()
|
ctx := req.Context()
|
||||||
box, err := middleware.GetBoxData(ctx)
|
box, err := middleware.GetBoxData(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -2,8 +2,11 @@ package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -55,3 +58,63 @@ func TestSigV4AStreaming(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, chunk1, string(data))
|
require.Equal(t, chunk1, string(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStreamingUnsigned(t *testing.T) {
|
||||||
|
chunk1 := "chunk1"
|
||||||
|
chunk2 := "chunk2"
|
||||||
|
|
||||||
|
t.Run("with trailer", func(t *testing.T) {
|
||||||
|
chunks := []string{chunk1, chunk2}
|
||||||
|
trailer := map[string]string{"x-amz-checksum-crc64nvme": "q1EYl4rI0TU="}
|
||||||
|
body, expected := getChunkedBody(t, chunks, trailer)
|
||||||
|
|
||||||
|
r, err := newUnsignedChunkedReader(body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
data, err := io.ReadAll(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expected, string(data))
|
||||||
|
|
||||||
|
require.EqualValues(t, trailer, r.TrailerHeaders())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("without trailer", func(t *testing.T) {
|
||||||
|
chunks := []string{chunk1, chunk2}
|
||||||
|
body, expected := getChunkedBody(t, chunks, nil)
|
||||||
|
|
||||||
|
r, err := newUnsignedChunkedReader(body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
data, err := io.ReadAll(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expected, string(data))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func getChunkedBody(t *testing.T, chunks []string, trailers map[string]string) (*bytes.Buffer, string) {
|
||||||
|
res := bytes.NewBufferString("")
|
||||||
|
|
||||||
|
for i, chunk := range chunks {
|
||||||
|
meta := strconv.FormatInt(int64(len(chunk)), 16) + "\r\n"
|
||||||
|
if i != 0 {
|
||||||
|
meta = "\r\n" + meta
|
||||||
|
}
|
||||||
|
_, err := res.WriteString(meta)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = res.WriteString(chunk)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := res.WriteString("\r\n0\r\n")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for k, v := range trailers {
|
||||||
|
_, err := res.WriteString(fmt.Sprintf("%s:%s\n", k, v))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = res.WriteString("\r\n")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return res, strings.Join(chunks, "")
|
||||||
|
}
|
||||||
|
|
|
@ -2,28 +2,17 @@ package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
|
||||||
"encoding/hex"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4sdk2/signer/v4"
|
|
||||||
errs "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
s3UnsignedChunkReader struct {
|
s3UnsignedChunkReader struct {
|
||||||
ctx context.Context
|
reader *bufio.Reader
|
||||||
reader *bufio.Reader
|
|
||||||
streamSigner *v4.StreamSigner
|
|
||||||
|
|
||||||
requestTime time.Time
|
trailers map[string]string
|
||||||
buffer []byte
|
buffer []byte
|
||||||
offset int
|
offset int
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -114,44 +103,59 @@ func (c *s3UnsignedChunkReader) Read(buf []byte) (num int, err error) {
|
||||||
// If the chunk size is zero we return io.EOF. As specified by AWS,
|
// If the chunk size is zero we return io.EOF. As specified by AWS,
|
||||||
// only the last chunk is zero-sized.
|
// only the last chunk is zero-sized.
|
||||||
if size == 0 {
|
if size == 0 {
|
||||||
|
var k, v string
|
||||||
|
for err == nil {
|
||||||
|
k, err = c.reader.ReadString(':')
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
c.err = errMalformedTrailerHeaders
|
||||||
|
return num, c.err
|
||||||
|
}
|
||||||
|
v, err = c.reader.ReadString('\n')
|
||||||
|
if err != nil {
|
||||||
|
c.err = errMalformedTrailerHeaders
|
||||||
|
return num, c.err
|
||||||
|
}
|
||||||
|
c.trailers[k[:len(k)-1]] = v[:len(v)-1]
|
||||||
|
}
|
||||||
|
|
||||||
c.err = io.EOF
|
c.err = io.EOF
|
||||||
return num, c.err
|
return num, c.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b, err = c.reader.ReadByte()
|
||||||
|
if b != '\r' || err != nil {
|
||||||
|
c.err = errMalformedChunkedEncoding
|
||||||
|
return num, c.err
|
||||||
|
}
|
||||||
|
b, err = c.reader.ReadByte()
|
||||||
|
if err == io.EOF {
|
||||||
|
err = io.ErrUnexpectedEOF
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
c.err = err
|
||||||
|
return num, c.err
|
||||||
|
}
|
||||||
|
if b != '\n' {
|
||||||
|
c.err = errMalformedChunkedEncoding
|
||||||
|
return num, c.err
|
||||||
|
}
|
||||||
|
|
||||||
c.offset = copy(buf, c.buffer)
|
c.offset = copy(buf, c.buffer)
|
||||||
num += c.offset
|
num += c.offset
|
||||||
return num, err
|
return num, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func newUnsignedChunkedReader(req *http.Request) (io.ReadCloser, error) {
|
func (c *s3UnsignedChunkReader) TrailerHeaders() map[string]string {
|
||||||
ctx := req.Context()
|
return c.trailers
|
||||||
box, err := middleware.GetBoxData(ctx)
|
}
|
||||||
if err != nil {
|
|
||||||
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
|
|
||||||
}
|
|
||||||
|
|
||||||
authHeaders, err := middleware.GetAuthHeaders(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
|
|
||||||
}
|
|
||||||
|
|
||||||
currentCredentials := aws.Credentials{AccessKeyID: authHeaders.AccessKeyID, SecretAccessKey: box.Gate.SecretKey}
|
|
||||||
seed, err := hex.DecodeString(authHeaders.SignatureV4)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errs.GetAPIError(errs.ErrSignatureDoesNotMatch)
|
|
||||||
}
|
|
||||||
|
|
||||||
reqTime, err := middleware.GetClientTime(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errs.GetAPIError(errs.ErrMalformedDate)
|
|
||||||
}
|
|
||||||
newStreamSigner := v4.NewStreamSigner(currentCredentials, "s3", authHeaders.Region, seed)
|
|
||||||
|
|
||||||
|
func newUnsignedChunkedReader(body io.Reader) (*s3UnsignedChunkReader, error) {
|
||||||
return &s3UnsignedChunkReader{
|
return &s3UnsignedChunkReader{
|
||||||
ctx: ctx,
|
reader: bufio.NewReader(body),
|
||||||
reader: bufio.NewReader(req.Body),
|
trailers: map[string]string{},
|
||||||
streamSigner: newStreamSigner,
|
buffer: make([]byte, 64*1024),
|
||||||
requestTime: reqTime,
|
|
||||||
buffer: make([]byte, 64*1024),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,7 +168,11 @@ func (c *s3v4aChunkReader) handleErr(num int, err error) (int, error) {
|
||||||
return num, c.err
|
return num, c.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSignV4aChunkedReader(req *http.Request) (io.ReadCloser, error) {
|
func (c *s3v4aChunkReader) TrailerHeaders() map[string]string {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSignV4aChunkedReader(req *http.Request) (*s3v4aChunkReader, error) {
|
||||||
box, err := middleware.GetBoxData(req.Context())
|
box, err := middleware.GetBoxData(req.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
|
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
|
||||||
|
|
Loading…
Add table
Reference in a new issue