[#205] Add md5 checksum in header #228

Merged
alexvanin merged 1 commit from mbiryukova/frostfs-s3-gw:feature/add_md5_checksum_in_header into master 2023-10-25 14:50:49 +00:00
26 changed files with 254 additions and 39 deletions

View file

@ -46,6 +46,7 @@ type (
Created time.Time
CreationEpoch uint64
HashSum string
MD5Sum string
Owner user.ID
dkirillov marked this conversation as resolved Outdated

It seems we can drop the UseMD5 field and check for MD5Sum != "". Or encapsulate this check to method

func (o ObjectInfo) UseMD5() bool {
    return o.MD5Sum != ""
}
It seems we can drop the `UseMD5` field and check for `MD5Sum != ""`. Or encapsulate this check to method ``` func (o ObjectInfo) UseMD5() bool { return o.MD5Sum != "" } ```

In current implementation MD5Sum will not be empty if value was stored for object

In current implementation `MD5Sum` will not be empty if value was stored for object

Then maybe we can write something like

func (o ObjectInfo) ETag(md5Enabled bool) string {
	if md5Enabled && len(o.MD5Sum) > 0 {
		return o.MD5Sum
	}

	return o.HashSum
}

and then instead of aff675a8eb/api/handler/put.go (L331-L335)

we can write (we can extend interface with MD5Enabled() bool since appSettings implements it anyway )

w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enable()))
Then maybe we can write something like ``` func (o ObjectInfo) ETag(md5Enabled bool) string { if md5Enabled && len(o.MD5Sum) > 0 { return o.MD5Sum } return o.HashSum } ``` and then instead of https://git.frostfs.info/mbiryukova/frostfs-s3-gw/src/commit/aff675a8eb80a15b634fc93d82b17d8a2372d620/api/handler/put.go#L331-L335 we can write (we can extend [interface](https://git.frostfs.info/mbiryukova/frostfs-s3-gw/src/commit/aff675a8eb80a15b634fc93d82b17d8a2372d620/api/handler/api.go#L33) with `MD5Enabled() bool` since `appSettings` implements it anyway ) ``` w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enable())) ```
Headers map[string]string
}
@ -116,6 +117,13 @@ func (o *ObjectInfo) Address() oid.Address {
return addr
}
func (o *ObjectInfo) ETag(md5Enabled bool) string {
if md5Enabled && len(o.MD5Sum) > 0 {
return o.MD5Sum
}
return o.HashSum
}
func (b BucketSettings) Unversioned() bool {
return b.Versioning == VersioningUnversioned
}

View file

@ -56,6 +56,7 @@ type BaseNodeVersion struct {
Timestamp uint64
Size uint64
ETag string
MD5 string
FilePath string
}
@ -86,6 +87,7 @@ type PartInfo struct {
OID oid.ID `json:"oid"`
Size uint64 `json:"size"`
ETag string `json:"etag"`
MD5 string `json:"md5"`
Created time.Time `json:"created"`
}

View file

@ -42,6 +42,7 @@ type (
IsResolveListAllow() bool
CompleteMultipartKeepalive() time.Duration
BypassContentEncodingInChunks() bool
MD5Enabled() bool
}
)

View file

@ -78,7 +78,8 @@ func addSSECHeaders(responseHeader http.Header, requestHeader http.Header) {
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
}
func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int, isBucketUnversioned bool) {
func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int,
isBucketUnversioned, md5Enabled bool) {
info := extendedInfo.ObjectInfo
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
h.Set(api.ContentType, info.ContentType)
@ -94,7 +95,8 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
}
h.Set(api.ETag, info.HashSum)
h.Set(api.ETag, info.ETag(md5Enabled))
dkirillov marked this conversation as resolved Outdated

If info.MD5Sum contains base64 encoded hash can we rename it to MD5SumBase64? But I would like to store hex encoded values and encode/decode where it's need

If `info.MD5Sum` contains base64 encoded hash can we rename it to `MD5SumBase64`? But I would like to store hex encoded values and encode/decode where it's need

info.MD5Sum is stored as hex encoded value

`info.MD5Sum` is stored as hex encoded value

Then we have to encode it to base64 here (and probably without quotes)

