[#205] Add md5 checksum in header #228

Merged
alexvanin merged 1 commit from mbiryukova/frostfs-s3-gw:feature/add_md5_checksum_in_header into master 2023-10-25 14:50:49 +00:00
26 changed files with 254 additions and 39 deletions

View file

@ -46,6 +46,7 @@ type (
Created time.Time Created time.Time
CreationEpoch uint64 CreationEpoch uint64
HashSum string HashSum string
MD5Sum string
Owner user.ID Owner user.ID
dkirillov marked this conversation as resolved Outdated

It seems we can drop the UseMD5 field and check for MD5Sum != "". Or encapsulate this check to method

func (o ObjectInfo) UseMD5() bool {
    return o.MD5Sum != ""
}
It seems we can drop the `UseMD5` field and check for `MD5Sum != ""`. Or encapsulate this check to method ``` func (o ObjectInfo) UseMD5() bool { return o.MD5Sum != "" } ```

In current implementation MD5Sum will not be empty if value was stored for object

In current implementation `MD5Sum` will not be empty if value was stored for object

Then maybe we can write something like

func (o ObjectInfo) ETag(md5Enabled bool) string {
	if md5Enabled && len(o.MD5Sum) > 0 {
		return o.MD5Sum
	}

	return o.HashSum
}

and then instead of aff675a8eb/api/handler/put.go (L331-L335)

we can write (we can extend interface with MD5Enabled() bool since appSettings implements it anyway )

w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enable()))
Then maybe we can write something like ``` func (o ObjectInfo) ETag(md5Enabled bool) string { if md5Enabled && len(o.MD5Sum) > 0 { return o.MD5Sum } return o.HashSum } ``` and then instead of https://git.frostfs.info/mbiryukova/frostfs-s3-gw/src/commit/aff675a8eb80a15b634fc93d82b17d8a2372d620/api/handler/put.go#L331-L335 we can write (we can extend [interface](https://git.frostfs.info/mbiryukova/frostfs-s3-gw/src/commit/aff675a8eb80a15b634fc93d82b17d8a2372d620/api/handler/api.go#L33) with `MD5Enabled() bool` since `appSettings` implements it anyway ) ``` w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enable())) ```
Headers map[string]string Headers map[string]string
} }
@ -116,6 +117,13 @@ func (o *ObjectInfo) Address() oid.Address {
return addr return addr
} }
func (o *ObjectInfo) ETag(md5Enabled bool) string {
if md5Enabled && len(o.MD5Sum) > 0 {
return o.MD5Sum
}
return o.HashSum
}
func (b BucketSettings) Unversioned() bool { func (b BucketSettings) Unversioned() bool {
return b.Versioning == VersioningUnversioned return b.Versioning == VersioningUnversioned
} }

View file

@ -56,6 +56,7 @@ type BaseNodeVersion struct {
Timestamp uint64 Timestamp uint64
Size uint64 Size uint64
ETag string ETag string
MD5 string
FilePath string FilePath string
} }
@ -86,6 +87,7 @@ type PartInfo struct {
OID oid.ID `json:"oid"` OID oid.ID `json:"oid"`
Size uint64 `json:"size"` Size uint64 `json:"size"`
ETag string `json:"etag"` ETag string `json:"etag"`
MD5 string `json:"md5"`
Created time.Time `json:"created"` Created time.Time `json:"created"`
} }

View file

@ -42,6 +42,7 @@ type (
IsResolveListAllow() bool IsResolveListAllow() bool
CompleteMultipartKeepalive() time.Duration CompleteMultipartKeepalive() time.Duration
BypassContentEncodingInChunks() bool BypassContentEncodingInChunks() bool
MD5Enabled() bool
} }
) )

View file

@ -78,7 +78,8 @@ func addSSECHeaders(responseHeader http.Header, requestHeader http.Header) {
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5)) responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
} }
func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int, isBucketUnversioned bool) { func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int,
isBucketUnversioned, md5Enabled bool) {
info := extendedInfo.ObjectInfo info := extendedInfo.ObjectInfo
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" { if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
h.Set(api.ContentType, info.ContentType) h.Set(api.ContentType, info.ContentType)
@ -94,7 +95,8 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10)) h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
} }
h.Set(api.ETag, info.HashSum) h.Set(api.ETag, info.ETag(md5Enabled))
dkirillov marked this conversation as resolved Outdated

