[#125] Handle negative Content-Length on put

Add computing actual object size during calculating hash on put.
Use this actual value to save in tree and cache

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-06-01 16:45:28 +03:00
parent 089e2c8bc8
commit 843cd4c094
21 changed files with 105 additions and 70 deletions

View file

@ -40,7 +40,7 @@ type (
Bucket string
Name string
Size int64
Size uint64
ContentType string
Created time.Time
HashSum string
@ -52,7 +52,7 @@ type (
NotificationInfo struct {
Name string
Version string
Size int64
Size uint64
HashSum string
}

View file

@ -53,7 +53,7 @@ type BaseNodeVersion struct {
ParenID uint64
OID oid.ID
Timestamp uint64
Size int64
Size uint64
ETag string
FilePath string
}
@ -83,14 +83,14 @@ type PartInfo struct {
UploadID string
Number int
OID oid.ID
Size int64
Size uint64
ETag string
Created time.Time
}
// ToHeaderString form short part representation to use in S3-Completed-Parts header.
func (p *PartInfo) ToHeaderString() string {
return strconv.Itoa(p.Number) + "-" + strconv.FormatInt(p.Size, 10) + "-" + p.ETag
return strconv.Itoa(p.Number) + "-" + strconv.FormatUint(p.Size, 10) + "-" + p.ETag
}
// LockInfo is lock information to create appropriate tree node.

View file

@ -17,7 +17,7 @@ type (
GetObjectAttributesResponse struct {
ETag string `xml:"ETag,omitempty"`
Checksum *Checksum `xml:"Checksum,omitempty"`
ObjectSize int64 `xml:"ObjectSize,omitempty"`
ObjectSize uint64 `xml:"ObjectSize,omitempty"`
StorageClass string `xml:"StorageClass,omitempty"`
ObjectParts *ObjectParts `xml:"ObjectParts,omitempty"`
}

View file

@ -88,7 +88,7 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
h.Set(api.ContentLength, info.Headers[layer.AttributeDecryptedSize])
addSSECHeaders(h, requestHeader)
} else {
h.Set(api.ContentLength, strconv.FormatInt(info.Size, 10))
h.Set(api.ContentLength, strconv.FormatUint(info.Size, 10))
}
h.Set(api.ETag, info.HashSum)
@ -163,13 +163,13 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
fullSize := info.Size
if encryptionParams.Enabled() {
if fullSize, err = strconv.ParseInt(info.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil {
if fullSize, err = strconv.ParseUint(info.Headers[layer.AttributeDecryptedSize], 10, 64); err != nil {
h.logAndSendError(w, "invalid decrypted size header", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
return
}
}
if params, err = fetchRangeHeader(r.Header, uint64(fullSize)); err != nil {
if params, err = fetchRangeHeader(r.Header, fullSize); err != nil {
h.logAndSendError(w, "could not parse range header", reqInfo, err)
return
}
@ -268,7 +268,7 @@ func parseHTTPTime(data string) (*time.Time, error) {
return &result, nil
}
func writeRangeHeaders(w http.ResponseWriter, params *layer.RangeParams, size int64) {
func writeRangeHeaders(w http.ResponseWriter, params *layer.RangeParams, size uint64) {
w.Header().Set(api.AcceptRanges, "bytes")
w.Header().Set(api.ContentRange, fmt.Sprintf("bytes %d-%d/%d", params.Start, params.End, size))
w.Header().Set(api.ContentLength, strconv.FormatUint(params.End-params.Start+1, 10))

View file

@ -190,7 +190,7 @@ func createTestObject(hc *handlerContext, bktInfo *data.BucketInfo, objName stri
extObjInfo, err := hc.Layer().PutObject(hc.Context(), &layer.PutObjectParams{
BktInfo: bktInfo,
Object: objName,
Size: int64(len(content)),
Size: uint64(len(content)),
Reader: bytes.NewReader(content),
Header: header,
})

View file

@ -13,8 +13,8 @@ import (
const sizeToDetectType = 512
func getRangeToDetectContentType(maxSize int64) *layer.RangeParams {
end := uint64(maxSize)
func getRangeToDetectContentType(maxSize uint64) *layer.RangeParams {
end := maxSize
if sizeToDetectType < end {
end = sizeToDetectType
}

View file

@ -216,6 +216,11 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
return
}
var size uint64
if r.ContentLength > 0 {
size = uint64(r.ContentLength)
}
p := &layer.UploadPartParams{
Info: &layer.UploadInfoParams{
UploadID: uploadID,
@ -223,7 +228,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
Key: reqInfo.ObjectName,
},
PartNumber: partNumber,
Size: r.ContentLength,
Size: size,
Reader: r.Body,
}

View file

@ -43,13 +43,13 @@ func (p *postPolicy) condition(key string) *policyCondition {
return nil
}
func (p *postPolicy) CheckContentLength(size int64) bool {
func (p *postPolicy) CheckContentLength(size uint64) bool {
if p.empty {
return true
}
for _, condition := range p.Conditions {
if condition.Matching == "content-length-range" {
length := strconv.FormatInt(size, 10)
length := strconv.FormatUint(size, 10)
return condition.Key <= length && length <= condition.Value
}
}
@ -218,11 +218,16 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
var size uint64
if r.ContentLength > 0 {
size = uint64(r.ContentLength)
}
params := &layer.PutObjectParams{
BktInfo: bktInfo,
Object: reqInfo.ObjectName,
Reader: r.Body,
Size: r.ContentLength,
Size: size,
Header: metadata,
Encryption: encryptionParams,
}
@ -388,10 +393,10 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
}
var contentReader io.Reader
var size int64
var size uint64
if content, ok := r.MultipartForm.Value["file"]; ok {
contentReader = bytes.NewBufferString(content[0])
size = int64(len(content[0]))
size = uint64(len(content[0]))
} else {
file, head, err := r.FormFile("file")
if err != nil {
@ -399,7 +404,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
return
}
contentReader = file
size = head.Size
size = uint64(head.Size)
reqInfo.ObjectName = strings.ReplaceAll(reqInfo.ObjectName, "${filename}", head.Filename)
}
if !policy.CheckContentLength(size) {

View file

@ -1,9 +1,11 @@
package handler
import (
"bytes"
"encoding/json"
"mime/multipart"
"net/http"
"strconv"
"strings"
"testing"
"time"
@ -126,3 +128,21 @@ func TestPutObjectOverrideCopiesNumber(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "1", objInfo.Headers[layer.AttributeFrostfsCopiesNumber])
}
func TestPutObjectWithNegativeContentLength(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-put", "object-for-put"
createTestBucket(tc, bktName)
content := []byte("content")
w, r := prepareTestPayloadRequest(tc, bktName, objName, bytes.NewReader(content))
r.ContentLength = -1
tc.Handler().PutObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestRequest(tc, bktName, objName, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
require.Equal(t, strconv.Itoa(len(content)), w.Header().Get(api.ContentLength))
}

View file

@ -104,7 +104,7 @@ type Object struct {
Key string
LastModified string // time string of format "2006-01-02T15:04:05.000Z"
ETag string `xml:"ETag,omitempty"`
Size int64
Size uint64
// Owner of the object.
Owner *Owner `xml:"Owner,omitempty"`
@ -120,7 +120,7 @@ type ObjectVersionResponse struct {
Key string `xml:"Key"`
LastModified string `xml:"LastModified"`
Owner Owner `xml:"Owner"`
Size int64 `xml:"Size"`
Size uint64 `xml:"Size"`
StorageClass string `xml:"StorageClass,omitempty"` // is empty!!
VersionID string `xml:"VersionId"`
}

View file

@ -45,7 +45,7 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
CopiesNumber: p.CopiesNumbers,
}
objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return fmt.Errorf("put system object: %w", err)
}

View file

@ -102,7 +102,7 @@ type (
PutObjectParams struct {
BktInfo *data.BucketInfo
Object string
Size int64
Size uint64
Reader io.Reader
Header map[string]string
Lock *data.ObjectLock
@ -135,7 +135,7 @@ type (
ScrBktInfo *data.BucketInfo
DstBktInfo *data.BucketInfo
DstObject string
SrcSize int64
SrcSize uint64
Header map[string]string
Range *RangeParams
Lock *data.ObjectLock

View file

@ -60,7 +60,7 @@ type (
UploadPartParams struct {
Info *UploadInfoParams
PartNumber int
Size int64
Size uint64
Reader io.Reader
}
@ -91,7 +91,7 @@ type (
ETag string
LastModified string
PartNumber int
Size int64
Size uint64
}
ListMultipartUploadsParams struct {
@ -212,22 +212,25 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
decSize := p.Size
if p.Info.Encryption.Enabled() {
r, encSize, err := encryptionReader(p.Reader, uint64(p.Size), p.Info.Encryption.Key())
r, encSize, err := encryptionReader(p.Reader, p.Size, p.Info.Encryption.Key())
if err != nil {
return nil, fmt.Errorf("failed to create ecnrypted reader: %w", err)
}
prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)})
prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatUint(p.Size, 10)})
prm.Payload = r
p.Size = int64(encSize)
p.Size = encSize
}
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
size, id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil {
return nil, err
}
if p.Info.Encryption.Enabled() {
size = decSize
}
reqInfo := api.GetReqInfo(ctx)
n.log.Debug("upload part",
@ -241,7 +244,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
UploadID: p.Info.UploadID,
Number: p.PartNumber,
OID: id,
Size: decSize,
Size: size,
ETag: hex.EncodeToString(hash),
Created: prm.CreationTime,
}
@ -285,8 +288,8 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
size := p.SrcObjInfo.Size
if p.Range != nil {
size = int64(p.Range.End - p.Range.Start + 1)
if p.Range.End > uint64(p.SrcObjInfo.Size) {
size = p.Range.End - p.Range.Start + 1
if p.Range.End > p.SrcObjInfo.Size {
return nil, errors.GetAPIError(errors.ErrInvalidCopyPartRangeSource)
}
}
@ -375,7 +378,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
return nil, nil, errors.GetAPIError(errors.ErrInvalidPart)
}
var multipartObjetSize int64
var multipartObjetSize uint64
var encMultipartObjectSize uint64
parts := make([]*data.PartInfo, 0, len(p.Parts))
@ -393,7 +396,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted)
if encInfo.Enabled {
encPartSize, err := sio.EncryptedSize(uint64(partInfo.Size))
encPartSize, err := sio.EncryptedSize(partInfo.Size)
if err != nil {
return nil, nil, fmt.Errorf("compute encrypted size: %w", err)
}
@ -430,8 +433,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
initMetadata[AttributeEncryptionAlgorithm] = encInfo.Algorithm
initMetadata[AttributeHMACKey] = encInfo.HMACKey
initMetadata[AttributeHMACSalt] = encInfo.HMACSalt
initMetadata[AttributeDecryptedSize] = strconv.FormatInt(multipartObjetSize, 10)
multipartObjetSize = int64(encMultipartObjectSize)
initMetadata[AttributeDecryptedSize] = strconv.FormatUint(multipartObjetSize, 10)
multipartObjetSize = encMultipartObjectSize
}
r := &multiObjectReader{

View file

@ -34,7 +34,7 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
CopiesNumber: p.CopiesNumbers,
}
objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
_, objID, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return err
}

View file

@ -170,7 +170,7 @@ func ParseCompletedPartHeader(hdr string) (*Part, error) {
if err != nil {
return nil, fmt.Errorf("invalid completed part number '%s': %w", partInfo[0], err)
}
size, err := strconv.Atoi(partInfo[1])
size, err := strconv.ParseUint(partInfo[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid completed part size '%s': %w", partInfo[1], err)
}
@ -178,7 +178,7 @@ func ParseCompletedPartHeader(hdr string) (*Part, error) {
return &Part{
ETag: partInfo[2],
PartNumber: num,
Size: int64(size),
Size: size,
}, nil
}
@ -191,26 +191,18 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
return nil, fmt.Errorf("couldn't get versioning settings object: %w", err)
}
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
FilePath: p.Object,
Size: p.Size,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
}
r := p.Reader
if p.Encryption.Enabled() {
p.Header[AttributeDecryptedSize] = strconv.FormatInt(p.Size, 10)
p.Header[AttributeDecryptedSize] = strconv.FormatUint(p.Size, 10)
if err = addEncryptionHeaders(p.Header, p.Encryption); err != nil {
return nil, fmt.Errorf("add encryption header: %w", err)
}
var encSize uint64
if r, encSize, err = encryptionReader(p.Reader, uint64(p.Size), p.Encryption.Key()); err != nil {
if r, encSize, err = encryptionReader(p.Reader, p.Size, p.Encryption.Key()); err != nil {
return nil, fmt.Errorf("create encrypter: %w", err)
}
p.Size = int64(encSize)
p.Size = encSize
}
if r != nil {
@ -230,7 +222,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Creator: owner,
PayloadSize: uint64(p.Size),
PayloadSize: p.Size,
Filepath: p.Object,
Payload: r,
CreationTime: TimeNow(ctx),
@ -243,7 +235,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
size, id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, err
}
@ -254,8 +246,16 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
zap.String("bucket", p.BktInfo.Name), zap.Stringer("cid", p.BktInfo.CID),
zap.String("object", p.Object), zap.Stringer("oid", id))
newVersion.OID = id
newVersion.ETag = hex.EncodeToString(hash)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: id,
ETag: hex.EncodeToString(hash),
FilePath: p.Object,
Size: size,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}
@ -286,7 +286,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
Owner: owner,
Bucket: p.BktInfo.Name,
Name: p.Object,
Size: p.Size,
Size: size,
Created: prm.CreationTime,
Headers: p.Header,
ContentType: p.Header[api.ContentType],
@ -405,17 +405,19 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
// Returns object ID and payload sha256 hash.
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (oid.ID, []byte, error) {
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) {
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
var size uint64
hash := sha256.New()
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
size += uint64(len(buf))
hash.Write(buf)
})
id, err := n.frostFS.CreateObject(ctx, prm)
if err != nil {
return oid.ID{}, nil, err
return 0, oid.ID{}, nil, err
}
return id, hash.Sum(nil), nil
return size, id, hash.Sum(nil), nil
}
// ListObjectsV1 returns objects in a bucket for requests of Version 1.

View file

@ -126,7 +126,7 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
return oid.ID{}, err
}
id, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
_, id, _, err := n.objectPutAndHash(ctx, prm, bktInfo)
return id, err
}

View file

@ -94,7 +94,7 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI
ContentType: mimeType,
Headers: headers,
Owner: *meta.OwnerID(),
Size: int64(meta.PayloadSize()),
Size: meta.PayloadSize(),
HashSum: hex.EncodeToString(payloadChecksum.Value()),
}
}

View file

@ -17,7 +17,7 @@ import (
var (
defaultTestCreated = time.Now()
defaultTestPayload = []byte("test object payload")
defaultTestPayloadLength = int64(len(defaultTestPayload))
defaultTestPayloadLength = uint64(len(defaultTestPayload))
defaultTestContentType = http.DetectContentType(defaultTestPayload)
)

View file

@ -21,7 +21,7 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
extObjInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{
BktInfo: tc.bktInfo,
Object: tc.obj,
Size: int64(len(content)),
Size: uint64(len(content)),
Reader: bytes.NewReader(content),
Header: make(map[string]string),
})

View file

@ -94,7 +94,7 @@ type (
Object struct {
Key string `json:"key"`
Size int64 `json:"size,omitempty"`
Size uint64 `json:"size,omitempty"`
VersionID string `json:"versionId,omitempty"`
ETag string `json:"eTag,omitempty"`
Sequencer string `json:"sequencer,omitempty"`

View file

@ -38,7 +38,7 @@ type (
ParentID uint64
ObjID oid.ID
TimeStamp uint64
Size int64
Size uint64
Meta map[string]string
}
@ -143,7 +143,7 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
case sizeKV:
if sizeStr := string(kv.GetValue()); len(sizeStr) > 0 {
var err error
if treeNode.Size, err = strconv.ParseInt(sizeStr, 10, 64); err != nil {
if treeNode.Size, err = strconv.ParseUint(sizeStr, 10, 64); err != nil {
return nil, fmt.Errorf("invalid size value '%s': %w", sizeStr, err)
}
}
@ -261,7 +261,7 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
case etagKV:
partInfo.ETag = value
case sizeKV:
if partInfo.Size, err = strconv.ParseInt(value, 10, 64); err != nil {
if partInfo.Size, err = strconv.ParseUint(value, 10, 64); err != nil {
return nil, fmt.Errorf("invalid part size: %w", err)
}
case createdKV:
@ -921,7 +921,7 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
meta := map[string]string{
partNumberKV: strconv.Itoa(info.Number),
oidKV: info.OID.EncodeToString(),
sizeKV: strconv.FormatInt(info.Size, 10),
sizeKV: strconv.FormatUint(info.Size, 10),
createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
etagKV: info.ETag,
}
@ -1057,7 +1057,7 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
}
if version.Size > 0 {
meta[sizeKV] = strconv.FormatInt(version.Size, 10)
meta[sizeKV] = strconv.FormatUint(version.Size, 10)
}
if len(version.ETag) > 0 {
meta[etagKV] = version.ETag