Then we have to encode it to base64 here (and probably without quotes)

I fixed this

I fixed this
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))
if !isBucketUnversioned {
@ -222,7 +224,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
if params != nil {
writeRangeHeaders(w, params, fullSize)
} else {

View file

@ -197,6 +197,19 @@ func TestGetObject(t *testing.T) {
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, errors.ErrNoSuchKey)
}
func TestGetObjectEnabledMD5(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket", "obj"
_, objInfo := createBucketAndObject(hc, bktName, objName)
_, headers := getObject(hc, bktName, objName)
require.Equal(t, objInfo.HashSum, headers.Get(api.ETag))
hc.config.md5Enabled = true
_, headers = getObject(hc, bktName, objName)
require.Equal(t, objInfo.MD5Sum, headers.Get(api.ETag))
}
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
body := bytes.NewReader([]byte(content))
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)

View file

@ -63,6 +63,7 @@ type configMock struct {
copiesNumbers map[string][]uint32
defaultCopiesNumbers []uint32
bypassContentEncodingInChunks bool
md5Enabled bool
}
func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy {
@ -110,6 +111,10 @@ func (c *configMock) CompleteMultipartKeepalive() time.Duration {
return time.Duration(0)
}
func (c *configMock) MD5Enabled() bool {
return c.md5Enabled
}
func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, false)
}

View file

@ -118,7 +118,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
w.WriteHeader(http.StatusOK)
}

View file

@ -246,6 +246,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
PartNumber: partNumber,
Size: size,
Reader: body,
dkirillov marked this conversation as resolved Outdated

It seems we can simplify this and write:

p := &layer.UploadPartParams{
    // ...
    MD5: r.Header.Get(api.ContentMD5)
}
It seems we can simplify this and write: ```golang p := &layer.UploadPartParams{ // ... MD5: r.Header.Get(api.ContentMD5) } ```
ContentMD5: r.Header.Get(api.ContentMD5),
}
p.Info.Encryption, err = formEncryptionParams(r)
@ -376,8 +377,8 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
}
response := UploadPartCopyResponse{
ETag: info.HashSum,
LastModified: info.Created.UTC().Format(time.RFC3339),
ETag: info.ETag(h.cfg.MD5Enabled()),
}
if p.Info.Encryption.Enabled() {
@ -452,8 +453,8 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
response := CompleteMultipartUploadResponse{
Bucket: objInfo.Bucket,
ETag: objInfo.HashSum,
Key: objInfo.Name,
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
}
// Here we previously set api.AmzVersionID header for versioned bucket.

View file

@ -2,6 +2,8 @@ package handler
import (
"bytes"
"crypto/md5"
"encoding/hex"
"encoding/xml"
"fmt"
"net/http"
@ -276,6 +278,33 @@ func TestMultipartUploadWithContentLanguage(t *testing.T) {
require.Equal(t, exceptedContentLanguage, w.Header().Get(api.ContentLanguage))
}
func TestMultipartUploadEnabledMD5(t *testing.T) {
hc := prepareHandlerContext(t)
hc.config.md5Enabled = true
hc.layerFeatures.SetMD5Enabled(true)
bktName, objName := "bucket-md5", "object-md5"
createTestBucket(hc, bktName)
partSize := 5 * 1024 * 1024
multipartUpload := createMultipartUpload(hc, bktName, objName, map[string]string{})
etag1, partBody1 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 1, partSize)
md5Sum1 := md5.Sum(partBody1)
require.Equal(t, hex.EncodeToString(md5Sum1[:]), etag1)
etag2, partBody2 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 2, partSize)
md5Sum2 := md5.Sum(partBody2)
require.Equal(t, hex.EncodeToString(md5Sum2[:]), etag2)
w := completeMultipartUploadBase(hc, bktName, objName, multipartUpload.UploadID, []string{etag1, etag2})
assertStatus(t, w, http.StatusOK)
resp := &CompleteMultipartUploadResponse{}
err := xml.NewDecoder(w.Result().Body).Decode(resp)
require.NoError(t, err)
completeMD5Sum := md5.Sum(append(md5Sum1[:], md5Sum2[:]...))
require.Equal(t, hex.EncodeToString(completeMD5Sum[:])+"-2", resp.ETag)
}
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
}