If info.MD5Sum contains base64 encoded hash can we rename it to MD5SumBase64? But I would like to store hex encoded values and encode/decode where it's need

If `info.MD5Sum` contains base64 encoded hash can we rename it to `MD5SumBase64`? But I would like to store hex encoded values and encode/decode where it's need

info.MD5Sum is stored as hex encoded value

`info.MD5Sum` is stored as hex encoded value

Then we have to encode it to base64 here (and probably without quotes)

Then we have to encode it to base64 here (and probably without quotes)

I fixed this

I fixed this
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength)) h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))
if !isBucketUnversioned { if !isBucketUnversioned {
@ -222,7 +224,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned()) writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
if params != nil { if params != nil {
writeRangeHeaders(w, params, fullSize) writeRangeHeaders(w, params, fullSize)
} else { } else {

View file

@ -197,6 +197,19 @@ func TestGetObject(t *testing.T) {
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, errors.ErrNoSuchKey) getObjectAssertS3Error(hc, bktName, objName, emptyVersion, errors.ErrNoSuchKey)
} }
func TestGetObjectEnabledMD5(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket", "obj"
_, objInfo := createBucketAndObject(hc, bktName, objName)
_, headers := getObject(hc, bktName, objName)
require.Equal(t, objInfo.HashSum, headers.Get(api.ETag))
hc.config.md5Enabled = true
_, headers = getObject(hc, bktName, objName)
require.Equal(t, objInfo.MD5Sum, headers.Get(api.ETag))
}
func putObjectContent(hc *handlerContext, bktName, objName, content string) { func putObjectContent(hc *handlerContext, bktName, objName, content string) {
body := bytes.NewReader([]byte(content)) body := bytes.NewReader([]byte(content))
w, r := prepareTestPayloadRequest(hc, bktName, objName, body) w, r := prepareTestPayloadRequest(hc, bktName, objName, body)

View file

@ -63,6 +63,7 @@ type configMock struct {
copiesNumbers map[string][]uint32 copiesNumbers map[string][]uint32
defaultCopiesNumbers []uint32 defaultCopiesNumbers []uint32
bypassContentEncodingInChunks bool bypassContentEncodingInChunks bool
md5Enabled bool
} }
func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy { func (c *configMock) DefaultPlacementPolicy() netmap.PlacementPolicy {
@ -110,6 +111,10 @@ func (c *configMock) CompleteMultipartKeepalive() time.Duration {
return time.Duration(0) return time.Duration(0)
} }
func (c *configMock) MD5Enabled() bool {
return c.md5Enabled
}
func prepareHandlerContext(t *testing.T) *handlerContext { func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, false) return prepareHandlerContextBase(t, false)
} }

View file

@ -118,7 +118,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned()) writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned(), h.cfg.MD5Enabled())
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }

View file

@ -246,6 +246,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
PartNumber: partNumber, PartNumber: partNumber,
Size: size, Size: size,
Reader: body, Reader: body,
dkirillov marked this conversation as resolved Outdated

It seems we can simplify this and write:

