[#106] Add chunk uploading

Signed-off-by: Artem Tataurov <a.tataurov@yadro.com>
This commit is contained in:
Artem Tataurov 2023-06-02 10:44:25 +03:00
parent 23593eee3d
commit 614d703726
9 changed files with 404 additions and 18 deletions

View file

@ -13,6 +13,7 @@ This document outlines major changes between releases.
- Use `DisableURIPathEscaping` to presign urls (#125)
### Added
- Implement chunk uploading (#106)
- Reload default and custom copies numbers on SIGHUP (#104)
- Add `copies_numbers` section to `placement_policy` in config file and support vectors of copies numbers (#70)
- Return `X-Owner-Id` in `head-bucket` response (#79)

View file

@ -40,6 +40,7 @@ type (
Box struct {
AccessBox *accessbox.Box
ClientTime time.Time
AuthHeaders *AuthHeader
}
center struct {
@ -51,7 +52,8 @@ type (
prs int
authHeader struct {
//nolint:revive
AuthHeader struct {
AccessKeyID string
Service string
Region string
@ -101,7 +103,7 @@ func New(frostFS tokens.FrostFS, key *keys.PrivateKey, prefixes []string, config
}
}
func (c *center) parseAuthHeader(header string) (*authHeader, error) {
func (c *center) parseAuthHeader(header string) (*AuthHeader, error) {
submatches := c.reg.GetSubmatches(header)
if len(submatches) != authHeaderPartsNum {
return nil, apiErrors.GetAPIError(apiErrors.ErrAuthorizationHeaderMalformed)
@ -114,7 +116,7 @@ func (c *center) parseAuthHeader(header string) (*authHeader, error) {
signedFields := strings.Split(submatches["signed_header_fields"], ";")
return &authHeader{
return &AuthHeader{
AccessKeyID: submatches["access_key_id"],
Service: submatches["service"],
Region: submatches["region"],
@ -124,7 +126,7 @@ func (c *center) parseAuthHeader(header string) (*authHeader, error) {
}, nil
}
func (a *authHeader) getAddress() (oid.Address, error) {
func (a *AuthHeader) getAddress() (oid.Address, error) {
var addr oid.Address
if err := addr.DecodeString(strings.ReplaceAll(a.AccessKeyID, "0", "/")); err != nil {
return addr, apiErrors.GetAPIError(apiErrors.ErrInvalidAccessKeyID)
@ -135,7 +137,7 @@ func (a *authHeader) getAddress() (oid.Address, error) {
func (c *center) Authenticate(r *http.Request) (*Box, error) {
var (
err error
authHdr *authHeader
authHdr *AuthHeader
signatureDateTimeStr string
needClientTime bool
)
@ -146,7 +148,7 @@ func (c *center) Authenticate(r *http.Request) (*Box, error) {
if len(creds) != 5 || creds[4] != "aws4_request" {
return nil, fmt.Errorf("bad X-Amz-Credential")
}
authHdr = &authHeader{
authHdr = &AuthHeader{
AccessKeyID: creds[0],
Service: creds[3],
Region: creds[2],
@ -200,7 +202,10 @@ func (c *center) Authenticate(r *http.Request) (*Box, error) {
return nil, err
}
result := &Box{AccessBox: box}
result := &Box{
AccessBox: box,
AuthHeaders: authHdr,
}
if needClientTime {
result.ClientTime = signatureDateTime
}
@ -267,7 +272,7 @@ func (c *center) checkFormData(r *http.Request) (*Box, error) {
return &Box{AccessBox: box}, nil
}
func cloneRequest(r *http.Request, authHeader *authHeader) *http.Request {
func cloneRequest(r *http.Request, authHeader *AuthHeader) *http.Request {
otherRequest := r.Clone(context.TODO())
otherRequest.Header = make(http.Header)
@ -288,7 +293,7 @@ func cloneRequest(r *http.Request, authHeader *authHeader) *http.Request {
return otherRequest
}
func (c *center) checkSign(authHeader *authHeader, box *accessbox.Box, request *http.Request, signatureDateTime time.Time) error {
func (c *center) checkSign(authHeader *AuthHeader, box *accessbox.Box, request *http.Request, signatureDateTime time.Time) error {
awsCreds := credentials.NewStaticCredentials(authHeader.AccessKeyID, box.Gate.AccessKey, "")
signer := v4.NewSigner(awsCreds)
signer.DisableURIPathEscaping = true

View file

@ -19,12 +19,12 @@ func TestAuthHeaderParse(t *testing.T) {
for _, tc := range []struct {
header string
err error
expected *authHeader
expected *AuthHeader
}{
{
header: defaultHeader,
err: nil,
expected: &authHeader{
expected: &AuthHeader{
AccessKeyID: "oid0cid",
Service: "s3",
Region: "us-east-1",
@ -54,29 +54,29 @@ func TestAuthHeaderGetAddress(t *testing.T) {
defaulErr := errors.GetAPIError(errors.ErrInvalidAccessKeyID)
for _, tc := range []struct {
authHeader *authHeader
authHeader *AuthHeader
err error
}{
{
authHeader: &authHeader{
authHeader: &AuthHeader{
AccessKeyID: "vWqF8cMDRbJcvnPLALoQGnABPPhw8NyYMcGsfDPfZJM0HrgjonN8CgFvCZ3kh9BUXw4W2tJ5E7EAGhueSF122HB",
},
err: nil,
},
{
authHeader: &authHeader{
authHeader: &AuthHeader{
AccessKeyID: "vWqF8cMDRbJcvnPLALoQGnABPPhw8NyYMcGsfDPfZJMHrgjonN8CgFvCZ3kh9BUXw4W2tJ5E7EAGhueSF122HB",
},
err: defaulErr,
},
{
authHeader: &authHeader{
authHeader: &AuthHeader{
AccessKeyID: "oid0cid",
},
err: defaulErr,
},
{
authHeader: &authHeader{
authHeader: &AuthHeader{
AccessKeyID: "oidcid",
},
err: defaulErr,

View file

@ -232,6 +232,27 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
Reader: r.Body,
}
if api.IsSignedStreamingV4(r) {
if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
_, err := strconv.Atoi(decodeContentSize)
if err != nil {
h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
errors.GetAPIError(errors.ErrMissingContentLength))
return
}
} else {
h.logAndSendError(w, "expecting decode content length information", reqInfo,
errors.GetAPIError(errors.ErrMissingContentLength))
return
}
chunkReader, err := newSignV4ChunkedReader(r)
if err != nil {
h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
return
}
p.Reader = chunkReader
}
p.Info.Encryption, err = formEncryptionParams(r)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err)

View file

@ -233,6 +233,27 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
Encryption: encryptionParams,
}
if api.IsSignedStreamingV4(r) {
if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
_, err := strconv.Atoi(decodeContentSize)
if err != nil {
h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
errors.GetAPIError(errors.ErrMissingContentLength))
return
}
} else {
h.logAndSendError(w, "expecting decode content length information", reqInfo,
errors.GetAPIError(errors.ErrMissingContentLength))
return
}
chunkReader, err := newSignV4ChunkedReader(r)
if err != nil {
h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
return
}
params.Reader = chunkReader
}
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(w, "invalid copies number", reqInfo, err)

View file

@ -2,16 +2,24 @@ package handler
import (
"bytes"
"context"
"encoding/json"
"io"
"mime/multipart"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/stretchr/testify/require"
)
@ -146,3 +154,88 @@ func TestPutObjectWithNegativeContentLength(t *testing.T) {
assertStatus(t, w, http.StatusOK)
require.Equal(t, strconv.Itoa(len(content)), w.Header().Get(api.ContentLength))
}
func TestPutObjectWithStreamBodyError(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := []byte("content")
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
r.Header.Set(api.AmzContentSha256, api.StreamingContentSHA256)
r.Header.Set(api.ContentEncoding, api.AwsChunked)
tc.Handler().PutObjectHandler(w, r)
assertS3Error(t, w, errors.GetAPIError(errors.ErrMissingContentLength))
checkNotFound(t, tc, bktName, objName, emptyVersion)
}
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "examplebucket", "chunkObject.txt"
createTestBucket(tc, bktName)
chunk := make([]byte, 65*1024)
for i := range chunk {
chunk[i] = 'a'
}
chunk1 := chunk[:64*1024]
chunk2 := chunk[64*1024:]
AWSAccessKeyID := "AKIAIOSFODNN7EXAMPLE"
AWSSecretAccessKey := "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
awsCreds := credentials.NewStaticCredentials(AWSAccessKeyID, AWSSecretAccessKey, "")
signer := v4.NewSigner(awsCreds)
reqBody := bytes.NewBufferString("10000;chunk-signature=ad80c730a21e5b8d04586a2213dd63b9a0e99e0e2307b0ade35a65485a288648\r\n")
_, err := reqBody.Write(chunk1)
require.NoError(t, err)
_, err = reqBody.WriteString("\r\n400;chunk-signature=0055627c9e194cb4542bae2aa5492e3c1575bbb81b612b7d234b86a503ef5497\r\n")
require.NoError(t, err)
_, err = reqBody.Write(chunk2)
require.NoError(t, err)
_, err = reqBody.WriteString("\r\n0;chunk-signature=b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9\r\n\r\n")
require.NoError(t, err)
req, err := http.NewRequest("PUT", "https://s3.amazonaws.com/"+bktName+"/"+objName, nil)
require.NoError(t, err)
req.Header.Set("content-encoding", "aws-chunked")
req.Header.Set("content-length", "66824")
req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
req.Header.Set("x-amz-decoded-content-length", "66560")
req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY")
signTime, err := time.Parse("20060102T150405Z", "20130524T000000Z")
require.NoError(t, err)
_, err = signer.Sign(req, nil, "s3", "us-east-1", signTime)
require.NoError(t, err)
req.Body = io.NopCloser(reqBody)
w := httptest.NewRecorder()
reqInfo := api.NewReqInfo(w, req, api.ObjectRequest{Bucket: bktName, Object: objName})
req = req.WithContext(api.SetReqInfo(tc.Context(), reqInfo))
req = req.WithContext(context.WithValue(req.Context(), api.ClientTime, signTime))
req = req.WithContext(context.WithValue(req.Context(), api.AuthHeaders, &auth.AuthHeader{
AccessKeyID: AWSAccessKeyID,
SignatureV4: "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9",
Service: "s3",
Region: "us-east-1",
}))
req = req.WithContext(context.WithValue(req.Context(), api.BoxData, &accessbox.Box{
Gate: &accessbox.GateData{
AccessKey: AWSSecretAccessKey,
},
}))
tc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
data := getObjectRange(t, tc, bktName, objName, 0, 66824)
for i := range chunk {
require.Equal(t, chunk[i], data[i])
}
}

224
api/handler/s3reader.go Normal file
View file

@ -0,0 +1,224 @@
package handler
import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"io"
"net/http"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
errs "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"github.com/aws/aws-sdk-go/aws/credentials"
)
const (
chunkSignatureHeader = "chunk-signature="
maxChunkSize = 16 << 20
)
type (
s3ChunkReader struct {
reader *bufio.Reader
streamSigner *v4.StreamSigner
requestTime time.Time
buffer []byte
offset int
err error
}
)
var (
errGiantChunk = errors.New("chunk too big: choose chunk size <= 16MiB")
errMalformedChunkedEncoding = errors.New("malformed chunked encoding")
)
func (c *s3ChunkReader) Close() (err error) {
return nil
}
func (c *s3ChunkReader) Read(buf []byte) (num int, err error) {
if c.offset > 0 {
num = copy(buf, c.buffer[c.offset:])
if num == len(buf) {
c.offset += num
return num, nil
}
c.offset = 0
buf = buf[num:]
}
var size int
for {
b, err := c.reader.ReadByte()
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if err != nil {
c.err = err
return num, c.err
}
if b == ';' { // separating character
break
}
// Manually deserialize the size since AWS specified
// the chunk size to be of variable width. In particular,
// a size of 16 is encoded as `10` while a size of 64 KB
// is `10000`.
switch {
case b >= '0' && b <= '9':
size = size<<4 | int(b-'0')
case b >= 'a' && b <= 'f':
size = size<<4 | int(b-('a'-10))
case b >= 'A' && b <= 'F':
size = size<<4 | int(b-('A'-10))
default:
c.err = errMalformedChunkedEncoding
return num, c.err
}
if size > maxChunkSize {
c.err = errGiantChunk
return num, c.err
}
}
// Now, we read the signature of the following payload and expect:
// chunk-signature=" + <signature-as-hex> + "\r\n"
//
// The signature is 64 bytes long (hex-encoded SHA256 hash) and
// starts with a 16 byte header: len("chunk-signature=") + 64 == 80.
var signature [80]byte
_, err = io.ReadFull(c.reader, signature[:])
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if err != nil {
c.err = err
return num, c.err
}
if !bytes.HasPrefix(signature[:], []byte(chunkSignatureHeader)) {
c.err = errMalformedChunkedEncoding
return num, c.err
}
b, err := c.reader.ReadByte()
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if err != nil {
c.err = err
return num, c.err
}
if b != '\r' {
c.err = errMalformedChunkedEncoding
return num, c.err
}
b, err = c.reader.ReadByte()
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if err != nil {
c.err = err
return num, c.err
}
if b != '\n' {
c.err = errMalformedChunkedEncoding
return num, c.err
}
if cap(c.buffer) < size {
c.buffer = make([]byte, size)
} else {
c.buffer = c.buffer[:size]
}
// Now, we read the payload and compute its SHA-256 hash.
_, err = io.ReadFull(c.reader, c.buffer)
if err == io.EOF && size != 0 {
err = io.ErrUnexpectedEOF
}
if err != nil && err != io.EOF {
c.err = err
return num, c.err
}
b, err = c.reader.ReadByte()
if b != '\r' || err != nil {
c.err = errMalformedChunkedEncoding
return num, c.err
}
b, err = c.reader.ReadByte()
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if err != nil {
c.err = err
return num, c.err
}
if b != '\n' {
c.err = errMalformedChunkedEncoding
return num, c.err
}
// Once we have read the entire chunk successfully, we verify
// that the received signature matches our computed signature.
calculatedSignature, err := c.streamSigner.GetSignature(nil, c.buffer, c.requestTime)
if err != nil {
c.err = err
return num, c.err
}
if string(signature[16:]) != hex.EncodeToString(calculatedSignature) {
c.err = errs.GetAPIError(errs.ErrSignatureDoesNotMatch)
return num, c.err
}
// If the chunk size is zero we return io.EOF. As specified by AWS,
// only the last chunk is zero-sized.
if size == 0 {
c.err = io.EOF
return num, c.err
}
c.offset = copy(buf, c.buffer)
num += c.offset
return num, err
}
func newSignV4ChunkedReader(req *http.Request) (io.ReadCloser, error) {
// Expecting to refactor this in future:
// https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/issues/137
box, ok := req.Context().Value(api.BoxData).(*accessbox.Box)
if !ok {
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
}
authHeaders, ok := req.Context().Value(api.AuthHeaders).(*auth.AuthHeader)
if !ok {
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
}
currentCredentials := credentials.NewStaticCredentials(authHeaders.AccessKeyID, box.Gate.AccessKey, "")
seed, err := hex.DecodeString(authHeaders.SignatureV4)
if err != nil {
return nil, errs.GetAPIError(errs.ErrSignatureDoesNotMatch)
}
reqTime, ok := req.Context().Value(api.ClientTime).(time.Time)
if !ok {
return nil, errs.GetAPIError(errs.ErrMalformedDate)
}
newStreamSigner := v4.NewStreamSigner(authHeaders.Region, "s3", seed, currentCredentials)
return &s3ChunkReader{
reader: bufio.NewReader(req.Body),
streamSigner: newStreamSigner,
requestTime: reqTime,
buffer: make([]byte, 64*1024),
}, nil
}

View file

@ -1,5 +1,7 @@
package api
import "net/http"
// Standard S3 HTTP request/response constants.
const (
MetadataPrefix = "X-Amz-Meta-"
@ -39,11 +41,13 @@ const (
IfMatch = "If-Match"
IfNoneMatch = "If-None-Match"
AmzContentSha256 = "X-Amz-Content-Sha256"
AmzCopyIfModifiedSince = "X-Amz-Copy-Source-If-Modified-Since"
AmzCopyIfUnmodifiedSince = "X-Amz-Copy-Source-If-Unmodified-Since"
AmzCopyIfMatch = "X-Amz-Copy-Source-If-Match"
AmzCopyIfNoneMatch = "X-Amz-Copy-Source-If-None-Match"
AmzACL = "X-Amz-Acl"
AmzDecodedContentLength = "X-Amz-Decoded-Content-Length"
AmzGrantFullControl = "X-Amz-Grant-Full-Control"
AmzGrantRead = "X-Amz-Grant-Read"
AmzGrantWrite = "X-Amz-Grant-Write"
@ -78,9 +82,13 @@ const (
AccessControlRequestMethod = "Access-Control-Request-Method"
AccessControlRequestHeaders = "Access-Control-Request-Headers"
AwsChunked = "aws-chunked"
Vary = "Vary"
DefaultLocationConstraint = "default"
StreamingContentSHA256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
)
// S3 request query params.
@ -107,3 +115,12 @@ var SystemMetadata = map[string]struct{}{
LastModified: {},
ETag: {},
}
func IsSignedStreamingV4(r *http.Request) bool {
// The Content-Encoding must have "aws-chunked" as part of its value.
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
// Minio does not set this value, thus for compatibility reasons
// we do not check it.
return r.Header.Get(AmzContentSha256) == StreamingContentSHA256 &&
r.Method == http.MethodPut
}

View file

@ -13,6 +13,9 @@ import (
// KeyWrapper is wrapper for context keys.
type KeyWrapper string
// AuthHeaders is a wrapper for authentication headers of a request.
var AuthHeaders = KeyWrapper("__context_auth_headers_key")
// BoxData is an ID used to store accessbox.Box in a context.
var BoxData = KeyWrapper("__context_box_key")
@ -42,6 +45,7 @@ func AuthMiddleware(log *zap.Logger, center auth.Center) mux.MiddlewareFunc {
if !box.ClientTime.IsZero() {
ctx = context.WithValue(ctx, ClientTime, box.ClientTime)
}
ctx = context.WithValue(ctx, AuthHeaders, box.AuthHeaders)
}
h.ServeHTTP(w, r.WithContext(ctx))