From a4d9658fbb93eb6e6e87340a9e91104ea0d961df Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 3 Feb 2025 12:12:21 +0300 Subject: [PATCH] [#607] Support unsigned payload streaming with trailers Signed-off-by: Denis Kirillov --- api/handler/put.go | 21 ++++++-- api/handler/put_test.go | 79 +++++++++++++++++++++++++++- api/handler/s3reader.go | 7 ++- api/handler/s3reader_test.go | 63 ++++++++++++++++++++++ api/handler/s3unsignedreader.go | 92 +++++++++++++++++---------------- api/handler/s3v4aReader.go | 6 ++- 6 files changed, 216 insertions(+), 52 deletions(-) diff --git a/api/handler/put.go b/api/handler/put.go index 37796608..8b9bf7b5 100644 --- a/api/handler/put.go +++ b/api/handler/put.go @@ -311,10 +311,23 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) { } } -func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) { +type BodyReader interface { + io.ReadCloser + TrailerHeaders() map[string]string +} + +type noTrailerBodyReader struct { + io.ReadCloser +} + +func (r *noTrailerBodyReader) TrailerHeaders() map[string]string { + return nil +} + +func (h *handler) getBodyReader(r *http.Request) (BodyReader, error) { shaType, streaming := api.IsSignedStreamingV4(r) if !streaming { - return r.Body, nil + return &noTrailerBodyReader{r.Body}, nil } encodings := r.Header.Values(api.ContentEncoding) @@ -350,7 +363,7 @@ func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) { var ( err error - chunkReader io.ReadCloser + chunkReader BodyReader ) switch shaType { case api.StreamingContentSHA256, api.StreamingContentSHA256Trailer: @@ -358,7 +371,7 @@ func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) { case api.StreamingContentV4aSHA256, api.StreamingContentV4aSHA256Trailer: chunkReader, err = newSignV4aChunkedReader(r) default: - chunkReader, err = newUnsignedChunkedReader(r) + chunkReader, err = newUnsignedChunkedReader(r.Body) } if err != nil { diff --git a/api/handler/put_test.go b/api/handler/put_test.go index ffb2b41e..8a980eeb 100644 --- a/api/handler/put_test.go +++ b/api/handler/put_test.go @@ -377,13 +377,34 @@ func TestPutObjectCheckContentSHA256(t *testing.T) { } } +func TestPutObjectWithStreamUnsignedBodySmall(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName, objName := "test2", "tmp.txt" + createTestBucket(hc, bktName) + + w, req, chunk := getChunkedRequestUnsignedTrailingSmall(hc.context, t, bktName, objName) + hc.Handler().PutObjectHandler(w, req) + assertStatus(t, w, http.StatusOK) + + w, req = prepareTestRequest(hc, bktName, objName, nil) + hc.Handler().HeadObjectHandler(w, req) + assertStatus(t, w, http.StatusOK) + require.Equal(t, "5", w.Header().Get(api.ContentLength)) + + data := getObjectRange(t, hc, bktName, objName, 0, 5) + for i := range chunk { + require.Equal(t, chunk[i], data[i]) + } +} + func TestPutObjectWithStreamUnsignedBody(t *testing.T) { hc := prepareHandlerContext(t) bktName, objName := "examplebucket", "chunkObject.txt" createTestBucket(hc, bktName) - w, req, chunk := getChunkedRequestTrailing(hc.context, t, bktName, objName) + w, req, chunk := getChunkedRequestUnsignedTrailing(hc.context, t, bktName, objName) hc.Handler().PutObjectHandler(w, req) assertStatus(t, w, http.StatusOK) @@ -404,7 +425,7 @@ func TestPutObjectWithStreamBodyAWSExampleTrailing(t *testing.T) { bktName, objName := "examplebucket", "chunkObject.txt" createTestBucket(hc, bktName) - w, req, chunk := getChunkedRequestUnsignedTrailing(hc.context, t, bktName, objName) + w, req, chunk := getChunkedRequestTrailing(hc.context, t, bktName, objName) hc.Handler().PutObjectHandler(w, req) assertStatus(t, w, http.StatusOK) @@ -677,6 +698,60 @@ func getChunkedRequestUnsignedTrailing(ctx context.Context, t *testing.T, bktNam return w, req, chunk } +func getChunkedRequestUnsignedTrailingSmall(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request, []byte) { + AWSAccessKeyID := "9uEm8zMrGWsEDWiPCnVuQLKTiGtCEXpYXt8eBG7agupw0JDySJZMFuej7PTcPzRqBUyPtFowNu1RtvHULU8XHjie6" + AWSSecretAccessKey := "9f546428957ed7e189b7be928906ce7d1d9cb3042dd4d2d5194e28ce8c4c3b8e" + + awsCreds := aws.Credentials{AccessKeyID: AWSAccessKeyID, SecretAccessKey: AWSSecretAccessKey} + signer := v4.NewSigner() + + chunk := "tmp2\n" + + reqBody := bytes.NewBufferString("5\r\n") + _, err := reqBody.WriteString(chunk) + require.NoError(t, err) + _, err = reqBody.WriteString("\r\n0\r\n") + require.NoError(t, err) + _, err = reqBody.WriteString("x-amz-checksum-crc64nvme:q1EYl4rI0TU=\r\n\r\n") + require.NoError(t, err) + + req, err := http.NewRequest("PUT", "https://localhost:8184/"+bktName+"/"+objName, nil) + require.NoError(t, err) + req.Header.Set("x-amz-sdk-checksum-algorithm", "CRC64NVME") + req.Header.Set("content-encoding", api.AwsChunked) + req.Header.Set("x-amz-trailer", "x-amz-checksum-crc64nvme") + req.Header.Set("x-amz-content-sha256", api.StreamingUnsignedPayloadTrailer) + req.Header.Set("x-amz-decoded-content-length", "5") + req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY") + + signTime, err := time.Parse("20060102T150405Z", "20250203T063745Z") + require.NoError(t, err) + + err = signer.SignHTTP(ctx, awsCreds, req, api.StreamingContentSHA256Trailer, "s3", "ru", signTime) + require.NoError(t, err) + + req.Body = io.NopCloser(reqBody) + + w := httptest.NewRecorder() + reqInfo := middleware.NewReqInfo(w, req, middleware.ObjectRequest{Bucket: bktName, Object: objName}, "") + req = req.WithContext(middleware.SetReqInfo(ctx, reqInfo)) + req = req.WithContext(middleware.SetBox(req.Context(), &middleware.Box{ + ClientTime: signTime, + AuthHeaders: &middleware.AuthHeader{ + AccessKeyID: AWSAccessKeyID, + SignatureV4: "a075c83779d1c3c02254fbe4c9eff0a21556d15556fc6a25db69147c4838226b", + Region: "ru", + }, + AccessBox: &accessbox.Box{ + Gate: &accessbox.GateData{ + SecretKey: AWSSecretAccessKey, + }, + }, + })) + + return w, req, []byte(chunk) +} + func getEmptyChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request) { AWSAccessKeyID := "48c1K4PLVb7SvmV3PjDKEuXaMh8yZMXZ8Wx9msrkKcYw06dZeaxeiPe8vyFm2WsoeVaNt7UWEjNsVkagDs8oX4XXh" AWSSecretAccessKey := "09260955b4eb0279dc017ba20a1ddac909cbd226c86cbb2d868e55534c8e64b0" diff --git a/api/handler/s3reader.go b/api/handler/s3reader.go index c23e3874..e2efffad 100644 --- a/api/handler/s3reader.go +++ b/api/handler/s3reader.go @@ -37,6 +37,7 @@ type ( var ( errGiantChunk = errors.New("chunk too big: choose chunk size <= 16MiB") errMalformedChunkedEncoding = errors.New("malformed chunked encoding") + errMalformedTrailerHeaders = errors.New("malformed trailer headers") ) func (c *s3ChunkReader) Close() (err error) { @@ -190,7 +191,11 @@ func (c *s3ChunkReader) Read(buf []byte) (num int, err error) { return num, err } -func newSignV4ChunkedReader(req *http.Request) (io.ReadCloser, error) { +func (c *s3ChunkReader) TrailerHeaders() map[string]string { + return nil +} + +func newSignV4ChunkedReader(req *http.Request) (*s3ChunkReader, error) { ctx := req.Context() box, err := middleware.GetBoxData(ctx) if err != nil { diff --git a/api/handler/s3reader_test.go b/api/handler/s3reader_test.go index 7ecb8245..c550beec 100644 --- a/api/handler/s3reader_test.go +++ b/api/handler/s3reader_test.go @@ -2,8 +2,11 @@ package handler import ( "bytes" + "fmt" "io" "net/http" + "strconv" + "strings" "testing" "time" @@ -55,3 +58,63 @@ func TestSigV4AStreaming(t *testing.T) { require.Equal(t, chunk1, string(data)) } + +func TestStreamingUnsigned(t *testing.T) { + chunk1 := "chunk1" + chunk2 := "chunk2" + + t.Run("with trailer", func(t *testing.T) { + chunks := []string{chunk1, chunk2} + trailer := map[string]string{"x-amz-checksum-crc64nvme": "q1EYl4rI0TU="} + body, expected := getChunkedBody(t, chunks, trailer) + + r, err := newUnsignedChunkedReader(body) + require.NoError(t, err) + + data, err := io.ReadAll(r) + require.NoError(t, err) + require.Equal(t, expected, string(data)) + + require.EqualValues(t, trailer, r.TrailerHeaders()) + }) + + t.Run("without trailer", func(t *testing.T) { + chunks := []string{chunk1, chunk2} + body, expected := getChunkedBody(t, chunks, nil) + + r, err := newUnsignedChunkedReader(body) + require.NoError(t, err) + + data, err := io.ReadAll(r) + require.NoError(t, err) + require.Equal(t, expected, string(data)) + }) +} + +func getChunkedBody(t *testing.T, chunks []string, trailers map[string]string) (*bytes.Buffer, string) { + res := bytes.NewBufferString("") + + for i, chunk := range chunks { + meta := strconv.FormatInt(int64(len(chunk)), 16) + "\r\n" + if i != 0 { + meta = "\r\n" + meta + } + _, err := res.WriteString(meta) + require.NoError(t, err) + _, err = res.WriteString(chunk) + require.NoError(t, err) + } + + _, err := res.WriteString("\r\n0\r\n") + require.NoError(t, err) + + for k, v := range trailers { + _, err := res.WriteString(fmt.Sprintf("%s:%s\n", k, v)) + require.NoError(t, err) + } + + _, err = res.WriteString("\r\n") + require.NoError(t, err) + + return res, strings.Join(chunks, "") +} diff --git a/api/handler/s3unsignedreader.go b/api/handler/s3unsignedreader.go index f18ec0aa..efd8049e 100644 --- a/api/handler/s3unsignedreader.go +++ b/api/handler/s3unsignedreader.go @@ -2,28 +2,17 @@ package handler import ( "bufio" - "context" - "encoding/hex" "io" - "net/http" - "time" - - v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4sdk2/signer/v4" - errs "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" - "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" - "github.com/aws/aws-sdk-go-v2/aws" ) type ( s3UnsignedChunkReader struct { - ctx context.Context - reader *bufio.Reader - streamSigner *v4.StreamSigner + reader *bufio.Reader - requestTime time.Time - buffer []byte - offset int - err error + trailers map[string]string + buffer []byte + offset int + err error } ) @@ -114,44 +103,59 @@ func (c *s3UnsignedChunkReader) Read(buf []byte) (num int, err error) { // If the chunk size is zero we return io.EOF. As specified by AWS, // only the last chunk is zero-sized. if size == 0 { + var k, v string + for err == nil { + k, err = c.reader.ReadString(':') + if err != nil { + if err == io.EOF { + break + } + c.err = errMalformedTrailerHeaders + return num, c.err + } + v, err = c.reader.ReadString('\n') + if err != nil { + c.err = errMalformedTrailerHeaders + return num, c.err + } + c.trailers[k[:len(k)-1]] = v[:len(v)-1] + } + c.err = io.EOF return num, c.err } + b, err = c.reader.ReadByte() + if b != '\r' || err != nil { + c.err = errMalformedChunkedEncoding + return num, c.err + } + b, err = c.reader.ReadByte() + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + if err != nil { + c.err = err + return num, c.err + } + if b != '\n' { + c.err = errMalformedChunkedEncoding + return num, c.err + } + c.offset = copy(buf, c.buffer) num += c.offset return num, err } -func newUnsignedChunkedReader(req *http.Request) (io.ReadCloser, error) { - ctx := req.Context() - box, err := middleware.GetBoxData(ctx) - if err != nil { - return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed) - } - - authHeaders, err := middleware.GetAuthHeaders(ctx) - if err != nil { - return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed) - } - - currentCredentials := aws.Credentials{AccessKeyID: authHeaders.AccessKeyID, SecretAccessKey: box.Gate.SecretKey} - seed, err := hex.DecodeString(authHeaders.SignatureV4) - if err != nil { - return nil, errs.GetAPIError(errs.ErrSignatureDoesNotMatch) - } - - reqTime, err := middleware.GetClientTime(ctx) - if err != nil { - return nil, errs.GetAPIError(errs.ErrMalformedDate) - } - newStreamSigner := v4.NewStreamSigner(currentCredentials, "s3", authHeaders.Region, seed) +func (c *s3UnsignedChunkReader) TrailerHeaders() map[string]string { + return c.trailers +} +func newUnsignedChunkedReader(body io.Reader) (*s3UnsignedChunkReader, error) { return &s3UnsignedChunkReader{ - ctx: ctx, - reader: bufio.NewReader(req.Body), - streamSigner: newStreamSigner, - requestTime: reqTime, - buffer: make([]byte, 64*1024), + reader: bufio.NewReader(body), + trailers: map[string]string{}, + buffer: make([]byte, 64*1024), }, nil } diff --git a/api/handler/s3v4aReader.go b/api/handler/s3v4aReader.go index 5fb189c8..086f405e 100644 --- a/api/handler/s3v4aReader.go +++ b/api/handler/s3v4aReader.go @@ -168,7 +168,11 @@ func (c *s3v4aChunkReader) handleErr(num int, err error) (int, error) { return num, c.err } -func newSignV4aChunkedReader(req *http.Request) (io.ReadCloser, error) { +func (c *s3v4aChunkReader) TrailerHeaders() map[string]string { + return nil +} + +func newSignV4aChunkedReader(req *http.Request) (*s3v4aChunkReader, error) { box, err := middleware.GetBoxData(req.Context()) if err != nil { return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)