p := &layer.UploadPartParams{
    // ...
    MD5: r.Header.Get(api.ContentMD5)
}
It seems we can simplify this and write: ```golang p := &layer.UploadPartParams{ // ... MD5: r.Header.Get(api.ContentMD5) } ```
ContentMD5: r.Header.Get(api.ContentMD5),
} }
p.Info.Encryption, err = formEncryptionParams(r) p.Info.Encryption, err = formEncryptionParams(r)
@ -376,8 +377,8 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
} }
response := UploadPartCopyResponse{ response := UploadPartCopyResponse{
ETag: info.HashSum,
LastModified: info.Created.UTC().Format(time.RFC3339), LastModified: info.Created.UTC().Format(time.RFC3339),
ETag: info.ETag(h.cfg.MD5Enabled()),
} }
if p.Info.Encryption.Enabled() { if p.Info.Encryption.Enabled() {
@ -452,8 +453,8 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
response := CompleteMultipartUploadResponse{ response := CompleteMultipartUploadResponse{
Bucket: objInfo.Bucket, Bucket: objInfo.Bucket,
ETag: objInfo.HashSum,
Key: objInfo.Name, Key: objInfo.Name,
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
} }
// Here we previously set api.AmzVersionID header for versioned bucket. // Here we previously set api.AmzVersionID header for versioned bucket.

View file

@ -2,6 +2,8 @@ package handler
import ( import (
"bytes" "bytes"
"crypto/md5"
"encoding/hex"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"net/http" "net/http"
@ -276,6 +278,33 @@ func TestMultipartUploadWithContentLanguage(t *testing.T) {
require.Equal(t, exceptedContentLanguage, w.Header().Get(api.ContentLanguage)) require.Equal(t, exceptedContentLanguage, w.Header().Get(api.ContentLanguage))
} }
func TestMultipartUploadEnabledMD5(t *testing.T) {
hc := prepareHandlerContext(t)
hc.config.md5Enabled = true
hc.layerFeatures.SetMD5Enabled(true)
bktName, objName := "bucket-md5", "object-md5"
createTestBucket(hc, bktName)
partSize := 5 * 1024 * 1024
multipartUpload := createMultipartUpload(hc, bktName, objName, map[string]string{})
etag1, partBody1 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 1, partSize)
md5Sum1 := md5.Sum(partBody1)
require.Equal(t, hex.EncodeToString(md5Sum1[:]), etag1)
etag2, partBody2 := uploadPart(hc, bktName, objName, multipartUpload.UploadID, 2, partSize)
md5Sum2 := md5.Sum(partBody2)
require.Equal(t, hex.EncodeToString(md5Sum2[:]), etag2)
w := completeMultipartUploadBase(hc, bktName, objName, multipartUpload.UploadID, []string{etag1, etag2})
assertStatus(t, w, http.StatusOK)
resp := &CompleteMultipartUploadResponse{}
err := xml.NewDecoder(w.Result().Body).Decode(resp)
require.NoError(t, err)
completeMD5Sum := md5.Sum(append(md5Sum1[:], md5Sum2[:]...))
require.Equal(t, hex.EncodeToString(completeMD5Sum[:])+"-2", resp.ETag)
}
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse { func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end) return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
} }

View file

@ -233,7 +233,7 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
return return
} }
response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name) response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name, h.cfg.MD5Enabled())
if err = middleware.EncodeToResponse(w, response); err != nil { if err = middleware.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err) h.logAndSendError(w, "something went wrong", reqInfo, err)
} }
@ -261,7 +261,7 @@ func parseListObjectVersionsRequest(reqInfo *middleware.ReqInfo) (*layer.ListObj
return &res, nil return &res, nil
} }
func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string) *ListObjectsVersionsResponse { func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string, md5Enabled bool) *ListObjectsVersionsResponse {
res := ListObjectsVersionsResponse{ res := ListObjectsVersionsResponse{
Name: bucketName, Name: bucketName,
IsTruncated: info.IsTruncated, IsTruncated: info.IsTruncated,
@ -286,7 +286,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
}, },
Size: ver.ObjectInfo.Size, Size: ver.ObjectInfo.Size,
VersionID: ver.Version(), VersionID: ver.Version(),
ETag: ver.ObjectInfo.HashSum, ETag: ver.ObjectInfo.ETag(md5Enabled),
}) })
} }
// this loop is not starting till versioning is not implemented // this loop is not starting till versioning is not implemented

View file

@ -245,6 +245,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
Size: size, Size: size,
Header: metadata, Header: metadata,
Encryption: encryptionParams, Encryption: encryptionParams,
ContentMD5: r.Header.Get(api.ContentMD5),
} }
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint) params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
dkirillov marked this conversation as resolved Outdated

This can be simplified to

params.ContentMD5 = r.Header.Get(api.ContentMD5)

or even

