forked from TrueCloudLab/frostfs-s3-gw
[#205] Add md5 checksum in header
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
8d6aa0d40a
commit
25bb581fee
26 changed files with 254 additions and 39 deletions
|
@ -46,6 +46,7 @@ type (
|
||||||
Created time.Time
|
Created time.Time
|
||||||
CreationEpoch uint64
|
CreationEpoch uint64
|
||||||
HashSum string
|
HashSum string
|
||||||
|
MD5Sum string
|
||||||
Owner user.ID
|
Owner user.ID
|
||||||
Headers map[string]string
|
Headers map[string]string
|
||||||
}
|
}
|
||||||
|
@ -116,6 +117,13 @@ func (o *ObjectInfo) Address() oid.Address {
|
||||||
return addr
|
return addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *ObjectInfo) ETag(md5Enabled bool) string {
|
||||||
|
if md5Enabled && len(o.MD5Sum) > 0 {
|
||||||
|
return o.MD5Sum
|
||||||
|
}
|
||||||
|
return o.HashSum
|
||||||
|
}
|
||||||
|
|
||||||
func (b BucketSettings) Unversioned() bool {
|
func (b BucketSettings) Unversioned() bool {
|
||||||
return b.Versioning == VersioningUnversioned
|
return b.Versioning == VersioningUnversioned
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ type BaseNodeVersion struct {
|
||||||
Timestamp uint64
|
Timestamp uint64
|
||||||
Size uint64
|
Size uint64
|
||||||
ETag string
|
ETag string
|
||||||
|
MD5 string
|
||||||
FilePath string
|
FilePath string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,6 +87,7 @@ type PartInfo struct {
|
||||||
OID oid.ID `json:"oid"`
|
OID oid.ID `json:"oid"`
|
||||||
Size uint64 `json:"size"`
|
Size uint64 `json:"size"`
|
||||||
ETag string `json:"etag"`
|
ETag string `json:"etag"`
|
||||||
|
MD5 string `json:"md5"`
|
||||||
Created time.Time `json:"created"`
|
Created time.Time `json:"created"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ type (
|
||||||
IsResolveListAllow() bool
|
IsResolveListAllow() bool
|
||||||
CompleteMultipartKeepalive() time.Duration
|
CompleteMultipartKeepalive() time.Duration
|
||||||
BypassContentEncodingInChunks() bool
|
BypassContentEncodingInChunks() bool
|
||||||
|
MD5Enabled() bool
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,8 @@ func addSSECHeaders(responseHeader http.Header, requestHeader http.Header) {
|
||||||
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
|
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int, isBucketUnversioned bool) {
|
func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int,
|
||||||
|
isBucketUnversioned, md5Enabled bool) {
|
||||||
info := extendedInfo.ObjectInfo
|
info := extendedInfo.ObjectInfo
|
||||||
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
|
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
|
||||||
h.Set(api.ContentType, info.ContentType)
|
h.Set(api.ContentType, info.ContentType)
|
||||||
|
@ -94,7 +95,8 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
|
||||||
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
|
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
|
||||||
}
|
}
|
||||||
|
|
||||||
h.Set(api.ETag, info.HashSum)
|
h.Set(api.ETag, info.ETag(md5Enabled))
|
||||||
|
|
||||||
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))
|
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))
|
||||||
|
|
||||||
if !isBucketUnversioned {
|
if !isBucketUnversioned {
|
||||||
|
@ -222,7 +224,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
|
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
|
||||||
if params != nil {
|
if params != nil {
|
||||||
writeRangeHeaders(w, params, fullSize)
|
writeRangeHeaders(w, params, fullSize)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -197,6 +197,19 @@ func TestGetObject(t *testing.T) {
|
||||||
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, errors.ErrNoSuchKey)
|
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, errors.ErrNoSuchKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetObjectEnabledMD5(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktName, objName := "bucket", "obj"
|
||||||
|
_, objInfo := createBucketAndObject(hc, bktName, objName)
|
||||||
|
|
||||||
|
_, headers := getObject(hc, bktName, objName)
|
||||||
|
require.Equal(t, objInfo.HashSum, headers.Get(api.ETag))
|
||||||
|
|
||||||
|
hc.config.md5Enabled = true
|
||||||
|
_, headers = getObject(hc, bktName, objName)
|
||||||
|
require.Equal(t, objInfo.MD5Sum, headers.Get(api.ETag))
|
||||||
|
}
|
||||||
|
|
||||||
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
|
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
|
||||||
body := bytes.NewReader([]byte(content))
|
body := bytes.NewReader([]byte(content))
|
||||||
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
||||||
|
|
|
@ -63,6 +63,7 @@ type configMock struct {
|
||||||
copiesNumbers map[string][]uint32
|
copiesNumbers map[string][]uint32
|
||||||
defaultCopiesNumbers []uint32
|
defaultCopiesNumbers []uint32
|
||||||
bypassContentEncodingInChunks bool
|
bypassContentEncodingInChunks bool
|
||||||
|
md5Enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
||||||
|
@ -110,6 +111,10 @@ func (c *configMock) CompleteMultipartKeepalive() time.Duration {
|
||||||
return time.Duration(0)
|
return time.Duration(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *configMock) MD5Enabled() bool {
|
||||||
|
return c.md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||||
return prepareHandlerContextBase(t, false)
|
return prepareHandlerContextBase(t, false)
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
|
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -246,6 +246,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
PartNumber: partNumber,
|
PartNumber: partNumber,
|
||||||
Size: size,
|
Size: size,
|
||||||
Reader: body,
|
Reader: body,
|
||||||
|
ContentMD5: r.Header.Get(api.ContentMD5),
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Info.Encryption, err = formEncryptionParams(r)
|
p.Info.Encryption, err = formEncryptionParams(r)
|
||||||
|
@ -376,8 +377,8 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
response := UploadPartCopyResponse{
|
response := UploadPartCopyResponse{
|
||||||
ETag: info.HashSum,
|
|
||||||
LastModified: info.Created.UTC().Format(time.RFC3339),
|
LastModified: info.Created.UTC().Format(time.RFC3339),
|
||||||
|
ETag: info.ETag(h.cfg.MD5Enabled()),
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Info.Encryption.Enabled() {
|
if p.Info.Encryption.Enabled() {
|
||||||
|
@ -452,8 +453,8 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
|
|
||||||
response := CompleteMultipartUploadResponse{
|
response := CompleteMultipartUploadResponse{
|
||||||
Bucket: objInfo.Bucket,
|
Bucket: objInfo.Bucket,
|
||||||
ETag: objInfo.HashSum,
|
|
||||||
Key: objInfo.Name,
|
Key: objInfo.Name,
|
||||||
|
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Here we previously set api.AmzVersionID header for versioned bucket.
|
// Here we previously set api.AmzVersionID header for versioned bucket.
|
||||||
|
|
|
@ -2,6 +2,8 @@ package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -276,6 +278,33 @@ func TestMultipartUploadWithContentLanguage(t *testing.T) {
|
||||||
require.Equal(t, exceptedContentLanguage, w.Header().Get(api.ContentLanguage))
|
require.Equal(t, exceptedContentLanguage, w.Header().Get(api.ContentLanguage))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipartUploadEnabledMD5(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
hc.config.md5Enabled = true
|
||||||
|
hc.layerFeatures.SetMD5Enabled(true)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-md5", "object-md5"
|
||||||
|
createTestBucket(hc, bktName)
|
||||||
|
|
||||||
|
partSize := 5 * 1024 * 1024
|
||||||
|
multipartUpload := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
etag1, partBody1 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 1, partSize)
|
||||||
|
md5Sum1 := md5.Sum(partBody1)
|
||||||
|
require.Equal(t, hex.EncodeToString(md5Sum1[:]), etag1)
|
||||||
|
|
||||||
|
etag2, partBody2 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 2, partSize)
|
||||||
|
md5Sum2 := md5.Sum(partBody2)
|
||||||
|
require.Equal(t, hex.EncodeToString(md5Sum2[:]), etag2)
|
||||||
|
|
||||||
|
w := completeMultipartUploadBase(hc, bktName, objName, multipartUpload.UploadID, []string{etag1, etag2})
|
||||||
|
assertStatus(t, w, http.StatusOK)
|
||||||
|
resp := &CompleteMultipartUploadResponse{}
|
||||||
|
err := xml.NewDecoder(w.Result().Body).Decode(resp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
completeMD5Sum := md5.Sum(append(md5Sum1[:], md5Sum2[:]...))
|
||||||
|
require.Equal(t, hex.EncodeToString(completeMD5Sum[:])+"-2", resp.ETag)
|
||||||
|
}
|
||||||
|
|
||||||
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
||||||
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
|
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,7 +233,7 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name)
|
response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name, h.cfg.MD5Enabled())
|
||||||
if err = middleware.EncodeToResponse(w, response); err != nil {
|
if err = middleware.EncodeToResponse(w, response); err != nil {
|
||||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||||
}
|
}
|
||||||
|
@ -261,7 +261,7 @@ func parseListObjectVersionsRequest(reqInfo *middleware.ReqInfo) (*layer.ListObj
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string) *ListObjectsVersionsResponse {
|
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string, md5Enabled bool) *ListObjectsVersionsResponse {
|
||||||
res := ListObjectsVersionsResponse{
|
res := ListObjectsVersionsResponse{
|
||||||
Name: bucketName,
|
Name: bucketName,
|
||||||
IsTruncated: info.IsTruncated,
|
IsTruncated: info.IsTruncated,
|
||||||
|
@ -286,7 +286,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
|
||||||
},
|
},
|
||||||
Size: ver.ObjectInfo.Size,
|
Size: ver.ObjectInfo.Size,
|
||||||
VersionID: ver.Version(),
|
VersionID: ver.Version(),
|
||||||
ETag: ver.ObjectInfo.HashSum,
|
ETag: ver.ObjectInfo.ETag(md5Enabled),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// this loop is not starting till versioning is not implemented
|
// this loop is not starting till versioning is not implemented
|
||||||
|
|
|
@ -245,6 +245,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
Size: size,
|
Size: size,
|
||||||
Header: metadata,
|
Header: metadata,
|
||||||
Encryption: encryptionParams,
|
Encryption: encryptionParams,
|
||||||
|
ContentMD5: r.Header.Get(api.ContentMD5),
|
||||||
}
|
}
|
||||||
|
|
||||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
||||||
|
@ -327,7 +328,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
addSSECHeaders(w.Header(), r.Header)
|
addSSECHeaders(w.Header(), r.Header)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set(api.ETag, objInfo.HashSum)
|
w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enabled()))
|
||||||
|
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
middleware.WriteSuccessResponseHeadersOnly(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -562,7 +564,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
||||||
resp := &PostResponse{
|
resp := &PostResponse{
|
||||||
Bucket: objInfo.Bucket,
|
Bucket: objInfo.Bucket,
|
||||||
Key: objInfo.Name,
|
Key: objInfo.Name,
|
||||||
ETag: objInfo.HashSum,
|
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
|
||||||
}
|
}
|
||||||
w.WriteHeader(status)
|
w.WriteHeader(status)
|
||||||
if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil {
|
if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil {
|
||||||
|
|
|
@ -3,7 +3,10 @@ package handler
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
@ -194,6 +197,37 @@ func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) {
|
||||||
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
|
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutObjectWithInvalidContentMD5(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
tc.config.md5Enabled = true
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-put", "object-for-put"
|
||||||
|
createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
content := []byte("content")
|
||||||
|
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
|
||||||
|
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString([]byte("invalid")))
|
||||||
|
tc.Handler().PutObjectHandler(w, r)
|
||||||
|
assertS3Error(t, w, s3errors.GetAPIError(s3errors.ErrInvalidDigest))
|
||||||
|
|
||||||
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPutObjectWithEnabledMD5(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
tc.config.md5Enabled = true
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-put", "object-for-put"
|
||||||
|
createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
content := []byte("content")
|
||||||
|
md5Hash := md5.New()
|
||||||
|
md5Hash.Write(content)
|
||||||
|
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
|
||||||
|
tc.Handler().PutObjectHandler(w, r)
|
||||||
|
require.Equal(t, hex.EncodeToString(md5Hash.Sum(nil)), w.Header().Get(api.ETag))
|
||||||
|
}
|
||||||
|
|
||||||
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||||
CopiesNumber: p.CopiesNumbers,
|
CopiesNumber: p.CopiesNumbers,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("put system object: %w", err)
|
return fmt.Errorf("put system object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type FeatureSettingsMock struct {
|
type FeatureSettingsMock struct {
|
||||||
clientCut bool
|
clientCut bool
|
||||||
|
md5Enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
|
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
|
||||||
|
@ -41,6 +42,14 @@ func (k *FeatureSettingsMock) SetClientCut(clientCut bool) {
|
||||||
k.clientCut = clientCut
|
k.clientCut = clientCut
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) MD5Enabled() bool {
|
||||||
|
return k.md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) {
|
||||||
|
k.md5Enabled = md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
type TestFrostFS struct {
|
type TestFrostFS struct {
|
||||||
FrostFS
|
FrostFS
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ type (
|
||||||
FeatureSettings interface {
|
FeatureSettings interface {
|
||||||
ClientCut() bool
|
ClientCut() bool
|
||||||
BufferMaxSizeForPut() uint64
|
BufferMaxSizeForPut() uint64
|
||||||
|
MD5Enabled() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
layer struct {
|
layer struct {
|
||||||
|
@ -111,14 +112,16 @@ type (
|
||||||
|
|
||||||
// PutObjectParams stores object put request parameters.
|
// PutObjectParams stores object put request parameters.
|
||||||
PutObjectParams struct {
|
PutObjectParams struct {
|
||||||
BktInfo *data.BucketInfo
|
BktInfo *data.BucketInfo
|
||||||
Object string
|
Object string
|
||||||
Size uint64
|
Size uint64
|
||||||
Reader io.Reader
|
Reader io.Reader
|
||||||
Header map[string]string
|
Header map[string]string
|
||||||
Lock *data.ObjectLock
|
Lock *data.ObjectLock
|
||||||
Encryption encryption.Params
|
Encryption encryption.Params
|
||||||
CopiesNumbers []uint32
|
CopiesNumbers []uint32
|
||||||
|
CompleteMD5Hash string
|
||||||
|
ContentMD5 string
|
||||||
}
|
}
|
||||||
|
|
||||||
PutCombinedObjectParams struct {
|
PutCombinedObjectParams struct {
|
||||||
|
|
|
@ -3,6 +3,8 @@ package layer
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -68,6 +70,7 @@ type (
|
||||||
PartNumber int
|
PartNumber int
|
||||||
Size uint64
|
Size uint64
|
||||||
Reader io.Reader
|
Reader io.Reader
|
||||||
|
ContentMD5 string
|
||||||
}
|
}
|
||||||
|
|
||||||
UploadCopyParams struct {
|
UploadCopyParams struct {
|
||||||
|
@ -197,7 +200,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return objInfo.HashSum, nil
|
return objInfo.ETag(n.features.MD5Enabled()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
|
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
|
||||||
|
@ -230,10 +233,28 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
|
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
|
||||||
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
|
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
|
||||||
|
|
||||||
size, id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if len(p.ContentMD5) > 0 {
|
||||||
|
hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5)
|
||||||
|
if err != nil {
|
||||||
|
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) {
|
||||||
|
prm := PrmObjectDelete{
|
||||||
|
Object: id,
|
||||||
|
Container: bktInfo.CID,
|
||||||
|
}
|
||||||
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||||
|
err = n.frostFS.DeleteObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
|
||||||
|
}
|
||||||
|
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
}
|
||||||
if p.Info.Encryption.Enabled() {
|
if p.Info.Encryption.Enabled() {
|
||||||
size = decSize
|
size = decSize
|
||||||
}
|
}
|
||||||
|
@ -250,6 +271,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
Size: size,
|
Size: size,
|
||||||
ETag: hex.EncodeToString(hash),
|
ETag: hex.EncodeToString(hash),
|
||||||
Created: prm.CreationTime,
|
Created: prm.CreationTime,
|
||||||
|
MD5: hex.EncodeToString(md5Hash),
|
||||||
}
|
}
|
||||||
|
|
||||||
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
||||||
|
@ -274,6 +296,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
Size: partInfo.Size,
|
Size: partInfo.Size,
|
||||||
Created: partInfo.Created,
|
Created: partInfo.Created,
|
||||||
HashSum: partInfo.ETag,
|
HashSum: partInfo.ETag,
|
||||||
|
MD5Sum: partInfo.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
return objInfo, nil
|
return objInfo, nil
|
||||||
|
@ -347,9 +370,10 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
||||||
|
|
||||||
var completedPartsHeader strings.Builder
|
var completedPartsHeader strings.Builder
|
||||||
|
md5Hash := md5.New()
|
||||||
for i, part := range p.Parts {
|
for i, part := range p.Parts {
|
||||||
partInfo := partsInfo[part.PartNumber]
|
partInfo := partsInfo[part.PartNumber]
|
||||||
if partInfo == nil || part.ETag != partInfo.ETag {
|
if partInfo == nil || (part.ETag != partInfo.ETag && part.ETag != partInfo.MD5) {
|
||||||
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
||||||
}
|
}
|
||||||
delete(partsInfo, part.PartNumber)
|
delete(partsInfo, part.PartNumber)
|
||||||
|
@ -376,6 +400,12 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
|
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bytesHash, err := hex.DecodeString(partInfo.MD5)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err)
|
||||||
|
}
|
||||||
|
md5Hash.Write(bytesHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
|
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
|
||||||
|
@ -410,13 +440,14 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
}
|
}
|
||||||
|
|
||||||
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
|
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
|
||||||
BktInfo: p.Info.Bkt,
|
BktInfo: p.Info.Bkt,
|
||||||
Object: p.Info.Key,
|
Object: p.Info.Key,
|
||||||
Reader: bytes.NewReader(partsData),
|
Reader: bytes.NewReader(partsData),
|
||||||
Header: initMetadata,
|
Header: initMetadata,
|
||||||
Size: multipartObjetSize,
|
Size: multipartObjetSize,
|
||||||
Encryption: p.Info.Encryption,
|
Encryption: p.Info.Encryption,
|
||||||
CopiesNumbers: multipartInfo.CopiesNumbers,
|
CopiesNumbers: multipartInfo.CopiesNumbers,
|
||||||
|
CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,
|
n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
||||||
CopiesNumber: p.CopiesNumbers,
|
CopiesNumber: p.CopiesNumbers,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -287,10 +290,23 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||||
}
|
}
|
||||||
|
|
||||||
size, id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if len(p.ContentMD5) > 0 {
|
||||||
|
headerMd5Hash, err := base64.StdEncoding.DecodeString(p.ContentMD5)
|
||||||
|
if err != nil {
|
||||||
|
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(headerMd5Hash, md5Hash) {
|
||||||
|
err = n.objectDelete(ctx, p.BktInfo, id)
|
||||||
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
||||||
|
}
|
||||||
|
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
||||||
|
|
||||||
|
@ -304,6 +320,11 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
IsUnversioned: !bktSettings.VersioningEnabled(),
|
IsUnversioned: !bktSettings.VersioningEnabled(),
|
||||||
IsCombined: p.Header[MultipartObjectSize] != "",
|
IsCombined: p.Header[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
if len(p.CompleteMD5Hash) > 0 {
|
||||||
|
newVersion.MD5 = p.CompleteMD5Hash
|
||||||
|
} else {
|
||||||
|
newVersion.MD5 = hex.EncodeToString(md5Hash)
|
||||||
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||||
|
@ -340,6 +361,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
Headers: p.Header,
|
Headers: p.Header,
|
||||||
ContentType: p.Header[api.ContentType],
|
ContentType: p.Header[api.ContentType],
|
||||||
HashSum: newVersion.ETag,
|
HashSum: newVersion.ETag,
|
||||||
|
MD5Sum: newVersion.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
extendedObjInfo := &data.ExtendedObjectInfo{
|
extendedObjInfo := &data.ExtendedObjectInfo{
|
||||||
|
@ -378,6 +400,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
objInfo := objectInfoFromMeta(bkt, meta)
|
objInfo := objectInfoFromMeta(bkt, meta)
|
||||||
|
objInfo.MD5Sum = node.MD5
|
||||||
|
|
||||||
extObjInfo := &data.ExtendedObjectInfo{
|
extObjInfo := &data.ExtendedObjectInfo{
|
||||||
ObjectInfo: objInfo,
|
ObjectInfo: objInfo,
|
||||||
|
@ -430,6 +453,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
objInfo := objectInfoFromMeta(bkt, meta)
|
objInfo := objectInfoFromMeta(bkt, meta)
|
||||||
|
objInfo.MD5Sum = foundVersion.MD5
|
||||||
|
|
||||||
extObjInfo := &data.ExtendedObjectInfo{
|
extObjInfo := &data.ExtendedObjectInfo{
|
||||||
ObjectInfo: objInfo,
|
ObjectInfo: objInfo,
|
||||||
|
@ -457,16 +481,18 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
|
||||||
|
|
||||||
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
|
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
|
||||||
// Returns object ID and payload sha256 hash.
|
// Returns object ID and payload sha256 hash.
|
||||||
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) {
|
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
|
||||||
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||||
prm.ClientCut = n.features.ClientCut()
|
prm.ClientCut = n.features.ClientCut()
|
||||||
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
||||||
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
|
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
|
||||||
var size uint64
|
var size uint64
|
||||||
hash := sha256.New()
|
hash := sha256.New()
|
||||||
|
md5Hash := md5.New()
|
||||||
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
|
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
|
||||||
size += uint64(len(buf))
|
size += uint64(len(buf))
|
||||||
hash.Write(buf)
|
hash.Write(buf)
|
||||||
|
md5Hash.Write(buf)
|
||||||
})
|
})
|
||||||
id, err := n.frostFS.CreateObject(ctx, prm)
|
id, err := n.frostFS.CreateObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -474,9 +500,9 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
|
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, oid.ID{}, nil, err
|
return 0, oid.ID{}, nil, nil, err
|
||||||
}
|
}
|
||||||
return size, id, hash.Sum(nil), nil
|
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
|
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
|
||||||
|
@ -807,6 +833,7 @@ func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
oi = objectInfoFromMeta(bktInfo, meta)
|
oi = objectInfoFromMeta(bktInfo, meta)
|
||||||
|
oi.MD5Sum = node.MD5
|
||||||
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
|
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
|
||||||
|
|
||||||
return oi
|
return oi
|
||||||
|
|
|
@ -125,7 +125,7 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
|
||||||
return oid.ID{}, err
|
return oid.ID{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, id, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
_, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||||
return id, err
|
return id, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,7 @@ type (
|
||||||
bypassContentEncodingInChunks bool
|
bypassContentEncodingInChunks bool
|
||||||
clientCut bool
|
clientCut bool
|
||||||
maxBufferSizeForPut uint64
|
maxBufferSizeForPut uint64
|
||||||
|
md5Enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
maxClientsConfig struct {
|
maxClientsConfig struct {
|
||||||
|
@ -191,6 +192,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
settings.setClientCut(v.GetBool(cfgClientCut))
|
settings.setClientCut(v.GetBool(cfgClientCut))
|
||||||
settings.initPlacementPolicy(log.logger, v)
|
settings.initPlacementPolicy(log.logger, v)
|
||||||
settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
|
settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
|
||||||
|
settings.setMD5Enabled(v.GetBool(cfgMD5Enabled))
|
||||||
|
|
||||||
return settings
|
return settings
|
||||||
}
|
}
|
||||||
|
@ -312,6 +314,18 @@ func (s *appSettings) CompleteMultipartKeepalive() time.Duration {
|
||||||
return s.completeMultipartKeepalive
|
return s.completeMultipartKeepalive
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) MD5Enabled() bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setMD5Enabled(md5Enabled bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.md5Enabled = md5Enabled
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
@ -604,6 +618,7 @@ func (a *App) updateSettings() {
|
||||||
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
||||||
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
||||||
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
||||||
|
a.settings.setMD5Enabled(a.cfg.GetBool(cfgMD5Enabled))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) startServices() {
|
func (a *App) startServices() {
|
||||||
|
|
|
@ -162,6 +162,9 @@ const ( // Settings.
|
||||||
// Runtime.
|
// Runtime.
|
||||||
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
||||||
|
|
||||||
|
// Enable return MD5 checksum in ETag.
|
||||||
|
cfgMD5Enabled = "features.md5.enabled"
|
||||||
|
|
||||||
// envPrefix is an environment variables prefix used for configuration.
|
// envPrefix is an environment variables prefix used for configuration.
|
||||||
envPrefix = "S3_GW"
|
envPrefix = "S3_GW"
|
||||||
)
|
)
|
||||||
|
|
|
@ -150,3 +150,5 @@ S3_GW_TRACING_ENDPOINT="localhost:4318"
|
||||||
S3_GW_TRACING_EXPORTER="otlp_grpc"
|
S3_GW_TRACING_EXPORTER="otlp_grpc"
|
||||||
|
|
||||||
S3_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
S3_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
||||||
|
|
||||||
|
S3_GW_FEATURES_MD5_ENABLED=false
|
||||||
|
|
|
@ -176,3 +176,7 @@ kludge:
|
||||||
|
|
||||||
runtime:
|
runtime:
|
||||||
soft_memory_limit: 1gb
|
soft_memory_limit: 1gb
|
||||||
|
|
||||||
|
features:
|
||||||
|
md5:
|
||||||
|
enabled: false
|
||||||
|
|
|
@ -186,6 +186,7 @@ There are some custom types used for brevity:
|
||||||
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
||||||
| `kludge` | [Different kludge configuration](#kludge-section) |
|
| `kludge` | [Different kludge configuration](#kludge-section) |
|
||||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||||
|
| `features` | [Features configuration](#features-section) |
|
||||||
|
|
||||||
### General section
|
### General section
|
||||||
|
|
||||||
|
@ -562,3 +563,16 @@ runtime:
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
||||||
|
|
||||||
|
# `features` section
|
||||||
|
Contains parameters for enabling features.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
features:
|
||||||
|
md5:
|
||||||
|
enabled: false
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|---------------|--------|---------------|---------------|----------------------------------------------------------------|
|
||||||
|
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |
|
||||||
|
|
|
@ -75,6 +75,7 @@ const (
|
||||||
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
|
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
|
||||||
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
|
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
|
||||||
PutObject = "put object" // Debug in ../../api/layer/object.go
|
PutObject = "put object" // Debug in ../../api/layer/object.go
|
||||||
|
FailedToDeleteObject = "failed to delete object" // Debug in ../../api/layer/object.go
|
||||||
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" // Warn in ../../api/layer/object.go
|
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" // Warn in ../../api/layer/object.go
|
||||||
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
|
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
|
||||||
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go
|
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go
|
||||||
|
|
|
@ -81,6 +81,7 @@ const (
|
||||||
partNumberKV = "Number"
|
partNumberKV = "Number"
|
||||||
sizeKV = "Size"
|
sizeKV = "Size"
|
||||||
etagKV = "ETag"
|
etagKV = "ETag"
|
||||||
|
md5KV = "MD5"
|
||||||
|
|
||||||
// keys for lock.
|
// keys for lock.
|
||||||
isLockKV = "IsLock"
|
isLockKV = "IsLock"
|
||||||
|
@ -185,6 +186,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
||||||
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
||||||
_, isCombined := treeNode.Get(isCombinedKV)
|
_, isCombined := treeNode.Get(isCombinedKV)
|
||||||
eTag, _ := treeNode.Get(etagKV)
|
eTag, _ := treeNode.Get(etagKV)
|
||||||
|
md5, _ := treeNode.Get(md5KV)
|
||||||
|
|
||||||
version := &data.NodeVersion{
|
version := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
|
@ -193,6 +195,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
||||||
OID: treeNode.ObjID,
|
OID: treeNode.ObjID,
|
||||||
Timestamp: treeNode.TimeStamp,
|
Timestamp: treeNode.TimeStamp,
|
||||||
ETag: eTag,
|
ETag: eTag,
|
||||||
|
MD5: md5,
|
||||||
Size: treeNode.Size,
|
Size: treeNode.Size,
|
||||||
FilePath: filePath,
|
FilePath: filePath,
|
||||||
},
|
},
|
||||||
|
@ -302,6 +305,8 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
||||||
return nil, fmt.Errorf("invalid created timestamp: %w", err)
|
return nil, fmt.Errorf("invalid created timestamp: %w", err)
|
||||||
}
|
}
|
||||||
partInfo.Created = time.UnixMilli(utcMilli)
|
partInfo.Created = time.UnixMilli(utcMilli)
|
||||||
|
case md5KV:
|
||||||
|
partInfo.MD5 = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,7 +583,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
|
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
|
||||||
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
|
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
||||||
path := pathFromName(objectName)
|
path := pathFromName(objectName)
|
||||||
|
|
||||||
p := &GetNodesParams{
|
p := &GetNodesParams{
|
||||||
|
@ -1024,6 +1029,7 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
|
||||||
sizeKV: strconv.FormatUint(info.Size, 10),
|
sizeKV: strconv.FormatUint(info.Size, 10),
|
||||||
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
|
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
|
||||||
etagKV: info.ETag,
|
etagKV: info.ETag,
|
||||||
|
md5KV: info.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
|
@ -1156,6 +1162,9 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
||||||
if len(version.ETag) > 0 {
|
if len(version.ETag) > 0 {
|
||||||
meta[etagKV] = version.ETag
|
meta[etagKV] = version.ETag
|
||||||
}
|
}
|
||||||
|
if len(version.MD5) > 0 {
|
||||||
|
meta[md5KV] = version.MD5
|
||||||
|
}
|
||||||
|
|
||||||
if version.IsDeleteMarker() {
|
if version.IsDeleteMarker() {
|
||||||
meta[isDeleteMarkerKV] = "true"
|
meta[isDeleteMarkerKV] = "true"
|
||||||
|
@ -1200,7 +1209,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
|
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
|
||||||
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
|
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
||||||
path := pathFromName(filepath)
|
path := pathFromName(filepath)
|
||||||
p := &GetNodesParams{
|
p := &GetNodesParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
|
|
Loading…
Reference in a new issue