View file

@ -233,7 +233,7 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
return
}
response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name)
response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name, h.cfg.MD5Enabled())
if err = middleware.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
}
@ -261,7 +261,7 @@ func parseListObjectVersionsRequest(reqInfo *middleware.ReqInfo) (*layer.ListObj
return &res, nil
}
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string) *ListObjectsVersionsResponse {
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string, md5Enabled bool) *ListObjectsVersionsResponse {
res := ListObjectsVersionsResponse{
Name: bucketName,
IsTruncated: info.IsTruncated,
@ -286,7 +286,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
},
Size: ver.ObjectInfo.Size,
VersionID: ver.Version(),
ETag: ver.ObjectInfo.HashSum,
ETag: ver.ObjectInfo.ETag(md5Enabled),
})
}
// this loop is not starting till versioning is not implemented

View file

@ -245,6 +245,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
Size: size,
Header: metadata,
Encryption: encryptionParams,
ContentMD5: r.Header.Get(api.ContentMD5),
}
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
dkirillov marked this conversation as resolved Outdated

This can be simplified to

params.ContentMD5 = r.Header.Get(api.ContentMD5)

or even

params := &layer.PutObjectParams{
	// ...
	ContentMD5: r.Header.Get(api.ContentMD5),
}
This can be simplified to ``` params.ContentMD5 = r.Header.Get(api.ContentMD5) ``` or even ``` params := &layer.PutObjectParams{ // ... ContentMD5: r.Header.Get(api.ContentMD5), } ```
@ -327,7 +328,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
addSSECHeaders(w.Header(), r.Header)
}
w.Header().Set(api.ETag, objInfo.HashSum)
w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enabled()))
middleware.WriteSuccessResponseHeadersOnly(w)
}
@ -562,7 +564,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
resp := &PostResponse{
Bucket: objInfo.Bucket,
Key: objInfo.Name,
ETag: objInfo.HashSum,
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
}
w.WriteHeader(status)
if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil {

View file

@ -3,7 +3,10 @@ package handler
import (
"bytes"
"context"
"crypto/md5"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"io"
@ -194,6 +197,37 @@ func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) {
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
}
func TestPutObjectWithInvalidContentMD5(t *testing.T) {
tc := prepareHandlerContext(t)
tc.config.md5Enabled = true
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := []byte("content")
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString([]byte("invalid")))
tc.Handler().PutObjectHandler(w, r)
assertS3Error(t, w, s3errors.GetAPIError(s3errors.ErrInvalidDigest))
checkNotFound(t, tc, bktName, objName, emptyVersion)
}
func TestPutObjectWithEnabledMD5(t *testing.T) {
tc := prepareHandlerContext(t)
tc.config.md5Enabled = true
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := []byte("content")
md5Hash := md5.New()
md5Hash.Write(content)
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
tc.Handler().PutObjectHandler(w, r)
require.Equal(t, hex.EncodeToString(md5Hash.Sum(nil)), w.Header().Get(api.ETag))
}
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
hc := prepareHandlerContext(t)

View file

@ -44,7 +44,7 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
CopiesNumber: p.CopiesNumbers,
}
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return fmt.Errorf("put system object: %w", err)
}

View file