params := &layer.PutObjectParams{
	// ...
	ContentMD5: r.Header.Get(api.ContentMD5),
}
This can be simplified to ``` params.ContentMD5 = r.Header.Get(api.ContentMD5) ``` or even ``` params := &layer.PutObjectParams{ // ... ContentMD5: r.Header.Get(api.ContentMD5), } ```
@ -327,7 +328,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
addSSECHeaders(w.Header(), r.Header) addSSECHeaders(w.Header(), r.Header)
} }
w.Header().Set(api.ETag, objInfo.HashSum) w.Header().Set(api.ETag, objInfo.ETag(h.cfg.MD5Enabled()))
middleware.WriteSuccessResponseHeadersOnly(w) middleware.WriteSuccessResponseHeadersOnly(w)
} }
@ -562,7 +564,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
resp := &PostResponse{ resp := &PostResponse{
Bucket: objInfo.Bucket, Bucket: objInfo.Bucket,
Key: objInfo.Name, Key: objInfo.Name,
ETag: objInfo.HashSum, ETag: objInfo.ETag(h.cfg.MD5Enabled()),
} }
w.WriteHeader(status) w.WriteHeader(status)
if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil { if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil {

View file

@ -3,7 +3,10 @@ package handler
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/md5"
"crypto/rand" "crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"io" "io"
@ -194,6 +197,37 @@ func TestPutObjectWithWrapReaderDiscardOnError(t *testing.T) {
require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object") require.Equal(t, numGoroutineBefore, numGoroutineAfter, "goroutines shouldn't leak during put object")
} }
func TestPutObjectWithInvalidContentMD5(t *testing.T) {
tc := prepareHandlerContext(t)
tc.config.md5Enabled = true
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := []byte("content")
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString([]byte("invalid")))
tc.Handler().PutObjectHandler(w, r)
assertS3Error(t, w, s3errors.GetAPIError(s3errors.ErrInvalidDigest))
checkNotFound(t, tc, bktName, objName, emptyVersion)
}
func TestPutObjectWithEnabledMD5(t *testing.T) {
tc := prepareHandlerContext(t)
tc.config.md5Enabled = true
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := []byte("content")
md5Hash := md5.New()
md5Hash.Write(content)
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
tc.Handler().PutObjectHandler(w, r)
require.Equal(t, hex.EncodeToString(md5Hash.Sum(nil)), w.Header().Get(api.ETag))
}
func TestPutObjectWithStreamBodyAWSExample(t *testing.T) { func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
hc := prepareHandlerContext(t) hc := prepareHandlerContext(t)

View file

@ -44,7 +44,7 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
CopiesNumber: p.CopiesNumbers, CopiesNumber: p.CopiesNumbers,
} }
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo) _, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil { if err != nil {
return fmt.Errorf("put system object: %w", err) return fmt.Errorf("put system object: %w", err)
} }

View file

