From 614d7037264f7aa4a66fd3d3b0077baaf72f5398 Mon Sep 17 00:00:00 2001
From: Artem Tataurov
Date: Fri, 2 Jun 2023 10:44:25 +0300
Subject: [PATCH] [#106] Add chunk uploading

Signed-off-by: Artem Tataurov
---
 CHANGELOG.md                    |   1 +
 api/auth/center.go              |  27 ++--
 api/auth/center_test.go         |  14 +-
 api/handler/multipart_upload.go |  21 +++
 api/handler/put.go              |  21 +++
 api/handler/put_test.go         |  93 +++++++++++++
 api/handler/s3reader.go         | 224 ++++++++++++++++++++++++++++++++
 api/headers.go                  |  17 +++
 api/user_auth.go                |   4 +
 9 files changed, 404 insertions(+), 18 deletions(-)
 create mode 100644 api/handler/s3reader.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index afdc22f..e8a93b9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ This document outlines major changes between releases.
 - Use `DisableURIPathEscaping` to presign urls (#125)
 
 ### Added
+- Implement chunk uploading (#106)
 - Reload default and custom copies numbers on SIGHUP (#104)
 - Add `copies_numbers` section to `placement_policy` in config file and support vectors of copies numbers (#70)
 - Return `X-Owner-Id` in `head-bucket` response (#79)
diff --git a/api/auth/center.go b/api/auth/center.go
index 18a07ef..0d4cc59 100644
--- a/api/auth/center.go
+++ b/api/auth/center.go
@@ -38,8 +38,9 @@ type (
 
     // Box contains access box and additional info.
     Box struct {
-        AccessBox  *accessbox.Box
-        ClientTime time.Time
+        AccessBox   *accessbox.Box
+        ClientTime  time.Time
+        AuthHeaders *AuthHeader
     }
 
     center struct {
@@ -51,7 +52,8 @@
 
     prs int
 
-    authHeader struct {
+    //nolint:revive
+    AuthHeader struct {
         AccessKeyID string
         Service     string
         Region      string
@@ -101,7 +103,7 @@ func New(frostFS tokens.FrostFS, key *keys.PrivateKey, prefixes []string, config
     }
 }
 
-func (c *center) parseAuthHeader(header string) (*authHeader, error) {
+func (c *center) parseAuthHeader(header string) (*AuthHeader, error) {
     submatches := c.reg.GetSubmatches(header)
     if len(submatches) != authHeaderPartsNum {
         return nil, apiErrors.GetAPIError(apiErrors.ErrAuthorizationHeaderMalformed)
@@ -114,7 +116,7 @@
 
     signedFields := strings.Split(submatches["signed_header_fields"], ";")
 
-    return &authHeader{
+    return &AuthHeader{
         AccessKeyID: submatches["access_key_id"],
         Service:     submatches["service"],
         Region:      submatches["region"],
@@ -124,7 +126,7 @@
     }, nil
 }
 
-func (a *authHeader) getAddress() (oid.Address, error) {
+func (a *AuthHeader) getAddress() (oid.Address, error) {
     var addr oid.Address
     if err := addr.DecodeString(strings.ReplaceAll(a.AccessKeyID, "0", "/")); err != nil {
         return addr, apiErrors.GetAPIError(apiErrors.ErrInvalidAccessKeyID)
@@ -135,7 +137,7 @@ func (c *center) Authenticate(r *http.Request) (*Box, error) {
     var (
         err                  error
-        authHdr              *authHeader
+        authHdr              *AuthHeader
         signatureDateTimeStr string
         needClientTime       bool
     )
@@ -146,7 +148,7 @@
     if len(creds) != 5 || creds[4] != "aws4_request" {
         return nil, fmt.Errorf("bad X-Amz-Credential")
     }
-    authHdr = &authHeader{
+    authHdr = &AuthHeader{
         AccessKeyID: creds[0],
         Service:     creds[3],
         Region:      creds[2],
@@ -200,7 +202,10 @@
         return nil, err
     }
 
-    result := &Box{AccessBox: box}
+    result := &Box{
+        AccessBox:   box,
+        AuthHeaders: authHdr,
+    }
     if needClientTime {
         result.ClientTime = signatureDateTime
     }
@@ -267,7 +272,7 @@ func (c *center) checkFormData(r *http.Request) (*Box, error) {
     return &Box{AccessBox: box}, nil
 }
 
-func cloneRequest(r *http.Request, authHeader *authHeader) *http.Request {
+func cloneRequest(r *http.Request, authHeader *AuthHeader) *http.Request {
     otherRequest := r.Clone(context.TODO())
     otherRequest.Header = make(http.Header)
 
@@ -288,7 +293,7 @@ func cloneRequest(r *http.Request, authHeader *authHeader) *http.Request {
     return otherRequest
 }
 
-func (c *center) checkSign(authHeader *authHeader, box *accessbox.Box, request *http.Request, signatureDateTime time.Time) error {
+func (c *center) checkSign(authHeader *AuthHeader, box *accessbox.Box, request *http.Request, signatureDateTime time.Time) error {
     awsCreds := credentials.NewStaticCredentials(authHeader.AccessKeyID, box.Gate.AccessKey, "")
     signer := v4.NewSigner(awsCreds)
     signer.DisableURIPathEscaping = true
diff --git a/api/auth/center_test.go b/api/auth/center_test.go
index 6de1a07..15069a2 100644
--- a/api/auth/center_test.go
+++ b/api/auth/center_test.go
@@ -19,12 +19,12 @@ func TestAuthHeaderParse(t *testing.T) {
     for _, tc := range []struct {
         header   string
         err      error
-        expected *authHeader
+        expected *AuthHeader
     }{
         {
             header: defaultHeader,
             err:    nil,
-            expected: &authHeader{
+            expected: &AuthHeader{
                 AccessKeyID: "oid0cid",
                 Service:     "s3",
                 Region:      "us-east-1",
@@ -54,29 +54,29 @@ func TestAuthHeaderGetAddress(t *testing.T) {
     defaulErr := errors.GetAPIError(errors.ErrInvalidAccessKeyID)
 
     for _, tc := range []struct {
-        authHeader *authHeader
+        authHeader *AuthHeader
         err        error
     }{
         {
-            authHeader: &authHeader{
+            authHeader: &AuthHeader{
                 AccessKeyID: "vWqF8cMDRbJcvnPLALoQGnABPPhw8NyYMcGsfDPfZJM0HrgjonN8CgFvCZ3kh9BUXw4W2tJ5E7EAGhueSF122HB",
             },
             err: nil,
         },
         {
-            authHeader: &authHeader{
+            authHeader: &AuthHeader{
                 AccessKeyID: "vWqF8cMDRbJcvnPLALoQGnABPPhw8NyYMcGsfDPfZJMHrgjonN8CgFvCZ3kh9BUXw4W2tJ5E7EAGhueSF122HB",
             },
             err: defaulErr,
         },
         {
-            authHeader: &authHeader{
+            authHeader: &AuthHeader{
                 AccessKeyID: "oid0cid",
             },
             err: defaulErr,
         },
         {
-            authHeader: &authHeader{
+            authHeader: &AuthHeader{
                 AccessKeyID: "oidcid",
             },
             err: defaulErr,
diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go
index dd82e24..ea97c82 100644
--- a/api/handler/multipart_upload.go
+++ b/api/handler/multipart_upload.go
@@ -232,6 +232,27 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
         Reader: r.Body,
     }
 
+    if api.IsSignedStreamingV4(r) {
+        if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
+            _, err := strconv.Atoi(decodeContentSize)
+            if err != nil {
+                h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
+                    errors.GetAPIError(errors.ErrMissingContentLength))
+                return
+            }
+        } else {
+            h.logAndSendError(w, "expecting decode content length information", reqInfo,
+                errors.GetAPIError(errors.ErrMissingContentLength))
+            return
+        }
+        chunkReader, err := newSignV4ChunkedReader(r)
+        if err != nil {
+            h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
+            return
+        }
+        p.Reader = chunkReader
+    }
+
     p.Info.Encryption, err = formEncryptionParams(r)
     if err != nil {
         h.logAndSendError(w, "invalid sse headers", reqInfo, err)
diff --git a/api/handler/put.go b/api/handler/put.go
index 6e4fc7a..6da9662 100644
--- a/api/handler/put.go
+++ b/api/handler/put.go
@@ -233,6 +233,27 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
         Encryption: encryptionParams,
     }
 
+    if api.IsSignedStreamingV4(r) {
+        if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
+            _, err := strconv.Atoi(decodeContentSize)
+            if err != nil {
+                h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
+                    errors.GetAPIError(errors.ErrMissingContentLength))
+                return
+            }
+        } else {
+            h.logAndSendError(w, "expecting decode content length information", reqInfo,
+                errors.GetAPIError(errors.ErrMissingContentLength))
+            return
+        }
+        chunkReader, err := newSignV4ChunkedReader(r)
+        if err != nil {
+            h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
+            return
+        }
+        params.Reader = chunkReader
+    }
+
     params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
     if err != nil {
         h.logAndSendError(w, "invalid copies number", reqInfo, err)
diff --git a/api/handler/put_test.go b/api/handler/put_test.go
index 9bdc0bc..044a0fa 100644
--- a/api/handler/put_test.go
+++ b/api/handler/put_test.go
@@ -2,16 +2,24 @@ package handler
 
 import (
     "bytes"
+    "context"
     "encoding/json"
+    "io"
     "mime/multipart"
     "net/http"
+    "net/http/httptest"
     "strconv"
     "strings"
     "testing"
     "time"
 
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
+    v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
+    "github.com/aws/aws-sdk-go/aws/credentials"
     "github.com/stretchr/testify/require"
 )
 
@@ -146,3 +154,88 @@ func TestPutObjectWithNegativeContentLength(t *testing.T) {
     assertStatus(t, w, http.StatusOK)
     require.Equal(t, strconv.Itoa(len(content)), w.Header().Get(api.ContentLength))
 }
+
+func TestPutObjectWithStreamBodyError(t *testing.T) {
+    tc := prepareHandlerContext(t)
+
+    bktName, objName := "bucket-for-put", "object-for-put"
+    createTestBucket(tc, bktName)
+
+    content := []byte("content")
+    w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
+    r.Header.Set(api.AmzContentSha256, api.StreamingContentSHA256)
+    r.Header.Set(api.ContentEncoding, api.AwsChunked)
+    tc.Handler().PutObjectHandler(w, r)
+    assertS3Error(t, w, errors.GetAPIError(errors.ErrMissingContentLength))
+
+    checkNotFound(t, tc, bktName, objName, emptyVersion)
+}
+
+func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
+    tc := prepareHandlerContext(t)
+
+    bktName, objName := "examplebucket", "chunkObject.txt"
+    createTestBucket(tc, bktName)
+
+    chunk := make([]byte, 65*1024)
+    for i := range chunk {
+        chunk[i] = 'a'
+    }
+    chunk1 := chunk[:64*1024]
+    chunk2 := chunk[64*1024:]
+
+    AWSAccessKeyID := "AKIAIOSFODNN7EXAMPLE"
+    AWSSecretAccessKey := "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
+
+    awsCreds := credentials.NewStaticCredentials(AWSAccessKeyID, AWSSecretAccessKey, "")
+    signer := v4.NewSigner(awsCreds)
+
+    reqBody := bytes.NewBufferString("10000;chunk-signature=ad80c730a21e5b8d04586a2213dd63b9a0e99e0e2307b0ade35a65485a288648\r\n")
+    _, err := reqBody.Write(chunk1)
+    require.NoError(t, err)
+    _, err = reqBody.WriteString("\r\n400;chunk-signature=0055627c9e194cb4542bae2aa5492e3c1575bbb81b612b7d234b86a503ef5497\r\n")
+    require.NoError(t, err)
+    _, err = reqBody.Write(chunk2)
+    require.NoError(t, err)
+    _, err = reqBody.WriteString("\r\n0;chunk-signature=b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9\r\n\r\n")
+    require.NoError(t, err)
+
+    req, err := http.NewRequest("PUT", "https://s3.amazonaws.com/"+bktName+"/"+objName, nil)
+    require.NoError(t, err)
+    req.Header.Set("content-encoding", "aws-chunked")
+    req.Header.Set("content-length", "66824")
+    req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
+    req.Header.Set("x-amz-decoded-content-length", "66560")
+    req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY")
+
+    signTime, err := time.Parse("20060102T150405Z", "20130524T000000Z")
+    require.NoError(t, err)
+
+    _, err = signer.Sign(req, nil, "s3", "us-east-1", signTime)
+    require.NoError(t, err)
+
+    req.Body = io.NopCloser(reqBody)
+
+    w := httptest.NewRecorder()
+    reqInfo := api.NewReqInfo(w, req, api.ObjectRequest{Bucket: bktName, Object: objName})
+    req = req.WithContext(api.SetReqInfo(tc.Context(), reqInfo))
+    req = req.WithContext(context.WithValue(req.Context(), api.ClientTime, signTime))
+    req = req.WithContext(context.WithValue(req.Context(), api.AuthHeaders, &auth.AuthHeader{
+        AccessKeyID: AWSAccessKeyID,
+        SignatureV4: "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9",
+        Service:     "s3",
+        Region:      "us-east-1",
+    }))
+    req = req.WithContext(context.WithValue(req.Context(), api.BoxData, &accessbox.Box{
+        Gate: &accessbox.GateData{
+            AccessKey: AWSSecretAccessKey,
+        },
+    }))
+    tc.Handler().PutObjectHandler(w, req)
+    assertStatus(t, w, http.StatusOK)
+
+    data := getObjectRange(t, tc, bktName, objName, 0, 66824)
+    for i := range chunk {
+        require.Equal(t, chunk[i], data[i])
+    }
+}
diff --git a/api/handler/s3reader.go b/api/handler/s3reader.go
new file mode 100644
index 0000000..c76d8db
--- /dev/null
+++ b/api/handler/s3reader.go
@@ -0,0 +1,224 @@
+package handler
+
+import (
+    "bufio"
+    "bytes"
+    "encoding/hex"
+    "errors"
+    "io"
+    "net/http"
+    "time"
+
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
+    v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
+    errs "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
+    "github.com/aws/aws-sdk-go/aws/credentials"
+)
+
+const (
+    chunkSignatureHeader = "chunk-signature="
+    maxChunkSize         = 16 << 20
+)
+
+type (
+    s3ChunkReader struct {
+        reader       *bufio.Reader
+        streamSigner *v4.StreamSigner
+
+        requestTime time.Time
+        buffer      []byte
+        offset      int
+        err         error
+    }
+)
+
+var (
+    errGiantChunk               = errors.New("chunk too big: choose chunk size <= 16MiB")
+    errMalformedChunkedEncoding = errors.New("malformed chunked encoding")
+)
+
+func (c *s3ChunkReader) Close() (err error) {
+    return nil
+}
+
+func (c *s3ChunkReader) Read(buf []byte) (num int, err error) {
+    if c.offset > 0 {
+        num = copy(buf, c.buffer[c.offset:])
+        if num == len(buf) {
+            c.offset += num
+            return num, nil
+        }
+        c.offset = 0
+        buf = buf[num:]
+    }
+
+    var size int
+    for {
+        b, err := c.reader.ReadByte()
+        if err == io.EOF {
+            err = io.ErrUnexpectedEOF
+        }
+        if err != nil {
+            c.err = err
+            return num, c.err
+        }
+        if b == ';' { // separating character
+            break
+        }
+
+        // Manually deserialize the size since AWS specified
+        // the chunk size to be of variable width. In particular,
+        // a size of 16 is encoded as `10` while a size of 64 KB
+        // is `10000`.
+        switch {
+        case b >= '0' && b <= '9':
+            size = size<<4 | int(b-'0')
+        case b >= 'a' && b <= 'f':
+            size = size<<4 | int(b-('a'-10))
+        case b >= 'A' && b <= 'F':
+            size = size<<4 | int(b-('A'-10))
+        default:
+            c.err = errMalformedChunkedEncoding
+            return num, c.err
+        }
+        if size > maxChunkSize {
+            c.err = errGiantChunk
+            return num, c.err
+        }
+    }
+
+    // Now, we read the signature of the following payload and expect:
+    //   chunk-signature=" + <signature-as-hex> + "\r\n"
+    //
+    // The signature is 64 bytes long (hex-encoded SHA256 hash) and
+    // starts with a 16 byte header: len("chunk-signature=") + 64 == 80.
+    var signature [80]byte
+    _, err = io.ReadFull(c.reader, signature[:])
+    if err == io.EOF {
+        err = io.ErrUnexpectedEOF
+    }
+    if err != nil {
+        c.err = err
+        return num, c.err
+    }
+    if !bytes.HasPrefix(signature[:], []byte(chunkSignatureHeader)) {
+        c.err = errMalformedChunkedEncoding
+        return num, c.err
+    }
+    b, err := c.reader.ReadByte()
+    if err == io.EOF {
+        err = io.ErrUnexpectedEOF
+    }
+    if err != nil {
+        c.err = err
+        return num, c.err
+    }
+    if b != '\r' {
+        c.err = errMalformedChunkedEncoding
+        return num, c.err
+    }
+    b, err = c.reader.ReadByte()
+    if err == io.EOF {
+        err = io.ErrUnexpectedEOF
+    }
+    if err != nil {
+        c.err = err
+        return num, c.err
+    }
+    if b != '\n' {
+        c.err = errMalformedChunkedEncoding
+        return num, c.err
+    }
+
+    if cap(c.buffer) < size {
+        c.buffer = make([]byte, size)
+    } else {
+        c.buffer = c.buffer[:size]
+    }
+
+    // Now, we read the payload and compute its SHA-256 hash.
+    _, err = io.ReadFull(c.reader, c.buffer)
+    if err == io.EOF && size != 0 {
+        err = io.ErrUnexpectedEOF
+    }
+    if err != nil && err != io.EOF {
+        c.err = err
+        return num, c.err
+    }
+    b, err = c.reader.ReadByte()
+    if b != '\r' || err != nil {
+        c.err = errMalformedChunkedEncoding
+        return num, c.err
+    }
+    b, err = c.reader.ReadByte()
+    if err == io.EOF {
+        err = io.ErrUnexpectedEOF
+    }
+    if err != nil {
+        c.err = err
+        return num, c.err
+    }
+    if b != '\n' {
+        c.err = errMalformedChunkedEncoding
+        return num, c.err
+    }
+
+    // Once we have read the entire chunk successfully, we verify
+    // that the received signature matches our computed signature.
+
+    calculatedSignature, err := c.streamSigner.GetSignature(nil, c.buffer, c.requestTime)
+    if err != nil {
+        c.err = err
+        return num, c.err
+    }
+    if string(signature[16:]) != hex.EncodeToString(calculatedSignature) {
+        c.err = errs.GetAPIError(errs.ErrSignatureDoesNotMatch)
+        return num, c.err
+    }
+
+    // If the chunk size is zero we return io.EOF. As specified by AWS,
+    // only the last chunk is zero-sized.
+    if size == 0 {
+        c.err = io.EOF
+        return num, c.err
+    }
+
+    c.offset = copy(buf, c.buffer)
+    num += c.offset
+    return num, err
+}
+
+func newSignV4ChunkedReader(req *http.Request) (io.ReadCloser, error) {
+    // Expecting to refactor this in future:
+    // https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/issues/137
+    box, ok := req.Context().Value(api.BoxData).(*accessbox.Box)
+    if !ok {
+        return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
+    }
+
+    authHeaders, ok := req.Context().Value(api.AuthHeaders).(*auth.AuthHeader)
+    if !ok {
+        return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
+    }
+
+    currentCredentials := credentials.NewStaticCredentials(authHeaders.AccessKeyID, box.Gate.AccessKey, "")
+    seed, err := hex.DecodeString(authHeaders.SignatureV4)
+    if err != nil {
+        return nil, errs.GetAPIError(errs.ErrSignatureDoesNotMatch)
+    }
+
+    reqTime, ok := req.Context().Value(api.ClientTime).(time.Time)
+    if !ok {
+        return nil, errs.GetAPIError(errs.ErrMalformedDate)
+    }
+    newStreamSigner := v4.NewStreamSigner(authHeaders.Region, "s3", seed, currentCredentials)
+
+    return &s3ChunkReader{
+        reader:       bufio.NewReader(req.Body),
+        streamSigner: newStreamSigner,
+        requestTime:  reqTime,
+        buffer:       make([]byte, 64*1024),
+    }, nil
+}
diff --git a/api/headers.go b/api/headers.go
index 42280ad..d6fde0e 100644
--- a/api/headers.go
+++ b/api/headers.go
@@ -1,5 +1,7 @@
 package api
 
+import "net/http"
+
 // Standard S3 HTTP request/response constants.
 const (
     MetadataPrefix = "X-Amz-Meta-"
@@ -39,11 +41,13 @@ const (
     IfMatch     = "If-Match"
     IfNoneMatch = "If-None-Match"
 
+    AmzContentSha256         = "X-Amz-Content-Sha256"
     AmzCopyIfModifiedSince   = "X-Amz-Copy-Source-If-Modified-Since"
     AmzCopyIfUnmodifiedSince = "X-Amz-Copy-Source-If-Unmodified-Since"
     AmzCopyIfMatch           = "X-Amz-Copy-Source-If-Match"
     AmzCopyIfNoneMatch       = "X-Amz-Copy-Source-If-None-Match"
     AmzACL                   = "X-Amz-Acl"
+    AmzDecodedContentLength  = "X-Amz-Decoded-Content-Length"
     AmzGrantFullControl      = "X-Amz-Grant-Full-Control"
     AmzGrantRead             = "X-Amz-Grant-Read"
     AmzGrantWrite            = "X-Amz-Grant-Write"
@@ -78,9 +82,13 @@ const (
     AccessControlRequestMethod  = "Access-Control-Request-Method"
     AccessControlRequestHeaders = "Access-Control-Request-Headers"
 
+    AwsChunked = "aws-chunked"
+
     Vary = "Vary"
 
     DefaultLocationConstraint = "default"
+
+    StreamingContentSHA256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
 )
 
 // S3 request query params.
@@ -107,3 +115,12 @@ var SystemMetadata = map[string]struct{}{
     LastModified: {},
     ETag:         {},
 }
+
+func IsSignedStreamingV4(r *http.Request) bool {
+    // The Content-Encoding must have "aws-chunked" as part of its value.
+    // https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
+    // Minio does not set this value, thus for compatibility reasons
+    // we do not check it.
+    return r.Header.Get(AmzContentSha256) == StreamingContentSHA256 &&
+        r.Method == http.MethodPut
+}
diff --git a/api/user_auth.go b/api/user_auth.go
index e395522..0bfb4fd 100644
--- a/api/user_auth.go
+++ b/api/user_auth.go
@@ -13,6 +13,9 @@ import (
 
 // KeyWrapper is wrapper for context keys.
 type KeyWrapper string
 
+// AuthHeaders is a wrapper for authentication headers of a request.
+var AuthHeaders = KeyWrapper("__context_auth_headers_key")
+
 // BoxData is an ID used to store accessbox.Box in a context.
 var BoxData = KeyWrapper("__context_box_key")
@@ -42,6 +45,7 @@ func AuthMiddleware(log *zap.Logger, center auth.Center) mux.MiddlewareFunc {
             if !box.ClientTime.IsZero() {
                 ctx = context.WithValue(ctx, ClientTime, box.ClientTime)
             }
+            ctx = context.WithValue(ctx, AuthHeaders, box.AuthHeaders)
         }
 
         h.ServeHTTP(w, r.WithContext(ctx))
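
For reference, each streaming-SigV4 body that the new s3ChunkReader consumes is a sequence of frames of the form "<size-in-hex>;chunk-signature=<64-hex-digits>\r\n<payload>\r\n", terminated by a zero-sized chunk. The sketch below is illustrative only: buildChunkedBody is a hypothetical helper, the URL is an example, and the chunk signatures are placeholders rather than valid SigV4 values. It just shows the framing and the headers that IsSignedStreamingV4 and the PUT/UploadPart handlers inspect.

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

// buildChunkedBody frames payload into aws-chunked format:
// <size-in-hex>;chunk-signature=<sig>\r\n<data>\r\n ... 0;chunk-signature=<sig>\r\n\r\n
// sigs must hold one signature per data chunk plus one for the final zero-sized chunk.
func buildChunkedBody(payload []byte, chunkSize int, sigs []string) *bytes.Buffer {
    body := &bytes.Buffer{}
    i := 0
    for off := 0; off < len(payload); off += chunkSize {
        end := off + chunkSize
        if end > len(payload) {
            end = len(payload)
        }
        // Frame header: chunk size in lowercase hex, then the chunk signature.
        fmt.Fprintf(body, "%x;chunk-signature=%s\r\n", end-off, sigs[i])
        body.Write(payload[off:end])
        body.WriteString("\r\n")
        i++
    }
    // The zero-sized chunk terminates the stream.
    fmt.Fprintf(body, "0;chunk-signature=%s\r\n\r\n", sigs[i])
    return body
}

func main() {
    payload := bytes.Repeat([]byte{'a'}, 65*1024)
    // Placeholder signatures; a real client derives them with SigV4 streaming signing.
    sigs := []string{"sig-of-chunk-1", "sig-of-chunk-2", "sig-of-final-chunk"}
    body := buildChunkedBody(payload, 64*1024, sigs)

    req, _ := http.NewRequest(http.MethodPut, "https://s3.example.com/bucket/key", body)
    // Headers the gateway looks at before switching to the chunked reader.
    req.Header.Set("X-Amz-Content-Sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
    req.Header.Set("Content-Encoding", "aws-chunked")
    req.Header.Set("X-Amz-Decoded-Content-Length", fmt.Sprint(len(payload)))

    fmt.Printf("decoded length %d, encoded length %d\n", len(payload), body.Len())
}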
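
Each chunk-signature chains off the previous one (the seed signature from the Authorization header for the first chunk), which is what v4.StreamSigner.GetSignature recomputes on the gateway side. The following sketch follows the AWS SigV4 streaming documentation as I understand it; deriveSigningKey and chunkSignature are hypothetical helper names, not functions from this repository. With the AWS documentation example inputs (the same ones used in TestPutObjectWithStreamBodyAWSExample) it should reproduce the first chunk signature ad80c730...8648, but treat it as an outline, not as the gateway's implementation.

package main

import (
    "bytes"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
    "time"
)

// hmacSHA256 is the basic building block of SigV4 key derivation and signing.
func hmacSHA256(key, data []byte) []byte {
    h := hmac.New(sha256.New, key)
    h.Write(data)
    return h.Sum(nil)
}

// deriveSigningKey builds the standard SigV4 signing key for date/region/service.
func deriveSigningKey(secret, date, region, service string) []byte {
    k := hmacSHA256([]byte("AWS4"+secret), []byte(date))
    k = hmacSHA256(k, []byte(region))
    k = hmacSHA256(k, []byte(service))
    return hmacSHA256(k, []byte("aws4_request"))
}

// chunkSignature computes the signature of one chunk, chaining prevSig
// (the seed signature for the first chunk) into the string to sign.
func chunkSignature(signingKey []byte, t time.Time, scope, prevSig string, chunk []byte) string {
    emptyHash := sha256.Sum256(nil)
    chunkHash := sha256.Sum256(chunk)
    stringToSign := "AWS4-HMAC-SHA256-PAYLOAD\n" +
        t.UTC().Format("20060102T150405Z") + "\n" +
        scope + "\n" +
        prevSig + "\n" +
        hex.EncodeToString(emptyHash[:]) + "\n" +
        hex.EncodeToString(chunkHash[:])
    return hex.EncodeToString(hmacSHA256(signingKey, []byte(stringToSign)))
}

func main() {
    secret := "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    signingKey := deriveSigningKey(secret, "20130524", "us-east-1", "s3")
    signTime, _ := time.Parse("20060102T150405Z", "20130524T000000Z")
    scope := "20130524/us-east-1/s3/aws4_request"
    seed := "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9"

    firstChunk := bytes.Repeat([]byte{'a'}, 64*1024)
    fmt.Println(chunkSignature(signingKey, signTime, scope, seed, firstChunk))
}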