@ -26,7 +26,8 @@ import (
)
type FeatureSettingsMock struct {
clientCut bool
clientCut bool
md5Enabled bool
}
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
@ -41,6 +42,14 @@ func (k *FeatureSettingsMock) SetClientCut(clientCut bool) {
k.clientCut = clientCut
}
func (k *FeatureSettingsMock) MD5Enabled() bool {
return k.md5Enabled
}
func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) {
k.md5Enabled = md5Enabled
}
type TestFrostFS struct {
FrostFS

View file

@ -50,6 +50,7 @@ type (
FeatureSettings interface {
ClientCut() bool
BufferMaxSizeForPut() uint64
MD5Enabled() bool
}
layer struct {
@ -111,14 +112,16 @@ type (
// PutObjectParams stores object put request parameters.
PutObjectParams struct {
BktInfo *data.BucketInfo
Object string
Size uint64
Reader io.Reader
Header map[string]string
Lock *data.ObjectLock
Encryption encryption.Params
CopiesNumbers []uint32
BktInfo *data.BucketInfo
Object string
Size uint64
Reader io.Reader
Header map[string]string
Lock *data.ObjectLock
Encryption encryption.Params
dkirillov marked this conversation as resolved Outdated

I would rename this to something like NeedComputeMD5

I would rename this to something like `NeedComputeMD5`
CopiesNumbers []uint32
CompleteMD5Hash string
ContentMD5 string
}
PutCombinedObjectParams struct {

View file

@ -3,6 +3,8 @@ package layer
import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
@ -68,6 +70,7 @@ type (
PartNumber int
Size uint64
Reader io.Reader
ContentMD5 string
}
UploadCopyParams struct {
@ -197,7 +200,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
return "", err
}
return objInfo.HashSum, nil
return objInfo.ETag(n.features.MD5Enabled()), nil
}
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
@ -230,10 +233,28 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
size, id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil {
return nil, err
}
if len(p.ContentMD5) > 0 {
hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5)
if err != nil {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
}
if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) {
prm := PrmObjectDelete{
Object: id,
Container: bktInfo.CID,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
err = n.frostFS.DeleteObject(ctx, prm)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
}
}
if p.Info.Encryption.Enabled() {
size = decSize
}
@ -250,6 +271,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Size: size,
ETag: hex.EncodeToString(hash),
Created: prm.CreationTime,
MD5: hex.EncodeToString(md5Hash),
}
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
@ -274,6 +296,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Size: partInfo.Size,
Created: partInfo.Created,
HashSum: partInfo.ETag,
MD5Sum: partInfo.MD5,
}
return objInfo, nil
@ -347,9 +370,10 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
parts := make([]*data.PartInfo, 0, len(p.Parts))
var completedPartsHeader strings.Builder
md5Hash := md5.New()
for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber]
if partInfo == nil || part.ETag != partInfo.ETag {
if partInfo == nil || (part.ETag != partInfo.ETag && part.ETag != partInfo.MD5) {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
}
delete(partsInfo, part.PartNumber)
@ -376,6 +400,12 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
return nil, nil, err
}
bytesHash, err := hex.DecodeString(partInfo.MD5)
if err != nil {
return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err)
}
md5Hash.Write(bytesHash)
}
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
@ -410,13 +440,14 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
}
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
BktInfo: p.Info.Bkt,
Object: p.Info.Key,
Reader: bytes.NewReader(partsData),
Header: initMetadata,
Size: multipartObjetSize,
Encryption: p.Info.Encryption,
CopiesNumbers: multipartInfo.CopiesNumbers,
BktInfo: p.Info.Bkt,
Object: p.Info.Key,
Reader: bytes.NewReader(partsData),
Header: initMetadata,
Size: multipartObjetSize,
Encryption: p.Info.Encryption,
CopiesNumbers: multipartInfo.CopiesNumbers,
CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
})
if err != nil {
n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,

View file

@ -34,7 +34,7 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
CopiesNumber: p.CopiesNumbers,
}
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return err
}

View file