@ -26,7 +26,8 @@ import (
) )
type FeatureSettingsMock struct { type FeatureSettingsMock struct {
clientCut bool clientCut bool
md5Enabled bool
} }
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 { func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
@ -41,6 +42,14 @@ func (k *FeatureSettingsMock) SetClientCut(clientCut bool) {
k.clientCut = clientCut k.clientCut = clientCut
} }
func (k *FeatureSettingsMock) MD5Enabled() bool {
return k.md5Enabled
}
func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) {
k.md5Enabled = md5Enabled
}
type TestFrostFS struct { type TestFrostFS struct {
FrostFS FrostFS

View file

@ -50,6 +50,7 @@ type (
FeatureSettings interface { FeatureSettings interface {
ClientCut() bool ClientCut() bool
BufferMaxSizeForPut() uint64 BufferMaxSizeForPut() uint64
MD5Enabled() bool
} }
layer struct { layer struct {
@ -111,14 +112,16 @@ type (
// PutObjectParams stores object put request parameters. // PutObjectParams stores object put request parameters.
PutObjectParams struct { PutObjectParams struct {
BktInfo *data.BucketInfo BktInfo *data.BucketInfo
Object string Object string
Size uint64 Size uint64
Reader io.Reader Reader io.Reader
Header map[string]string Header map[string]string
Lock *data.ObjectLock Lock *data.ObjectLock
Encryption encryption.Params Encryption encryption.Params
dkirillov marked this conversation as resolved Outdated

I would rename this to something like NeedComputeMD5

I would rename this to something like `NeedComputeMD5`
CopiesNumbers []uint32 CopiesNumbers []uint32
CompleteMD5Hash string
ContentMD5 string
} }
PutCombinedObjectParams struct { PutCombinedObjectParams struct {

View file

@ -3,6 +3,8 @@ package layer
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/md5"
"encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
@ -68,6 +70,7 @@ type (
PartNumber int PartNumber int
Size uint64 Size uint64
Reader io.Reader Reader io.Reader
ContentMD5 string
} }
UploadCopyParams struct { UploadCopyParams struct {
@ -197,7 +200,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
return "", err return "", err
} }
return objInfo.HashSum, nil return objInfo.ETag(n.features.MD5Enabled()), nil
} }
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) { func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
@ -230,10 +233,28 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
size, id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo) size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(p.ContentMD5) > 0 {
hashBytes, err := base64.StdEncoding.DecodeString(p.ContentMD5)
if err != nil {
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
}
if hex.EncodeToString(hashBytes) != hex.EncodeToString(md5Hash) {
prm := PrmObjectDelete{
Object: id,
Container: bktInfo.CID,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
err = n.frostFS.DeleteObject(ctx, prm)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
}
return nil, s3errors.GetAPIError(s3errors.ErrInvalidDigest)
}
}
if p.Info.Encryption.Enabled() { if p.Info.Encryption.Enabled() {
size = decSize size = decSize
} }
@ -250,6 +271,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Size: size, Size: size,
ETag: hex.EncodeToString(hash), ETag: hex.EncodeToString(hash),
Created: prm.CreationTime, Created: prm.CreationTime,
MD5: hex.EncodeToString(md5Hash),
} }
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
@ -274,6 +296,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Size: partInfo.Size, Size: partInfo.Size,
Created: partInfo.Created, Created: partInfo.Created,
HashSum: partInfo.ETag, HashSum: partInfo.ETag,
MD5Sum: partInfo.MD5,
} }
return objInfo, nil return objInfo, nil
@ -347,9 +370,10 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
parts := make([]*data.PartInfo, 0, len(p.Parts)) parts := make([]*data.PartInfo, 0, len(p.Parts))
var completedPartsHeader strings.Builder var completedPartsHeader strings.Builder
md5Hash := md5.New()
for i, part := range p.Parts { for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber] partInfo := partsInfo[part.PartNumber]
if partInfo == nil || part.ETag != partInfo.ETag { if partInfo == nil || (part.ETag != partInfo.ETag && part.ETag != partInfo.MD5) {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
} }
delete(partsInfo, part.PartNumber) delete(partsInfo, part.PartNumber)
@ -376,6 +400,12 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil { if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil {
return nil, nil, err return nil, nil, err
} }
bytesHash, err := hex.DecodeString(partInfo.MD5)
if err != nil {
return nil, nil, fmt.Errorf("couldn't decode MD5 checksum of part: %w", err)
}
md5Hash.Write(bytesHash)
} }
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1) initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
@ -410,13 +440,14 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
} }
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{ extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
BktInfo: p.Info.Bkt, BktInfo: p.Info.Bkt,
Object: p.Info.Key, Object: p.Info.Key,
Reader: bytes.NewReader(partsData), Reader: bytes.NewReader(partsData),
Header: initMetadata, Header: initMetadata,
Size: multipartObjetSize, Size: multipartObjetSize,
Encryption: p.Info.Encryption, Encryption: p.Info.Encryption,
CopiesNumbers: multipartInfo.CopiesNumbers, CopiesNumbers: multipartInfo.CopiesNumbers,
CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
}) })
if err != nil { if err != nil {
n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject, n.reqLogger(ctx).Error(logs.CouldNotPutCompletedObject,

View file

@ -34,7 +34,7 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
CopiesNumber: p.CopiesNumbers, CopiesNumber: p.CopiesNumbers,
} }
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo) _, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil { if err != nil {
return err return err
} }

View file

