[#205] Add md5 checksum in header #228
|
@ -46,6 +46,7 @@ type (
|
||||||
Created time.Time
|
Created time.Time
|
||||||
CreationEpoch uint64
|
CreationEpoch uint64
|
||||||
HashSum string
|
HashSum string
|
||||||
|
MD5Sum string
|
||||||
Owner user.ID
|
Owner user.ID
|
||||||
dkirillov marked this conversation as resolved
Outdated
|
|||||||
Headers map[string]string
|
Headers map[string]string
|
||||||
}
|
}
|
||||||
|
@ -116,6 +117,13 @@ func (o *ObjectInfo) Address() oid.Address {
|
||||||
return addr
|
return addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *ObjectInfo) ETag(md5Enabled bool) string {
|
||||||
|
if md5Enabled && len(o.MD5Sum) > 0 {
|
||||||
|
return o.MD5Sum
|
||||||
|
}
|
||||||
|
return o.HashSum
|
||||||
|
}
|
||||||
|
|
||||||
func (b BucketSettings) Unversioned() bool {
|
func (b BucketSettings) Unversioned() bool {
|
||||||
return b.Versioning == VersioningUnversioned
|
return b.Versioning == VersioningUnversioned
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ type BaseNodeVersion struct {
|
||||||
Timestamp uint64
|
Timestamp uint64
|
||||||
Size uint64
|
Size uint64
|
||||||
ETag string
|
ETag string
|
||||||
|
MD5 string
|
||||||
FilePath string
|
FilePath string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,6 +87,7 @@ type PartInfo struct {
|
||||||
OID oid.ID `json:"oid"`
|
OID oid.ID `json:"oid"`
|
||||||
Size uint64 `json:"size"`
|
Size uint64 `json:"size"`
|
||||||
ETag string `json:"etag"`
|
ETag string `json:"etag"`
|
||||||
|
MD5 string `json:"md5"`
|
||||||
Created time.Time `json:"created"`
|
Created time.Time `json:"created"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ type (
|
||||||
IsResolveListAllow() bool
|
IsResolveListAllow() bool
|
||||||
CompleteMultipartKeepalive() time.Duration
|
CompleteMultipartKeepalive() time.Duration
|
||||||
BypassContentEncodingInChunks() bool
|
BypassContentEncodingInChunks() bool
|
||||||
|
MD5Enabled() bool
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,8 @@ func addSSECHeaders(responseHeader http.Header, requestHeader http.Header) {
|
||||||
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
|
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int, isBucketUnversioned bool) {
|
func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int,
|
||||||
|
isBucketUnversioned, md5Enabled bool) {
|
||||||
info := extendedInfo.ObjectInfo
|
info := extendedInfo.ObjectInfo
|
||||||
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
|
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
|
||||||
h.Set(api.ContentType, info.ContentType)
|
h.Set(api.ContentType, info.ContentType)
|
||||||
|
@ -94,7 +95,8 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
|
||||||
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
|
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
|
||||||
}
|
}
|
||||||
|
|
||||||
h.Set(api.ETag, info.HashSum)
|
h.Set(api.ETag, info.ETag(md5Enabled))
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
If If `info.MD5Sum` contains base64 encoded hash can we rename it to `MD5SumBase64`? But I would like to store hex encoded values and encode/decode where it's need
mbiryukova
commented
`info.MD5Sum` is stored as hex encoded value
dkirillov
commented
Then we have to encode it to base64 here (and probably without quotes) Then we have to encode it to base64 here (and probably without quotes)
mbiryukova
commented
I fixed this I fixed this
|
|||||||
|
|
||||||
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))
|
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))
|
||||||
|
|
||||||
if !isBucketUnversioned {
|
if !isBucketUnversioned {
|
||||||
|
@ -222,7 +224,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
|
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
|
||||||
if params != nil {
|
if params != nil {
|
||||||
writeRangeHeaders(w, params, fullSize)
|
writeRangeHeaders(w, params, fullSize)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -197,6 +197,19 @@ func TestGetObject(t *testing.T) {
|
||||||
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, errors.ErrNoSuchKey)
|
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, errors.ErrNoSuchKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetObjectEnabledMD5(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktName, objName := "bucket", "obj"
|
||||||
|
_, objInfo := createBucketAndObject(hc, bktName, objName)
|
||||||
|
|
||||||
|
_, headers := getObject(hc, bktName, objName)
|
||||||
|
require.Equal(t, objInfo.HashSum, headers.Get(api.ETag))
|
||||||
|
|
||||||
|
hc.config.md5Enabled = true
|
||||||
|
_, headers = getObject(hc, bktName, objName)
|
||||||
|
require.Equal(t, objInfo.MD5Sum, headers.Get(api.ETag))
|
||||||
|
}
|
||||||
|
|
||||||
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
|
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
|
||||||
body := bytes.NewReader([]byte(content))
|
body := bytes.NewReader([]byte(content))
|
||||||
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
||||||
|
|
|
@ -63,6 +63,7 @@ type configMock struct {
|
||||||
copiesNumbers map[string][]uint32
|
copiesNumbers map[string][]uint32
|
||||||
defaultCopiesNumbers []uint32
|
defaultCopiesNumbers []uint32
|
||||||
bypassContentEncodingInChunks bool
|
bypassContentEncodingInChunks bool
|
||||||
|
md5Enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
||||||
|
@ -110,6 +111,10 @@ func (c *configMock) CompleteMultipartKeepalive() time.Duration {
|
||||||
return time.Duration(0)
|
return time.Duration(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *configMock) MD5Enabled() bool {
|
||||||
|
return c.md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||||
return prepareHandlerContextBase(t, false)
|
return prepareHandlerContextBase(t, false)
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
|
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -246,6 +246,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
PartNumber: partNumber,
|
PartNumber: partNumber,
|
||||||
Size: size,
|
Size: size,
|
||||||
Reader: body,
|
Reader: body,
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
It seems we can simplify this and write:
It seems we can simplify this and write:
```golang
p := &layer.UploadPartParams{
// ...
MD5: r.Header.Get(api.ContentMD5)
}
```
|
|||||||
|
ContentMD5: r.Header.Get(api.ContentMD5),
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Info.Encryption, err = formEncryptionParams(r)
|
p.Info.Encryption, err = formEncryptionParams(r)
|
||||||
|
@ -376,8 +377,8 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
response := UploadPartCopyResponse{
|
response := UploadPartCopyResponse{
|
||||||
ETag: info.HashSum,
|
|
||||||
LastModified: info.Created.UTC().Format(time.RFC3339),
|
LastModified: info.Created.UTC().Format(time.RFC3339),
|
||||||
|
ETag: info.ETag(h.cfg.MD5Enabled()),
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Info.Encryption.Enabled() {
|
if p.Info.Encryption.Enabled() {
|
||||||
|
@ -452,8 +453,8 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
||||||
|
|
||||||
response := CompleteMultipartUploadResponse{
|
response := CompleteMultipartUploadResponse{
|
||||||
Bucket: objInfo.Bucket,
|
Bucket: objInfo.Bucket,
|
||||||
ETag: objInfo.HashSum,
|
|
||||||
Key: objInfo.Name,
|
Key: objInfo.Name,
|
||||||
|
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Here we previously set api.AmzVersionID header for versioned bucket.
|
// Here we previously set api.AmzVersionID header for versioned bucket.
|
||||||
|
|
|
@ -2,6 +2,8 @@ package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -276,6 +278,33 @@ func TestMultipartUploadWithContentLanguage(t *testing.T) {
|
||||||
require.Equal(t, exceptedContentLanguage, w.Header().Get(api.ContentLanguage))
|
require.Equal(t, exceptedContentLanguage, w.Header().Get(api.ContentLanguage))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipartUploadEnabledMD5(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
hc.config.md5Enabled = true
|
||||||
|
hc.layerFeatures.SetMD5Enabled(true)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-md5", "object-md5"
|
||||||
|
createTestBucket(hc, bktName)
|
||||||
|
|
||||||
|
partSize := 5 * 1024 * 1024
|
||||||
|
multipartUpload := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||||
|
etag1, partBody1 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 1, partSize)
|
||||||
|
md5Sum1 := md5.Sum(partBody1)
|
||||||
|
require.Equal(t, hex.EncodeToString(md5Sum1[:]), etag1)
|
||||||
|
|
||||||
|
etag2, partBody2 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 2, partSize)
|
||||||
|
md5Sum2 := md5.Sum(partBody2)
|
||||||
|
require.Equal(t, hex.EncodeToString(md5Sum2[:]), etag2)
|
||||||
|
|
||||||
|
w := completeMultipartUploadBase(hc, bktName, objName, multipartUpload.UploadID, []string{etag1, etag2})
|
||||||
|
assertStatus(t, w, http.StatusOK)
|
||||||
|
resp := &CompleteMultipartUploadResponse{}
|
||||||
|
err := xml.NewDecoder(w.Result().Body).Decode(resp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
completeMD5Sum := md5.Sum(append(md5Sum1[:], md5Sum2[:]...))
|
||||||
|
require.Equal(t, hex.EncodeToString(completeMD5Sum[:])+"-2", resp.ETag)
|
||||||
|
}
|
||||||
|
|
||||||
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
|
||||||
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
|
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
|
||||||
}
|
}
|
||||||
|
|
|
@ -233,7 +233,7 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name)
|
response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name, h.cfg.MD5Enabled())
|
||||||
if err = middleware.EncodeToResponse(w, response); err != nil {
|
if err = middleware.EncodeToResponse(w, response); err != nil {
|
||||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||||
}
|
}
|
||||||
|
@ -261,7 +261,7 @@ func parseListObjectVersionsRequest(reqInfo *middleware.ReqInfo) (*layer.ListObj
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string) *ListObjectsVersionsResponse {
|
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string, md5Enabled bool) *ListObjectsVersionsResponse {
|
||||||
res := ListObjectsVersionsResponse{
|
res := ListObjectsVersionsResponse{
|
||||||
Name: bucketName,
|
Name: bucketName,
|
||||||
IsTruncated: info.IsTruncated,
|
IsTruncated: info.IsTruncated,
|
||||||
|
@ -286,7 +286,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
|
||||||
},
|
},
|
||||||
Size: ver.ObjectInfo.Size,
|
Size: ver.ObjectInfo.Size,
|
||||||
VersionID: ver.Version(),
|
VersionID: ver.Version(),
|
||||||
ETag: ver.ObjectInfo.HashSum,
|
ETag: ver.ObjectInfo.ETag(md5Enabled),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// this loop is not starting till versioning is not implemented
|
// this loop is not starting till versioning is not implemented
|
||||||
|
|
|
@ -245,6 +245,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
Size: size,
|
Size: size,
|
||||||
Header: metadata,
|
Header: metadata,
|
||||||
Encryption: encryptionParams,
|
Encryption: encryptionParams,
|
||||||
|
ContentMD5: r.Header.Get(api.ContentMD5),
|
||||||
}
|
}
|
||||||
|
|
||||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
This can be simplified to
or even
This can be simplified to
```
params.ContentMD5 = r.Header.Get(api.ContentMD5)
```
or even
```
params := &layer.PutObjectParams{
// ...
ContentMD5: r.Header.Get(api.ContentMD5),
}
```
|
|||||||
|
@ -327,7 +328,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
addSSECHeaders(w.Header(), r.Header)
|
addSSECHeaders(w.Header(), r.Header)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set(api.ETag, objInfo.HashSum)
|
w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enabled()))
|
||||||
|
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
middleware.WriteSuccessResponseHeadersOnly(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -562,7 +564,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
||||||
resp := &PostResponse{
|
resp := &PostResponse{
|
||||||
Bucket: objInfo.Bucket,
|
Bucket: objInfo.Bucket,
|
||||||
Key: objInfo.Name,
|
Key: objInfo.Name,
|
||||||
ETag: objInfo.HashSum,
|
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
|
||||||
}
|
}
|
||||||
w.WriteHeader(status)
|
w.WriteHeader(status)
|
||||||
if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil {
|
if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil {
|
||||||
|
|
|
@ -3,7 +3,10 @@ package handler
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
@ -194,6 +197,37 @@ func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) {
|
||||||
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
|
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutObjectWithInvalidContentMD5(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
tc.config.md5Enabled = true
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-put", "object-for-put"
|
||||||
|
createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
content := []byte("content")
|
||||||
|
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
|
||||||
|
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString([]byte("invalid")))
|
||||||
|
tc.Handler().PutObjectHandler(w, r)
|
||||||
|
assertS3Error(t, w, s3errors.GetAPIError(s3errors.ErrInvalidDigest))
|
||||||
|
|
||||||
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPutObjectWithEnabledMD5(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
tc.config.md5Enabled = true
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-put", "object-for-put"
|
||||||
|
createTestBucket(tc, bktName)
|
||||||
|
|
||||||
|
content := []byte("content")
|
||||||
|
md5Hash := md5.New()
|
||||||
|
md5Hash.Write(content)
|
||||||
|
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
|
||||||
|
tc.Handler().PutObjectHandler(w, r)
|
||||||
|
require.Equal(t, hex.EncodeToString(md5Hash.Sum(nil)), w.Header().Get(api.ETag))
|
||||||
|
}
|
||||||
|
|
||||||
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
||||||
hc := prepareHandlerContext(t)
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||||
CopiesNumber: p.CopiesNumbers,
|
CopiesNumber: p.CopiesNumbers,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("put system object: %w", err)
|
return fmt.Errorf("put system object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
|
|
||||||
type FeatureSettingsMock struct {
|
type FeatureSettingsMock struct {
|
||||||
clientCut bool
|
clientCut bool
|
||||||
|
md5Enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
|
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
|
||||||
|
@ -41,6 +42,14 @@ func (k *FeatureSettingsMock) SetClientCut(clientCut bool) {
|
||||||
k.clientCut = clientCut
|
k.clientCut = clientCut
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) MD5Enabled() bool {
|
||||||
|
return k.md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) {
|
||||||
|
k.md5Enabled = md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
type TestFrostFS struct {
|
type TestFrostFS struct {
|
||||||
FrostFS
|
FrostFS
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ type (
|
||||||
FeatureSettings interface {
|
FeatureSettings interface {
|
||||||
ClientCut() bool
|
ClientCut() bool
|
||||||
BufferMaxSizeForPut() uint64
|
BufferMaxSizeForPut() uint64
|
||||||
|
MD5Enabled() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
layer struct {
|
layer struct {
|
||||||
|
@ -119,6 +120,8 @@ type (
|
||||||
Lock *data.ObjectLock
|
Lock *data.ObjectLock
|
||||||
Encryption encryption.Params
|
Encryption encryption.Params
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
I would rename this to something like I would rename this to something like `NeedComputeMD5`
|
|||||||
CopiesNumbers []uint32
|
CopiesNumbers []uint32
|
||||||
|
CompleteMD5Hash string
|
||||||
|
ContentMD5 string
|
||||||
}
|
}
|
||||||
|
|
||||||
PutCombinedObjectParams struct {
|
PutCombinedObjectParams struct {
|
||||||
|
|
|
@ -3,6 +3,8 @@ package layer
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -68,6 +70,7 @@ type (
|
||||||
PartNumber int
|
PartNumber int
|
||||||
Size uint64
|
Size uint64
|
||||||
Reader io.Reader
|
Reader io.Reader
|
||||||
|
ContentMD5 string
|
||||||
}
|
}
|
||||||
|
|
||||||
UploadCopyParams struct {
|
UploadCopyParams struct {
|
||||||
|
@ -197,7 +200,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return objInfo.HashSum, nil
|
return objInfo.ETag(n.features.MD5Enabled()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
|
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
|
||||||
|
@ -230,10 +233,28 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
|
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
|
||||||
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
|
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
|
||||||
|
|
||||||
size, id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if len(p.ContentMD5) > 0 {
|
||||||
|
hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5)
|
||||||
|
if err != nil {
|
||||||
|
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) {
|
||||||
|
prm := PrmObjectDelete{
|
||||||
|
Object: id,
|
||||||
|
Container: bktInfo.CID,
|
||||||
|
}
|
||||||
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||||
|
err = n.frostFS.DeleteObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
|
||||||
|
}
|
||||||
|
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
}
|
||||||
if p.Info.Encryption.Enabled() {
|
if p.Info.Encryption.Enabled() {
|
||||||
size = decSize
|
size = decSize
|
||||||
}
|
}
|
||||||
|
@ -250,6 +271,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
Size: size,
|
Size: size,
|
||||||
ETag: hex.EncodeToString(hash),
|
ETag: hex.EncodeToString(hash),
|
||||||
Created: prm.CreationTime,
|
Created: prm.CreationTime,
|
||||||
|
MD5: hex.EncodeToString(md5Hash),
|
||||||
}
|
}
|
||||||
|
|
||||||
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
||||||
|
@ -274,6 +296,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
Size: partInfo.Size,
|
Size: partInfo.Size,
|
||||||
Created: partInfo.Created,
|
Created: partInfo.Created,
|
||||||
HashSum: partInfo.ETag,
|
HashSum: partInfo.ETag,
|
||||||
|
MD5Sum: partInfo.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
return objInfo, nil
|
return objInfo, nil
|
||||||
|
@ -347,9 +370,10 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
||||||
|
|
||||||
var completedPartsHeader strings.Builder
|
var completedPartsHeader strings.Builder
|
||||||
|
md5Hash := md5.New()
|
||||||
for i, part := range p.Parts {
|
for i, part := range p.Parts {
|
||||||
partInfo := partsInfo[part.PartNumber]
|
partInfo := partsInfo[part.PartNumber]
|
||||||
if partInfo == nil || part.ETag != partInfo.ETag {
|
if partInfo == nil || (part.ETag != partInfo.ETag && part.ETag != partInfo.MD5) {
|
||||||
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
||||||
}
|
}
|
||||||
delete(partsInfo, part.PartNumber)
|
delete(partsInfo, part.PartNumber)
|
||||||
|
@ -376,6 +400,12 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
|
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bytesHash, err := hex.DecodeString(partInfo.MD5)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err)
|
||||||
|
}
|
||||||
|
md5Hash.Write(bytesHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
|
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
|
||||||
|
@ -417,6 +447,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
Size: multipartObjetSize,
|
Size: multipartObjetSize,
|
||||||
Encryption: p.Info.Encryption,
|
Encryption: p.Info.Encryption,
|
||||||
CopiesNumbers: multipartInfo.CopiesNumbers,
|
CopiesNumbers: multipartInfo.CopiesNumbers,
|
||||||
|
CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,
|
n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,
|
||||||
|
|
|
@ -34,7 +34,7 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
||||||
CopiesNumber: p.CopiesNumbers,
|
CopiesNumber: p.CopiesNumbers,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -287,10 +290,23 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||||
}
|
}
|
||||||
|
|
||||||
size, id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if len(p.ContentMD5) > 0 {
|
||||||
|
headerMd5Hash, err := base64.StdEncoding.DecodeString(p.ContentMD5)
|
||||||
|
if err != nil {
|
||||||
|
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(headerMd5Hash, md5Hash) {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Actually we can compare raw bytes (to avoid extra conversion) Actually we can compare raw bytes (to avoid extra conversion)
|
|||||||
|
err = n.objectDelete(ctx, p.BktInfo, id)
|
||||||
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
||||||
|
}
|
||||||
|
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
||||||
|
|
||||||
|
@ -304,6 +320,11 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
IsUnversioned: !bktSettings.VersioningEnabled(),
|
IsUnversioned: !bktSettings.VersioningEnabled(),
|
||||||
IsCombined: p.Header[MultipartObjectSize] != "",
|
IsCombined: p.Header[MultipartObjectSize] != "",
|
||||||
}
|
}
|
||||||
|
if len(p.CompleteMD5Hash) > 0 {
|
||||||
|
newVersion.MD5 = p.CompleteMD5Hash
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
The The `ok` variable was defined quite a long time ago. So it's better to use a more meaningful name
|
|||||||
|
} else {
|
||||||
|
newVersion.MD5 = hex.EncodeToString(md5Hash)
|
||||||
|
}
|
||||||
|
|
||||||
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Why do we store md5 in different formats in this field (hex ( Why do we store md5 in different formats in this field (hex (`newVersion.MD5 = hex.EncodeToString(md5Hash)`) vs base64 (`newVersion.MD5 = md5EncodedHash`) ?
mbiryukova
commented
Format of header depends on Format of header depends on `IsCompleteMultipart`, I'll change this to be more logical
|
|||||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||||
|
@ -340,6 +361,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
||||||
Headers: p.Header,
|
Headers: p.Header,
|
||||||
ContentType: p.Header[api.ContentType],
|
ContentType: p.Header[api.ContentType],
|
||||||
HashSum: newVersion.ETag,
|
HashSum: newVersion.ETag,
|
||||||
|
MD5Sum: newVersion.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
extendedObjInfo := &data.ExtendedObjectInfo{
|
extendedObjInfo := &data.ExtendedObjectInfo{
|
||||||
|
@ -378,6 +400,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
objInfo := objectInfoFromMeta(bkt, meta)
|
objInfo := objectInfoFromMeta(bkt, meta)
|
||||||
|
objInfo.MD5Sum = node.MD5
|
||||||
|
|
||||||
extObjInfo := &data.ExtendedObjectInfo{
|
extObjInfo := &data.ExtendedObjectInfo{
|
||||||
ObjectInfo: objInfo,
|
ObjectInfo: objInfo,
|
||||||
|
@ -430,6 +453,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
objInfo := objectInfoFromMeta(bkt, meta)
|
objInfo := objectInfoFromMeta(bkt, meta)
|
||||||
|
objInfo.MD5Sum = foundVersion.MD5
|
||||||
|
|
||||||
extObjInfo := &data.ExtendedObjectInfo{
|
extObjInfo := &data.ExtendedObjectInfo{
|
||||||
ObjectInfo: objInfo,
|
ObjectInfo: objInfo,
|
||||||
|
@ -457,16 +481,18 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
|
||||||
|
|
||||||
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
|
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
|
||||||
// Returns object ID and payload sha256 hash.
|
// Returns object ID and payload sha256 hash.
|
||||||
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) {
|
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Can be simplified
Can be simplified
```
objInfo.UseMD5 = n.features.MD5Enable()
```
|
|||||||
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||||
prm.ClientCut = n.features.ClientCut()
|
prm.ClientCut = n.features.ClientCut()
|
||||||
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
|
||||||
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
|
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
|
||||||
var size uint64
|
var size uint64
|
||||||
hash := sha256.New()
|
hash := sha256.New()
|
||||||
|
md5Hash := md5.New()
|
||||||
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
|
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
|
||||||
size += uint64(len(buf))
|
size += uint64(len(buf))
|
||||||
hash.Write(buf)
|
hash.Write(buf)
|
||||||
|
md5Hash.Write(buf)
|
||||||
})
|
})
|
||||||
id, err := n.frostFS.CreateObject(ctx, prm)
|
id, err := n.frostFS.CreateObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
dkirillov
commented
As I understand we don't have to compute md5 at all if this feature is disabled As I understand we don't have to compute md5 at all if this feature is disabled
dkirillov
commented
It seems the fix for this comment somehow be missed in master It seems the fix for this comment somehow be missed in master
@mbiryukova @alexvanin
alexvanin
commented
For v0.29.0 we do compute both hashes, unless it produce significant slowdown. As for future releases, we may drop SHA and use MD5 as new default for everything. For v0.29.0 we do compute both hashes, unless it produce significant slowdown. As for future releases, we may drop SHA and use MD5 as new default for everything.
|
|||||||
|
@ -474,9 +500,9 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
|
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, oid.ID{}, nil, err
|
return 0, oid.ID{}, nil, nil, err
|
||||||
}
|
}
|
||||||
return size, id, hash.Sum(nil), nil
|
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
|
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
|
||||||
|
@ -807,6 +833,7 @@ func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
oi = objectInfoFromMeta(bktInfo, meta)
|
oi = objectInfoFromMeta(bktInfo, meta)
|
||||||
|
oi.MD5Sum = node.MD5
|
||||||
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
|
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
|
||||||
|
|
||||||
return oi
|
return oi
|
||||||
|
|
|
@ -125,7 +125,7 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
|
||||||
return oid.ID{}, err
|
return oid.ID{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, id, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
_, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
|
||||||
return id, err
|
return id, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,7 @@ type (
|
||||||
bypassContentEncodingInChunks bool
|
bypassContentEncodingInChunks bool
|
||||||
clientCut bool
|
clientCut bool
|
||||||
maxBufferSizeForPut uint64
|
maxBufferSizeForPut uint64
|
||||||
|
md5Enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
maxClientsConfig struct {
|
maxClientsConfig struct {
|
||||||
|
@ -191,6 +192,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
settings.setClientCut(v.GetBool(cfgClientCut))
|
settings.setClientCut(v.GetBool(cfgClientCut))
|
||||||
settings.initPlacementPolicy(log.logger, v)
|
settings.initPlacementPolicy(log.logger, v)
|
||||||
settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
|
settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
|
||||||
|
settings.setMD5Enabled(v.GetBool(cfgMD5Enabled))
|
||||||
|
|
||||||
return settings
|
return settings
|
||||||
}
|
}
|
||||||
|
@ -312,6 +314,18 @@ func (s *appSettings) CompleteMultipartKeepalive() time.Duration {
|
||||||
return s.completeMultipartKeepalive
|
return s.completeMultipartKeepalive
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) MD5Enabled() bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.md5Enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setMD5Enabled(md5Enabled bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.md5Enabled = md5Enabled
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
@ -604,6 +618,7 @@ func (a *App) updateSettings() {
|
||||||
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
|
||||||
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
||||||
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
||||||
|
a.settings.setMD5Enabled(a.cfg.GetBool(cfgMD5Enabled))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) startServices() {
|
func (a *App) startServices() {
|
||||||
|
|
|
@ -162,6 +162,9 @@ const ( // Settings.
|
||||||
// Runtime.
|
// Runtime.
|
||||||
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
||||||
|
|
||||||
|
// Enable return MD5 checksum in ETag.
|
||||||
|
cfgMD5Enabled = "features.md5.enabled"
|
||||||
|
|
||||||
// envPrefix is an environment variables prefix used for configuration.
|
// envPrefix is an environment variables prefix used for configuration.
|
||||||
envPrefix = "S3_GW"
|
envPrefix = "S3_GW"
|
||||||
)
|
)
|
||||||
|
|
|
@ -150,3 +150,5 @@ S3_GW_TRACING_ENDPOINT="localhost:4318"
|
||||||
S3_GW_TRACING_EXPORTER="otlp_grpc"
|
S3_GW_TRACING_EXPORTER="otlp_grpc"
|
||||||
|
|
||||||
S3_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
S3_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
||||||
|
|
||||||
|
S3_GW_FEATURES_MD5_ENABLED=false
|
||||||
|
|
|
@ -176,3 +176,7 @@ kludge:
|
||||||
|
|
||||||
runtime:
|
runtime:
|
||||||
soft_memory_limit: 1gb
|
soft_memory_limit: 1gb
|
||||||
|
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Maybe we can introduce new section Also we should add description of this param to Please run Maybe we can introduce new section `features` or `hashes` instead of just `md5`?
Also we should add description of this param to `docs/configuration.md` and to `config/config.env`.
Please run `pre-commit run --all` to see and fix some linter errors (e.g. no empty line at the end of the file)
mbiryukova
commented
And put And put `md5` in new section?
|
|||||||
|
features:
|
||||||
|
md5:
|
||||||
|
enabled: false
|
||||||
|
|
|
@ -186,6 +186,7 @@ There are some custom types used for brevity:
|
||||||
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
||||||
| `kludge` | [Different kludge configuration](#kludge-section) |
|
| `kludge` | [Different kludge configuration](#kludge-section) |
|
||||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||||
|
| `features` | [Features configuration](#features-section) |
|
||||||
|
|
||||||
### General section
|
### General section
|
||||||
|
|
||||||
|
@ -562,3 +563,16 @@ runtime:
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
||||||
|
|
||||||
|
# `features` section
|
||||||
|
Contains parameters for enabling features.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
features:
|
||||||
|
md5:
|
||||||
|
enabled: false
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Let's name this Let's name this `enabled` (not `enable`) to be more consistent with other config values (e.g. `pprof.enabled`, `nats.enabled`, etc)
|
|||||||
|
|---------------|--------|---------------|---------------|----------------------------------------------------------------|
|
||||||
|
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |
|
||||||
|
|
|
@ -75,6 +75,7 @@ const (
|
||||||
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
|
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
|
||||||
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
|
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
|
||||||
PutObject = "put object" // Debug in ../../api/layer/object.go
|
PutObject = "put object" // Debug in ../../api/layer/object.go
|
||||||
|
FailedToDeleteObject = "failed to delete object" // Debug in ../../api/layer/object.go
|
||||||
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" // Warn in ../../api/layer/object.go
|
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" // Warn in ../../api/layer/object.go
|
||||||
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
|
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
|
||||||
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go
|
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go
|
||||||
|
|
|
@ -81,6 +81,7 @@ const (
|
||||||
partNumberKV = "Number"
|
partNumberKV = "Number"
|
||||||
sizeKV = "Size"
|
sizeKV = "Size"
|
||||||
etagKV = "ETag"
|
etagKV = "ETag"
|
||||||
|
md5KV = "MD5"
|
||||||
|
|
||||||
// keys for lock.
|
// keys for lock.
|
||||||
isLockKV = "IsLock"
|
isLockKV = "IsLock"
|
||||||
|
@ -185,6 +186,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
||||||
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
||||||
_, isCombined := treeNode.Get(isCombinedKV)
|
_, isCombined := treeNode.Get(isCombinedKV)
|
||||||
eTag, _ := treeNode.Get(etagKV)
|
eTag, _ := treeNode.Get(etagKV)
|
||||||
|
md5, _ := treeNode.Get(md5KV)
|
||||||
|
|
||||||
version := &data.NodeVersion{
|
version := &data.NodeVersion{
|
||||||
BaseNodeVersion: data.BaseNodeVersion{
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
|
@ -193,6 +195,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
||||||
OID: treeNode.ObjID,
|
OID: treeNode.ObjID,
|
||||||
Timestamp: treeNode.TimeStamp,
|
Timestamp: treeNode.TimeStamp,
|
||||||
ETag: eTag,
|
ETag: eTag,
|
||||||
|
MD5: md5,
|
||||||
Size: treeNode.Size,
|
Size: treeNode.Size,
|
||||||
FilePath: filePath,
|
FilePath: filePath,
|
||||||
},
|
},
|
||||||
|
@ -302,6 +305,8 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
||||||
return nil, fmt.Errorf("invalid created timestamp: %w", err)
|
return nil, fmt.Errorf("invalid created timestamp: %w", err)
|
||||||
}
|
}
|
||||||
partInfo.Created = time.UnixMilli(utcMilli)
|
partInfo.Created = time.UnixMilli(utcMilli)
|
||||||
|
case md5KV:
|
||||||
|
partInfo.MD5 = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,7 +583,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
|
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
|
||||||
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
|
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
||||||
path := pathFromName(objectName)
|
path := pathFromName(objectName)
|
||||||
|
|
||||||
p := &GetNodesParams{
|
p := &GetNodesParams{
|
||||||
|
@ -1024,6 +1029,7 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
|
||||||
sizeKV: strconv.FormatUint(info.Size, 10),
|
sizeKV: strconv.FormatUint(info.Size, 10),
|
||||||
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
|
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
|
||||||
etagKV: info.ETag,
|
etagKV: info.ETag,
|
||||||
|
md5KV: info.MD5,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
|
@ -1156,6 +1162,9 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
||||||
if len(version.ETag) > 0 {
|
if len(version.ETag) > 0 {
|
||||||
meta[etagKV] = version.ETag
|
meta[etagKV] = version.ETag
|
||||||
}
|
}
|
||||||
|
if len(version.MD5) > 0 {
|
||||||
|
meta[md5KV] = version.MD5
|
||||||
|
}
|
||||||
|
|
||||||
if version.IsDeleteMarker() {
|
if version.IsDeleteMarker() {
|
||||||
meta[isDeleteMarkerKV] = "true"
|
meta[isDeleteMarkerKV] = "true"
|
||||||
|
@ -1200,7 +1209,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
|
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
|
||||||
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
|
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
|
||||||
path := pathFromName(filepath)
|
path := pathFromName(filepath)
|
||||||
p := &GetNodesParams{
|
p := &GetNodesParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
|
|
It seems we can drop the
UseMD5
field and check forMD5Sum != ""
. Or encapsulate this check to methodIn current implementation
MD5Sum
will not be empty if value was stored for objectThen maybe we can write something like
and then instead of
aff675a8eb/api/handler/put.go (L331-L335)
we can write (we can extend interface with
MD5Enabled() bool
sinceappSettings
implements it anyway )