@ -1,8 +1,11 @@
package layer
import (
"bytes"
"context"
"crypto/md5"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
@ -287,10 +290,23 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
size, id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, err
}
if len(p.ContentMD5) > 0 {
headerMd5Hash, err := base64.StdEncoding.DecodeString(p.ContentMD5)
if err != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
}
if !bytes.Equal(headerMd5Hash, md5Hash) {
dkirillov marked this conversation as resolved Outdated

Actually we can compare raw bytes (to avoid extra conversion)

Actually we can compare raw bytes (to avoid extra conversion)
err = n.objectDelete(ctx, p.BktInfo, id)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
}
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
}
}
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
@ -304,6 +320,11 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "",
}
if len(p.CompleteMD5Hash) > 0 {
newVersion.MD5 = p.CompleteMD5Hash
dkirillov marked this conversation as resolved Outdated

The ok variable was defined quite a long time ago. So it's better to use a more meaningful name

The `ok` variable was defined quite a long time ago. So it's better to use a more meaningful name
} else {
newVersion.MD5 = hex.EncodeToString(md5Hash)
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
dkirillov marked this conversation as resolved Outdated

Why do we store md5 in different formats in this field (hex (newVersion.MD5 = hex.EncodeToString(md5Hash)) vs base64 (newVersion.MD5 = md5EncodedHash) ?

Why do we store md5 in different formats in this field (hex (`newVersion.MD5 = hex.EncodeToString(md5Hash)`) vs base64 (`newVersion.MD5 = md5EncodedHash`) ?

Format of header depends on IsCompleteMultipart, I'll change this to be more logical

Format of header depends on `IsCompleteMultipart`, I'll change this to be more logical
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
@ -340,6 +361,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
Headers: p.Header,
ContentType: p.Header[api.ContentType],
HashSum: newVersion.ETag,
MD5Sum: newVersion.MD5,
}
extendedObjInfo := &data.ExtendedObjectInfo{
@ -378,6 +400,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
return nil, err
}
objInfo := objectInfoFromMeta(bkt, meta)
objInfo.MD5Sum = node.MD5
extObjInfo := &data.ExtendedObjectInfo{
ObjectInfo: objInfo,
@ -430,6 +453,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
return nil, err
}
objInfo := objectInfoFromMeta(bkt, meta)
objInfo.MD5Sum = foundVersion.MD5
extObjInfo := &data.ExtendedObjectInfo{
ObjectInfo: objInfo,
@ -457,16 +481,18 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
// Returns object ID and payload sha256 hash.
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) {
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
dkirillov marked this conversation as resolved Outdated

Can be simplified

objInfo.UseMD5 = n.features.MD5Enable()
Can be simplified ``` objInfo.UseMD5 = n.features.MD5Enable() ```
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
prm.ClientCut = n.features.ClientCut()
prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
var size uint64
hash := sha256.New()
md5Hash := md5.New()
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
size += uint64(len(buf))
hash.Write(buf)
md5Hash.Write(buf)
})
id, err := n.frostFS.CreateObject(ctx, prm)
if err != nil {

As I understand we don't have to compute md5 at all if this feature is disabled

As I understand we don't have to compute md5 at all if this feature is disabled

It seems the fix for this comment somehow be missed in master
@mbiryukova @alexvanin

It seems the fix for this comment somehow be missed in master @mbiryukova @alexvanin

For v0.29.0 we do compute both hashes, unless it produce significant slowdown. As for future releases, we may drop SHA and use MD5 as new default for everything.

For v0.29.0 we do compute both hashes, unless it produce significant slowdown. As for future releases, we may drop SHA and use MD5 as new default for everything.
@ -474,9 +500,9 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
}
return 0, oid.ID{}, nil, err
return 0, oid.ID{}, nil, nil, err
}
return size, id, hash.Sum(nil), nil
return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
}
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
@ -807,6 +833,7 @@ func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo
}
oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi

View file

@ -125,7 +125,7 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
return oid.ID{}, err
}
_, id, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
_, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
return id, err
}

View file