@ -1,8 +1,11 @@
package layer package layer
import ( import (
"bytes"
"context" "context"
"crypto/md5"
"crypto/sha256" "crypto/sha256"
"encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
@ -287,10 +290,23 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm.Attributes = append(prm.Attributes, [2]string{k, v}) prm.Attributes = append(prm.Attributes, [2]string{k, v})
} }
size, id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo) size, id, hash, md5Hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(p.ContentMD5) > 0 {
headerMd5Hash, err := base64.StdEncoding.DecodeString(p.ContentMD5)
if err != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
}
if !bytes.Equal(headerMd5Hash, md5Hash) {
dkirillov marked this conversation as resolved Outdated

Actually we can compare raw bytes (to avoid extra conversion)

Actually we can compare raw bytes (to avoid extra conversion)
err = n.objectDelete(ctx, p.BktInfo, id)
if err != nil {
n.reqLogger(ctx).Debug(logs.FailedToDeleteObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
}
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidDigest)
}
}
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id)) n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
@ -304,6 +320,11 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
IsUnversioned: !bktSettings.VersioningEnabled(), IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "", IsCombined: p.Header[MultipartObjectSize] != "",
} }
if len(p.CompleteMD5Hash) > 0 {
newVersion.MD5 = p.CompleteMD5Hash
dkirillov marked this conversation as resolved Outdated

The ok variable was defined quite a long time ago. So it's better to use a more meaningful name

The `ok` variable was defined quite a long time ago. So it's better to use a more meaningful name
} else {
newVersion.MD5 = hex.EncodeToString(md5Hash)
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
dkirillov marked this conversation as resolved Outdated

Why do we store md5 in different formats in this field (hex (newVersion.MD5 = hex.EncodeToString(md5Hash)) vs base64 (newVersion.MD5 = md5EncodedHash) ?

Why do we store md5 in different formats in this field (hex (`newVersion.MD5 = hex.EncodeToString(md5Hash)`) vs base64 (`newVersion.MD5 = md5EncodedHash`) ?

Format of header depends on IsCompleteMultipart, I'll change this to be more logical

Format of header depends on `IsCompleteMultipart`, I'll change this to be more logical
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
@ -340,6 +361,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
Headers: p.Header, Headers: p.Header,
ContentType: p.Header[api.ContentType], ContentType: p.Header[api.ContentType],
HashSum: newVersion.ETag, HashSum: newVersion.ETag,
MD5Sum: newVersion.MD5,
} }
extendedObjInfo := &data.ExtendedObjectInfo{ extendedObjInfo := &data.ExtendedObjectInfo{
@ -378,6 +400,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
return nil, err return nil, err
} }
objInfo := objectInfoFromMeta(bkt, meta) objInfo := objectInfoFromMeta(bkt, meta)
objInfo.MD5Sum = node.MD5
extObjInfo := &data.ExtendedObjectInfo{ extObjInfo := &data.ExtendedObjectInfo{
ObjectInfo: objInfo, ObjectInfo: objInfo,
@ -430,6 +453,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
return nil, err return nil, err
} }
objInfo := objectInfoFromMeta(bkt, meta) objInfo := objectInfoFromMeta(bkt, meta)
objInfo.MD5Sum = foundVersion.MD5
extObjInfo := &data.ExtendedObjectInfo{ extObjInfo := &data.ExtendedObjectInfo{
ObjectInfo: objInfo, ObjectInfo: objInfo,
@ -457,16 +481,18 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject. // objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
// Returns object ID and payload sha256 hash. // Returns object ID and payload sha256 hash.
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) { func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, []byte, error) {
dkirillov marked this conversation as resolved Outdated

Can be simplified

objInfo.UseMD5 = n.features.MD5Enable()
Can be simplified ``` objInfo.UseMD5 = n.features.MD5Enable() ```
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
prm.ClientCut = n.features.ClientCut() prm.ClientCut = n.features.ClientCut()
prm.BufferMaxSize = n.features.BufferMaxSizeForPut() prm.BufferMaxSize = n.features.BufferMaxSizeForPut()
prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled prm.WithoutHomomorphicHash = bktInfo.HomomorphicHashDisabled
var size uint64 var size uint64
hash := sha256.New() hash := sha256.New()
md5Hash := md5.New()
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) { prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
size += uint64(len(buf)) size += uint64(len(buf))
hash.Write(buf) hash.Write(buf)
md5Hash.Write(buf)
}) })
id, err := n.frostFS.CreateObject(ctx, prm) id, err := n.frostFS.CreateObject(ctx, prm)
if err != nil { if err != nil {

As I understand we don't have to compute md5 at all if this feature is disabled

As I understand we don't have to compute md5 at all if this feature is disabled

It seems the fix for this comment somehow be missed in master
@mbiryukova @alexvanin

It seems the fix for this comment somehow be missed in master @mbiryukova @alexvanin

For v0.29.0 we do compute both hashes, unless it produce significant slowdown. As for future releases, we may drop SHA and use MD5 as new default for everything.

For v0.29.0 we do compute both hashes, unless it produce significant slowdown. As for future releases, we may drop SHA and use MD5 as new default for everything.
@ -474,9 +500,9 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard)) n.reqLogger(ctx).Warn(logs.FailedToDiscardPutPayloadProbablyGoroutineLeaks, zap.Error(errDiscard))
} }
return 0, oid.ID{}, nil, err return 0, oid.ID{}, nil, nil, err
} }
return size, id, hash.Sum(nil), nil return size, id, hash.Sum(nil), md5Hash.Sum(nil), nil
} }
// ListObjectsV1 returns objects in a bucket for requests of Version 1. // ListObjectsV1 returns objects in a bucket for requests of Version 1.
@ -807,6 +833,7 @@ func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo
} }
oi = objectInfoFromMeta(bktInfo, meta) oi = objectInfoFromMeta(bktInfo, meta)
oi.MD5Sum = node.MD5
n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node}) n.cache.PutObject(owner, &data.ExtendedObjectInfo{ObjectInfo: oi, NodeVersion: node})
return oi return oi

