Add chunk uploading #126
|
@ -13,6 +13,7 @@ This document outlines major changes between releases.
|
|||
- Use `DisableURIPathEscaping` to presign urls (#125)
|
||||
|
||||
### Added
|
||||
- Implement chunk uploading (#106)
|
||||
- Reload default and custom copies numbers on SIGHUP (#104)
|
||||
- Add `copies_numbers` section to `placement_policy` in config file and support vectors of copies numbers (#70)
|
||||
- Return `X-Owner-Id` in `head-bucket` response (#79)
|
||||
|
|
|
@ -40,6 +40,7 @@ type (
|
|||
Box struct {
|
||||
AccessBox *accessbox.Box
|
||||
ClientTime time.Time
|
||||
AuthHeaders *AuthHeader
|
||||
}
|
||||
|
||||
center struct {
|
||||
|
@ -51,7 +52,8 @@ type (
|
|||
|
||||
prs int
|
||||
|
||||
authHeader struct {
|
||||
//nolint:revive
|
||||
AuthHeader struct {
|
||||
AccessKeyID string
|
||||
Service string
|
||||
Region string
|
||||
|
@ -101,7 +103,7 @@ func New(frostFS tokens.FrostFS, key *keys.PrivateKey, prefixes []string, config
|
|||
}
|
||||
}
|
||||
|
||||
func (c *center) parseAuthHeader(header string) (*authHeader, error) {
|
||||
func (c *center) parseAuthHeader(header string) (*AuthHeader, error) {
|
||||
submatches := c.reg.GetSubmatches(header)
|
||||
if len(submatches) != authHeaderPartsNum {
|
||||
return nil, apiErrors.GetAPIError(apiErrors.ErrAuthorizationHeaderMalformed)
|
||||
|
@ -114,7 +116,7 @@ func (c *center) parseAuthHeader(header string) (*authHeader, error) {
|
|||
|
||||
signedFields := strings.Split(submatches["signed_header_fields"], ";")
|
||||
|
||||
return &authHeader{
|
||||
return &AuthHeader{
|
||||
AccessKeyID: submatches["access_key_id"],
|
||||
Service: submatches["service"],
|
||||
Region: submatches["region"],
|
||||
|
@ -124,7 +126,7 @@ func (c *center) parseAuthHeader(header string) (*authHeader, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (a *authHeader) getAddress() (oid.Address, error) {
|
||||
func (a *AuthHeader) getAddress() (oid.Address, error) {
|
||||
var addr oid.Address
|
||||
if err := addr.DecodeString(strings.ReplaceAll(a.AccessKeyID, "0", "/")); err != nil {
|
||||
return addr, apiErrors.GetAPIError(apiErrors.ErrInvalidAccessKeyID)
|
||||
|
@ -135,7 +137,7 @@ func (a *authHeader) getAddress() (oid.Address, error) {
|
|||
func (c *center) Authenticate(r *http.Request) (*Box, error) {
|
||||
var (
|
||||
err error
|
||||
authHdr *authHeader
|
||||
authHdr *AuthHeader
|
||||
signatureDateTimeStr string
|
||||
needClientTime bool
|
||||
)
|
||||
|
@ -146,7 +148,7 @@ func (c *center) Authenticate(r *http.Request) (*Box, error) {
|
|||
if len(creds) != 5 || creds[4] != "aws4_request" {
|
||||
return nil, fmt.Errorf("bad X-Amz-Credential")
|
||||
}
|
||||
authHdr = &authHeader{
|
||||
authHdr = &AuthHeader{
|
||||
AccessKeyID: creds[0],
|
||||
Service: creds[3],
|
||||
Region: creds[2],
|
||||
|
@ -200,7 +202,10 @@ func (c *center) Authenticate(r *http.Request) (*Box, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
result := &Box{AccessBox: box}
|
||||
result := &Box{
|
||||
AccessBox: box,
|
||||
AuthHeaders: authHdr,
|
||||
}
|
||||
if needClientTime {
|
||||
result.ClientTime = signatureDateTime
|
||||
}
|
||||
|
@ -267,7 +272,7 @@ func (c *center) checkFormData(r *http.Request) (*Box, error) {
|
|||
return &Box{AccessBox: box}, nil
|
||||
}
|
||||
|
||||
func cloneRequest(r *http.Request, authHeader *authHeader) *http.Request {
|
||||
func cloneRequest(r *http.Request, authHeader *AuthHeader) *http.Request {
|
||||
otherRequest := r.Clone(context.TODO())
|
||||
otherRequest.Header = make(http.Header)
|
||||
|
||||
|
@ -288,7 +293,7 @@ func cloneRequest(r *http.Request, authHeader *authHeader) *http.Request {
|
|||
return otherRequest
|
||||
}
|
||||
|
||||
func (c *center) checkSign(authHeader *authHeader, box *accessbox.Box, request *http.Request, signatureDateTime time.Time) error {
|
||||
func (c *center) checkSign(authHeader *AuthHeader, box *accessbox.Box, request *http.Request, signatureDateTime time.Time) error {
|
||||
awsCreds := credentials.NewStaticCredentials(authHeader.AccessKeyID, box.Gate.AccessKey, "")
|
||||
signer := v4.NewSigner(awsCreds)
|
||||
signer.DisableURIPathEscaping = true
|
||||
|
|
|
@ -19,12 +19,12 @@ func TestAuthHeaderParse(t *testing.T) {
|
|||
for _, tc := range []struct {
|
||||
header string
|
||||
err error
|
||||
expected *authHeader
|
||||
expected *AuthHeader
|
||||
}{
|
||||
{
|
||||
header: defaultHeader,
|
||||
err: nil,
|
||||
expected: &authHeader{
|
||||
expected: &AuthHeader{
|
||||
AccessKeyID: "oid0cid",
|
||||
Service: "s3",
|
||||
Region: "us-east-1",
|
||||
|
@ -54,29 +54,29 @@ func TestAuthHeaderGetAddress(t *testing.T) {
|
|||
defaulErr := errors.GetAPIError(errors.ErrInvalidAccessKeyID)
|
||||
|
||||
for _, tc := range []struct {
|
||||
authHeader *authHeader
|
||||
authHeader *AuthHeader
|
||||
err error
|
||||
}{
|
||||
{
|
||||
authHeader: &authHeader{
|
||||
authHeader: &AuthHeader{
|
||||
AccessKeyID: "vWqF8cMDRbJcvnPLALoQGnABPPhw8NyYMcGsfDPfZJM0HrgjonN8CgFvCZ3kh9BUXw4W2tJ5E7EAGhueSF122HB",
|
||||
},
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
authHeader: &authHeader{
|
||||
authHeader: &AuthHeader{
|
||||
AccessKeyID: "vWqF8cMDRbJcvnPLALoQGnABPPhw8NyYMcGsfDPfZJMHrgjonN8CgFvCZ3kh9BUXw4W2tJ5E7EAGhueSF122HB",
|
||||
},
|
||||
err: defaulErr,
|
||||
},
|
||||
{
|
||||
authHeader: &authHeader{
|
||||
authHeader: &AuthHeader{
|
||||
AccessKeyID: "oid0cid",
|
||||
},
|
||||
err: defaulErr,
|
||||
},
|
||||
{
|
||||
authHeader: &authHeader{
|
||||
authHeader: &AuthHeader{
|
||||
AccessKeyID: "oidcid",
|
||||
},
|
||||
err: defaulErr,
|
||||
|
|
|
@ -232,6 +232,27 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Reader: r.Body,
|
||||
}
|
||||
|
||||
if api.IsSignedStreamingV4(r) {
|
||||
if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
|
||||
_, err := strconv.Atoi(decodeContentSize)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
|
||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
||||
alexvanin marked this conversation as resolved
Outdated
|
||||
return
|
||||
}
|
||||
} else {
|
||||
h.logAndSendError(w, "expecting decode content length information", reqInfo,
|
||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
||||
return
|
||||
}
|
||||
chunkReader, err := newSignV4ChunkedReader(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
|
||||
return
|
||||
}
|
||||
p.Reader = chunkReader
|
||||
}
|
||||
|
||||
p.Info.Encryption, err = formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
|
||||
|
|
|
@ -233,6 +233,27 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Encryption: encryptionParams,
|
||||
}
|
||||
|
||||
dkirillov
commented
Let's move this to separate function
Let's move this to separate function
```golang
func (h *handler) getBodyReader(r *http.Request) (io.Reader, error) {
//...
}
```
ironbee
commented
What is meant by "this" here? What is meant by "this" here?
dkirillov
commented
https://git.frostfs.info/ironbee/frostfs-s3-gw/src/commit/b603a34c2833e4a614f6f8b5bc69e7adec192fcb/api/handler/put.go#L235-L251
|
||||
if api.IsSignedStreamingV4(r) {
|
||||
if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
|
||||
_, err := strconv.Atoi(decodeContentSize)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
|
||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
||||
return
|
||||
}
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
After After `h.logAndSendError` we must use `return` statement
dkirillov
commented
To check this we can use for example test:
To check this we can use for example test:
```golang
func TestPutObjectWithStreamBodyError(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := []byte("content")
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
r.Header.Set(api.AmzContentSha256, api.StreamingContentSHA256)
tc.Handler().PutObjectHandler(w, r)
assertS3Error(t, w, errors.GetAPIError(errors.ErrMissingContentLength))
checkNotFound(t, tc, bktName, objName, emptyVersion)
}
```
|
||||
} else {
|
||||
h.logAndSendError(w, "expecting decode content length information", reqInfo,
|
||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
||||
return
|
||||
}
|
||||
chunkReader, err := newSignV4ChunkedReader(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
|
||||
return
|
||||
}
|
||||
params.Reader = chunkReader
|
||||
}
|
||||
|
||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
|
|
|
@ -2,16 +2,24 @@ package handler
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -146,3 +154,88 @@ func TestPutObjectWithNegativeContentLength(t *testing.T) {
|
|||
assertStatus(t, w, http.StatusOK)
|
||||
require.Equal(t, strconv.Itoa(len(content)), w.Header().Get(api.ContentLength))
|
||||
}
|
||||
|
||||
func TestPutObjectWithStreamBodyError(t *testing.T) {
|
||||
tc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "bucket-for-put", "object-for-put"
|
||||
createTestBucket(tc, bktName)
|
||||
|
||||
content := []byte("content")
|
||||
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
|
||||
r.Header.Set(api.AmzContentSha256, api.StreamingContentSHA256)
|
||||
r.Header.Set(api.ContentEncoding, api.AwsChunked)
|
||||
tc.Handler().PutObjectHandler(w, r)
|
||||
assertS3Error(t, w, errors.GetAPIError(errors.ErrMissingContentLength))
|
||||
|
||||
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
||||
}
|
||||
|
||||
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
||||
tc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "examplebucket", "chunkObject.txt"
|
||||
createTestBucket(tc, bktName)
|
||||
|
||||
chunk := make([]byte, 65*1024)
|
||||
for i := range chunk {
|
||||
chunk[i] = 'a'
|
||||
}
|
||||
chunk1 := chunk[:64*1024]
|
||||
chunk2 := chunk[64*1024:]
|
||||
|
||||
AWSAccessKeyID := "AKIAIOSFODNN7EXAMPLE"
|
||||
AWSSecretAccessKey := "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
|
||||
|
||||
awsCreds := credentials.NewStaticCredentials(AWSAccessKeyID, AWSSecretAccessKey, "")
|
||||
signer := v4.NewSigner(awsCreds)
|
||||
|
||||
reqBody := bytes.NewBufferString("10000;chunk-signature=ad80c730a21e5b8d04586a2213dd63b9a0e99e0e2307b0ade35a65485a288648\r\n")
|
||||
_, err := reqBody.Write(chunk1)
|
||||
require.NoError(t, err)
|
||||
_, err = reqBody.WriteString("\r\n400;chunk-signature=0055627c9e194cb4542bae2aa5492e3c1575bbb81b612b7d234b86a503ef5497\r\n")
|
||||
require.NoError(t, err)
|
||||
_, err = reqBody.Write(chunk2)
|
||||
require.NoError(t, err)
|
||||
_, err = reqBody.WriteString("\r\n0;chunk-signature=b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9\r\n\r\n")
|
||||
require.NoError(t, err)
|
||||
|
||||
req, err := http.NewRequest("PUT", "https://s3.amazonaws.com/"+bktName+"/"+objName, nil)
|
||||
require.NoError(t, err)
|
||||
req.Header.Set("content-encoding", "aws-chunked")
|
||||
req.Header.Set("content-length", "66824")
|
||||
req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
|
||||
req.Header.Set("x-amz-decoded-content-length", "66560")
|
||||
req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY")
|
||||
|
||||
signTime, err := time.Parse("20060102T150405Z", "20130524T000000Z")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = signer.Sign(req, nil, "s3", "us-east-1", signTime)
|
||||
require.NoError(t, err)
|
||||
|
||||
req.Body = io.NopCloser(reqBody)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
reqInfo := api.NewReqInfo(w, req, api.ObjectRequest{Bucket: bktName, Object: objName})
|
||||
req = req.WithContext(api.SetReqInfo(tc.Context(), reqInfo))
|
||||
req = req.WithContext(context.WithValue(req.Context(), api.ClientTime, signTime))
|
||||
req = req.WithContext(context.WithValue(req.Context(), api.AuthHeaders, &auth.AuthHeader{
|
||||
AccessKeyID: AWSAccessKeyID,
|
||||
SignatureV4: "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9",
|
||||
Service: "s3",
|
||||
Region: "us-east-1",
|
||||
}))
|
||||
req = req.WithContext(context.WithValue(req.Context(), api.BoxData, &accessbox.Box{
|
||||
Gate: &accessbox.GateData{
|
||||
AccessKey: AWSSecretAccessKey,
|
||||
},
|
||||
}))
|
||||
tc.Handler().PutObjectHandler(w, req)
|
||||
assertStatus(t, w, http.StatusOK)
|
||||
|
||||
data := getObjectRange(t, tc, bktName, objName, 0, 66824)
|
||||
for i := range chunk {
|
||||
require.Equal(t, chunk[i], data[i])
|
||||
}
|
||||
}
|
||||
|
|
224
api/handler/s3reader.go
Normal file
|
@ -0,0 +1,224 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
|
||||
errs "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
)
|
||||
|
||||
const (
|
||||
chunkSignatureHeader = "chunk-signature="
|
||||
maxChunkSize = 16 << 20
|
||||
)
|
||||
|
||||
type (
|
||||
s3ChunkReader struct {
|
||||
reader *bufio.Reader
|
||||
streamSigner *v4.StreamSigner
|
||||
|
||||
requestTime time.Time
|
||||
buffer []byte
|
||||
offset int
|
||||
err error
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
errGiantChunk = errors.New("chunk too big: choose chunk size <= 16MiB")
|
||||
errMalformedChunkedEncoding = errors.New("malformed chunked encoding")
|
||||
)
|
||||
|
||||
func (c *s3ChunkReader) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *s3ChunkReader) Read(buf []byte) (num int, err error) {
|
||||
if c.offset > 0 {
|
||||
num = copy(buf, c.buffer[c.offset:])
|
||||
if num == len(buf) {
|
||||
c.offset += num
|
||||
return num, nil
|
||||
}
|
||||
c.offset = 0
|
||||
buf = buf[num:]
|
||||
}
|
||||
|
||||
var size int
|
||||
for {
|
||||
b, err := c.reader.ReadByte()
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
if err != nil {
|
||||
c.err = err
|
||||
return num, c.err
|
||||
}
|
||||
if b == ';' { // separating character
|
||||
break
|
||||
}
|
||||
|
||||
// Manually deserialize the size since AWS specified
|
||||
// the chunk size to be of variable width. In particular,
|
||||
// a size of 16 is encoded as `10` while a size of 64 KB
|
||||
// is `10000`.
|
||||
switch {
|
||||
case b >= '0' && b <= '9':
|
||||
size = size<<4 | int(b-'0')
|
||||
case b >= 'a' && b <= 'f':
|
||||
size = size<<4 | int(b-('a'-10))
|
||||
case b >= 'A' && b <= 'F':
|
||||
size = size<<4 | int(b-('A'-10))
|
||||
default:
|
||||
c.err = errMalformedChunkedEncoding
|
||||
return num, c.err
|
||||
}
|
||||
if size > maxChunkSize {
|
||||
c.err = errGiantChunk
|
||||
return num, c.err
|
||||
}
|
||||
}
|
||||
|
||||
// Now, we read the signature of the following payload and expect:
|
||||
// chunk-signature=" + <signature-as-hex> + "\r\n"
|
||||
//
|
||||
// The signature is 64 bytes long (hex-encoded SHA256 hash) and
|
||||
// starts with a 16 byte header: len("chunk-signature=") + 64 == 80.
|
||||
var signature [80]byte
|
||||
_, err = io.ReadFull(c.reader, signature[:])
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
if err != nil {
|
||||
c.err = err
|
||||
return num, c.err
|
||||
}
|
||||
if !bytes.HasPrefix(signature[:], []byte(chunkSignatureHeader)) {
|
||||
c.err = errMalformedChunkedEncoding
|
||||
return num, c.err
|
||||
}
|
||||
b, err := c.reader.ReadByte()
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
if err != nil {
|
||||
c.err = err
|
||||
return num, c.err
|
||||
}
|
||||
if b != '\r' {
|
||||
c.err = errMalformedChunkedEncoding
|
||||
return num, c.err
|
||||
}
|
||||
b, err = c.reader.ReadByte()
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
if err != nil {
|
||||
c.err = err
|
||||
return num, c.err
|
||||
}
|
||||
if b != '\n' {
|
||||
c.err = errMalformedChunkedEncoding
|
||||
return num, c.err
|
||||
}
|
||||
|
||||
if cap(c.buffer) < size {
|
||||
c.buffer = make([]byte, size)
|
||||
} else {
|
||||
c.buffer = c.buffer[:size]
|
||||
}
|
||||
|
||||
// Now, we read the payload and compute its SHA-256 hash.
|
||||
_, err = io.ReadFull(c.reader, c.buffer)
|
||||
if err == io.EOF && size != 0 {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
c.err = err
|
||||
return num, c.err
|
||||
}
|
||||
b, err = c.reader.ReadByte()
|
||||
if b != '\r' || err != nil {
|
||||
c.err = errMalformedChunkedEncoding
|
||||
return num, c.err
|
||||
}
|
||||
b, err = c.reader.ReadByte()
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
if err != nil {
|
||||
c.err = err
|
||||
return num, c.err
|
||||
}
|
||||
if b != '\n' {
|
||||
c.err = errMalformedChunkedEncoding
|
||||
return num, c.err
|
||||
}
|
||||
|
||||
// Once we have read the entire chunk successfully, we verify
|
||||
// that the received signature matches our computed signature.
|
||||
|
||||
calculatedSignature, err := c.streamSigner.GetSignature(nil, c.buffer, c.requestTime)
|
||||
if err != nil {
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why don't we check the Why don't we check the `err` here?
|
||||
c.err = err
|
||||
return num, c.err
|
||||
}
|
||||
if string(signature[16:]) != hex.EncodeToString(calculatedSignature) {
|
||||
c.err = errs.GetAPIError(errs.ErrSignatureDoesNotMatch)
|
||||
return num, c.err
|
||||
}
|
||||
|
||||
// If the chunk size is zero we return io.EOF. As specified by AWS,
|
||||
// only the last chunk is zero-sized.
|
||||
if size == 0 {
|
||||
c.err = io.EOF
|
||||
return num, c.err
|
||||
}
|
||||
|
||||
c.offset = copy(buf, c.buffer)
|
||||
num += c.offset
|
||||
return num, err
|
||||
}
|
||||
|
||||
func newSignV4ChunkedReader(req *http.Request) (io.ReadCloser, error) {
|
||||
// Expecting to refactor this in future:
|
||||
// https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/issues/137
|
||||
box, ok := req.Context().Value(api.BoxData).(*accessbox.Box)
|
||||
if !ok {
|
||||
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
|
||||
}
|
||||
|
||||
authHeaders, ok := req.Context().Value(api.AuthHeaders).(*auth.AuthHeader)
|
||||
if !ok {
|
||||
return nil, errs.GetAPIError(errs.ErrAuthorizationHeaderMalformed)
|
||||
}
|
||||
|
||||
currentCredentials := credentials.NewStaticCredentials(authHeaders.AccessKeyID, box.Gate.AccessKey, "")
|
||||
seed, err := hex.DecodeString(authHeaders.SignatureV4)
|
||||
if err != nil {
|
||||
return nil, errs.GetAPIError(errs.ErrSignatureDoesNotMatch)
|
||||
}
|
||||
|
||||
reqTime, ok := req.Context().Value(api.ClientTime).(time.Time)
|
||||
if !ok {
|
||||
return nil, errs.GetAPIError(errs.ErrMalformedDate)
|
||||
}
|
||||
newStreamSigner := v4.NewStreamSigner(authHeaders.Region, "s3", seed, currentCredentials)
|
||||
|
||||
return &s3ChunkReader{
|
||||
reader: bufio.NewReader(req.Body),
|
||||
streamSigner: newStreamSigner,
|
||||
requestTime: reqTime,
|
||||
buffer: make([]byte, 64*1024),
|
||||
}, nil
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
package api
|
||||
|
||||
import "net/http"
|
||||
|
||||
// Standard S3 HTTP request/response constants.
|
||||
const (
|
||||
MetadataPrefix = "X-Amz-Meta-"
|
||||
|
@ -39,11 +41,13 @@ const (
|
|||
IfMatch = "If-Match"
|
||||
IfNoneMatch = "If-None-Match"
|
||||
|
||||
AmzContentSha256 = "X-Amz-Content-Sha256"
|
||||
AmzCopyIfModifiedSince = "X-Amz-Copy-Source-If-Modified-Since"
|
||||
AmzCopyIfUnmodifiedSince = "X-Amz-Copy-Source-If-Unmodified-Since"
|
||||
AmzCopyIfMatch = "X-Amz-Copy-Source-If-Match"
|
||||
AmzCopyIfNoneMatch = "X-Amz-Copy-Source-If-None-Match"
|
||||
AmzACL = "X-Amz-Acl"
|
||||
AmzDecodedContentLength = "X-Amz-Decoded-Content-Length"
|
||||
AmzGrantFullControl = "X-Amz-Grant-Full-Control"
|
||||
AmzGrantRead = "X-Amz-Grant-Read"
|
||||
AmzGrantWrite = "X-Amz-Grant-Write"
|
||||
|
@ -78,9 +82,13 @@ const (
|
|||
AccessControlRequestMethod = "Access-Control-Request-Method"
|
||||
AccessControlRequestHeaders = "Access-Control-Request-Headers"
|
||||
|
||||
AwsChunked = "aws-chunked"
|
||||
|
||||
Vary = "Vary"
|
||||
|
||||
DefaultLocationConstraint = "default"
|
||||
|
||||
StreamingContentSHA256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
|
||||
)
|
||||
|
||||
// S3 request query params.
|
||||
|
@ -107,3 +115,12 @@ var SystemMetadata = map[string]struct{}{
|
|||
LastModified: {},
|
||||
ETag: {},
|
||||
}
|
||||
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Shouldn't we also check the Shouldn't we also check the `content-encoding` for containing `aws-chunked`?
|
||||
func IsSignedStreamingV4(r *http.Request) bool {
|
||||
// The Content-Encoding must have "aws-chunked" as part of its value.
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
|
||||
// Minio does not set this value, thus for compatibility reasons
|
||||
// we do not check it.
|
||||
return r.Header.Get(AmzContentSha256) == StreamingContentSHA256 &&
|
||||
r.Method == http.MethodPut
|
||||
}
|
||||
|
|
|
@ -13,6 +13,9 @@ import (
|
|||
// KeyWrapper is wrapper for context keys.
|
||||
type KeyWrapper string
|
||||
|
||||
// AuthHeaders is a wrapper for authentication headers of a request.
|
||||
var AuthHeaders = KeyWrapper("__context_auth_headers_key")
|
||||
|
||||
// BoxData is an ID used to store accessbox.Box in a context.
|
||||
var BoxData = KeyWrapper("__context_box_key")
|
||||
|
||||
|
@ -42,6 +45,7 @@ func AuthMiddleware(log *zap.Logger, center auth.Center) mux.MiddlewareFunc {
|
|||
if !box.ClientTime.IsZero() {
|
||||
alexvanin marked this conversation as resolved
alexvanin
commented
Client time is required to work with chunk payloads. However Client time is required to work with chunk payloads. However `center.Authenticate` might not set it depending on `needClientTime` flag which is set when `X-Amz-Algorithm != AWS4-HMAC-SHA256`. So my question to @dkirillov, would it be a bit more error prone if we set client time for all requests as long as it is available in `X-Amz-Date`?
dkirillov
commented
Actually, we don't use client time when we get request with presigned url. But I wouldn't say that it be more error prone if we set client (signature) time for all request with And we can use s3-gw time for some specific cases > which is set when `X-Amz-Algorithm != AWS4-HMAC-SHA256`
Actually, we don't use client time when we get request with presigned url. But I wouldn't say that it be more error prone if we set client (signature) time for all request with `X-Amz-Date` (probably we should use just `Date` if `X-Amz-Date` [is missed](https://docs.aws.amazon.com/AmazonECR/latest/APIReference/CommonParameters.html))
And we can use s3-gw time for some [specific cases](https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/8fcaf76f419fc9a3cf6d1b9e3004412093a3d415/api/handler/put.go#L531)
|
||||
ctx = context.WithValue(ctx, ClientTime, box.ClientTime)
|
||||
}
|
||||
ctx = context.WithValue(ctx, AuthHeaders, box.AuthHeaders)
|
||||
}
|
||||
|
||||
h.ServeHTTP(w, r.WithContext(ctx))
|
||||
|
|
I think you missed
return
here too, as it is done inapi/handler/put.go