@ -86,6 +86,7 @@ type (
bypassContentEncodingInChunks bool
clientCut bool
maxBufferSizeForPut uint64
md5Enabled bool
}
maxClientsConfig struct {
@ -191,6 +192,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
settings.setClientCut(v.GetBool(cfgClientCut))
settings.initPlacementPolicy(log.logger, v)
settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
settings.setMD5Enabled(v.GetBool(cfgMD5Enabled))
return settings
}
@ -312,6 +314,18 @@ func (s *appSettings) CompleteMultipartKeepalive() time.Duration {
return s.completeMultipartKeepalive
}
func (s *appSettings) MD5Enabled() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.md5Enabled
}
func (s *appSettings) setMD5Enabled(md5Enabled bool) {
s.mu.Lock()
s.md5Enabled = md5Enabled
s.mu.Unlock()
}
func (a *App) initAPI(ctx context.Context) {
a.initLayer(ctx)
a.initHandler()
@ -604,6 +618,7 @@ func (a *App) updateSettings() {
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
a.settings.setMD5Enabled(a.cfg.GetBool(cfgMD5Enabled))
}
func (a *App) startServices() {

View file

@ -162,6 +162,9 @@ const ( // Settings.
// Runtime.
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
// Enable return MD5 checksum in ETag.
cfgMD5Enabled = "features.md5.enabled"
// envPrefix is an environment variables prefix used for configuration.
envPrefix = "S3_GW"
)

View file

@ -150,3 +150,5 @@ S3_GW_TRACING_ENDPOINT="localhost:4318"
S3_GW_TRACING_EXPORTER="otlp_grpc"
S3_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
S3_GW_FEATURES_MD5_ENABLED=false

View file

@ -176,3 +176,7 @@ kludge:
runtime:
soft_memory_limit: 1gb
dkirillov marked this conversation as resolved Outdated

Maybe we can introduce new section features or hashes instead of just md5?

Also we should add description of this param to docs/configuration.md and to config/config.env.

Please run pre-commit run --all to see and fix some linter errors (e.g. no empty line at the end of the file)

Maybe we can introduce new section `features` or `hashes` instead of just `md5`? Also we should add description of this param to `docs/configuration.md` and to `config/config.env`. Please run `pre-commit run --all` to see and fix some linter errors (e.g. no empty line at the end of the file)

And put md5 in new section?

And put `md5` in new section?
features:
md5:
enabled: false

View file

@ -186,6 +186,7 @@ There are some custom types used for brevity:
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
| `kludge` | [Different kludge configuration](#kludge-section) |
| `runtime` | [Runtime configuration](#runtime-section) |
| `features` | [Features configuration](#features-section) |
### General section
@ -562,3 +563,16 @@ runtime:
| Parameter | Type | SIGHUP reload | Default value | Description |
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
# `features` section
Contains parameters for enabling features.
```yaml
features:
md5:
enabled: false
```
| Parameter | Type | SIGHUP reload | Default value | Description |
dkirillov marked this conversation as resolved Outdated

Let's name this enabled (not enable) to be more consistent with other config values (e.g. pprof.enabled, nats.enabled, etc)

Let's name this `enabled` (not `enable`) to be more consistent with other config values (e.g. `pprof.enabled`, `nats.enabled`, etc)
|---------------|--------|---------------|---------------|----------------------------------------------------------------|
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |

View file

@ -75,6 +75,7 @@ const (
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
PutObject = "put object" // Debug in ../../api/layer/object.go
FailedToDeleteObject = "failed to delete object" // Debug in ../../api/layer/object.go
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" // Warn in ../../api/layer/object.go
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go

View file

@ -81,6 +81,7 @@ const (
partNumberKV = "Number"
sizeKV = "Size"
etagKV = "ETag"
md5KV = "MD5"
// keys for lock.
isLockKV = "IsLock"
@ -185,6 +186,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
_, isCombined := treeNode.Get(isCombinedKV)
eTag, _ := treeNode.Get(etagKV)
md5, _ := treeNode.Get(md5KV)
version := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
@ -193,6 +195,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
OID: treeNode.ObjID,
Timestamp: treeNode.TimeStamp,
ETag: eTag,
MD5: md5,
Size: treeNode.Size,
FilePath: filePath,
},
@ -302,6 +305,8 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
return nil, fmt.Errorf("invalid created timestamp: %w", err)
}
partInfo.Created = time.UnixMilli(utcMilli)
case md5KV:
partInfo.MD5 = value
}
}
@ -578,7 +583,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa
}
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
path := pathFromName(objectName)
p := &GetNodesParams{
@ -1024,6 +1029,7 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
sizeKV: strconv.FormatUint(info.Size, 10),
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
etagKV: info.ETag,
md5KV: info.MD5,
}
for _, part := range parts {
@ -1156,6 +1162,9 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
if len(version.ETag) > 0 {
meta[etagKV] = version.ETag
}
if len(version.MD5) > 0 {
meta[md5KV] = version.MD5
}
if version.IsDeleteMarker() {
meta[isDeleteMarkerKV] = "true"
@ -1200,7 +1209,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke
}
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV}
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
path := pathFromName(filepath)
p := &GetNodesParams{
BktInfo: bktInfo,