View file

@ -125,7 +125,7 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
return oid.ID{}, err return oid.ID{}, err
} }
_, id, _, err := n.objectPutAndHash(ctx, prm, bktInfo) _, id, _, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
return id, err return id, err
} }

View file

@ -86,6 +86,7 @@ type (
bypassContentEncodingInChunks bool bypassContentEncodingInChunks bool
clientCut bool clientCut bool
maxBufferSizeForPut uint64 maxBufferSizeForPut uint64
md5Enabled bool
} }
maxClientsConfig struct { maxClientsConfig struct {
@ -191,6 +192,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
settings.setClientCut(v.GetBool(cfgClientCut)) settings.setClientCut(v.GetBool(cfgClientCut))
settings.initPlacementPolicy(log.logger, v) settings.initPlacementPolicy(log.logger, v)
settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut)) settings.setBufferMaxSizeForPut(v.GetUint64(cfgBufferMaxSizeForPut))
settings.setMD5Enabled(v.GetBool(cfgMD5Enabled))
return settings return settings
} }
@ -312,6 +314,18 @@ func (s *appSettings) CompleteMultipartKeepalive() time.Duration {
return s.completeMultipartKeepalive return s.completeMultipartKeepalive
} }
func (s *appSettings) MD5Enabled() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.md5Enabled
}
func (s *appSettings) setMD5Enabled(md5Enabled bool) {
s.mu.Lock()
s.md5Enabled = md5Enabled
s.mu.Unlock()
}
func (a *App) initAPI(ctx context.Context) { func (a *App) initAPI(ctx context.Context) {
a.initLayer(ctx) a.initLayer(ctx)
a.initHandler() a.initHandler()
@ -604,6 +618,7 @@ func (a *App) updateSettings() {
a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks)) a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks))
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut)) a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut)) a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
a.settings.setMD5Enabled(a.cfg.GetBool(cfgMD5Enabled))
} }
func (a *App) startServices() { func (a *App) startServices() {

View file

@ -162,6 +162,9 @@ const ( // Settings.
// Runtime. // Runtime.
cfgSoftMemoryLimit = "runtime.soft_memory_limit" cfgSoftMemoryLimit = "runtime.soft_memory_limit"
// Enable return MD5 checksum in ETag.
cfgMD5Enabled = "features.md5.enabled"
// envPrefix is an environment variables prefix used for configuration. // envPrefix is an environment variables prefix used for configuration.
envPrefix = "S3_GW" envPrefix = "S3_GW"
) )

View file

@ -150,3 +150,5 @@ S3_GW_TRACING_ENDPOINT="localhost:4318"
S3_GW_TRACING_EXPORTER="otlp_grpc" S3_GW_TRACING_EXPORTER="otlp_grpc"
S3_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824 S3_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
S3_GW_FEATURES_MD5_ENABLED=false

View file

@ -176,3 +176,7 @@ kludge:
runtime: runtime:
soft_memory_limit: 1gb soft_memory_limit: 1gb
dkirillov marked this conversation as resolved Outdated

Maybe we can introduce new section features or hashes instead of just md5?

Also we should add description of this param to docs/configuration.md and to config/config.env.

Please run pre-commit run --all to see and fix some linter errors (e.g. no empty line at the end of the file)

Maybe we can introduce new section `features` or `hashes` instead of just `md5`? Also we should add description of this param to `docs/configuration.md` and to `config/config.env`. Please run `pre-commit run --all` to see and fix some linter errors (e.g. no empty line at the end of the file)

And put md5 in new section?

And put `md5` in new section?
features:
md5:
enabled: false

View file

@ -186,6 +186,7 @@ There are some custom types used for brevity:
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) | | `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
| `kludge` | [Different kludge configuration](#kludge-section) | | `kludge` | [Different kludge configuration](#kludge-section) |
| `runtime` | [Runtime configuration](#runtime-section) | | `runtime` | [Runtime configuration](#runtime-section) |
| `features` | [Features configuration](#features-section) |
### General section ### General section
@ -562,3 +563,16 @@ runtime:
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. | | `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
# `features` section
Contains parameters for enabling features.
```yaml
features:
md5:
enabled: false
```
| Parameter | Type | SIGHUP reload | Default value | Description |
dkirillov marked this conversation as resolved Outdated

Let's name this enabled (not enable) to be more consistent with other config values (e.g. pprof.enabled, nats.enabled, etc)

Let's name this `enabled` (not `enable`) to be more consistent with other config values (e.g. `pprof.enabled`, `nats.enabled`, etc)
|---------------|--------|---------------|---------------|----------------------------------------------------------------|
| `md5.enabled` | `bool` | yes | false | Flag to enable return MD5 checksum in ETag headers and fields. |

View file

@ -75,6 +75,7 @@ const (
ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go ResolveBucket = "resolve bucket" // Info in ../../api/layer/layer.go
CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go CouldntDeleteCorsObject = "couldn't delete cors object" // Error in ../../api/layer/cors.go
PutObject = "put object" // Debug in ../../api/layer/object.go PutObject = "put object" // Debug in ../../api/layer/object.go
FailedToDeleteObject = "failed to delete object" // Debug in ../../api/layer/object.go
FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" // Warn in ../../api/layer/object.go FailedToDiscardPutPayloadProbablyGoroutineLeaks = "failed to discard put payload, probably goroutine leaks" // Warn in ../../api/layer/object.go
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go

View file

@ -81,6 +81,7 @@ const (
partNumberKV = "Number" partNumberKV = "Number"
sizeKV = "Size" sizeKV = "Size"
etagKV = "ETag" etagKV = "ETag"
md5KV = "MD5"
// keys for lock. // keys for lock.
isLockKV = "IsLock" isLockKV = "IsLock"
@ -185,6 +186,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV) _, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
_, isCombined := treeNode.Get(isCombinedKV) _, isCombined := treeNode.Get(isCombinedKV)
eTag, _ := treeNode.Get(etagKV) eTag, _ := treeNode.Get(etagKV)
md5, _ := treeNode.Get(md5KV)
version := &data.NodeVersion{ version := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{ BaseNodeVersion: data.BaseNodeVersion{
@ -193,6 +195,7 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
OID: treeNode.ObjID, OID: treeNode.ObjID,
Timestamp: treeNode.TimeStamp, Timestamp: treeNode.TimeStamp,
ETag: eTag, ETag: eTag,
MD5: md5,
Size: treeNode.Size, Size: treeNode.Size,
FilePath: filePath, FilePath: filePath,
}, },
@ -302,6 +305,8 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
return nil, fmt.Errorf("invalid created timestamp: %w", err) return nil, fmt.Errorf("invalid created timestamp: %w", err)
} }
partInfo.Created = time.UnixMilli(utcMilli) partInfo.Created = time.UnixMilli(utcMilli)
case md5KV:
partInfo.MD5 = value
} }
} }
@ -578,7 +583,7 @@ func (c *Tree) GetVersions(ctx context.Context, bktInfo *data.BucketInfo, filepa
} }
func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) { func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) {
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV} meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
path := pathFromName(objectName) path := pathFromName(objectName)
p := &GetNodesParams{ p := &GetNodesParams{
@ -1024,6 +1029,7 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
sizeKV: strconv.FormatUint(info.Size, 10), sizeKV: strconv.FormatUint(info.Size, 10),
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10), createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
etagKV: info.ETag, etagKV: info.ETag,
md5KV: info.MD5,
} }
for _, part := range parts { for _, part := range parts {
@ -1156,6 +1162,9 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
if len(version.ETag) > 0 { if len(version.ETag) > 0 {
meta[etagKV] = version.ETag meta[etagKV] = version.ETag
} }
if len(version.MD5) > 0 {
meta[md5KV] = version.MD5
}
if version.IsDeleteMarker() { if version.IsDeleteMarker() {
meta[isDeleteMarkerKV] = "true" meta[isDeleteMarkerKV] = "true"
@ -1200,7 +1209,7 @@ func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.Bucke
} }
func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) { func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID, filepath string, onlyUnversioned bool) ([]*data.NodeVersion, error) {
keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV} keysToReturn := []string{oidKV, isUnversionedKV, isDeleteMarkerKV, etagKV, sizeKV, md5KV}
path := pathFromName(filepath) path := pathFromName(filepath)
p := &GetNodesParams{ p := &GetNodesParams{
BktInfo: bktInfo, BktInfo: bktInfo,