[#125] Handle negative Content-Length on put

Add computing actual object size during calculating hash on put.
Use this actual value to save in tree and cache

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-06-01 16:45:28 +03:00 committed by Alexey Vanin
parent b445f7bbf9
commit 4a6e3a19ce
21 changed files with 105 additions and 70 deletions

View file

@ -170,7 +170,7 @@ func ParseCompletedPartHeader(hdr string) (*Part, error) {
if err != nil {
return nil, fmt.Errorf("invalid completed part number '%s': %w", partInfo[0], err)
}
size, err := strconv.Atoi(partInfo[1])
size, err := strconv.ParseUint(partInfo[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid completed part size '%s': %w", partInfo[1], err)
}
@ -178,7 +178,7 @@ func ParseCompletedPartHeader(hdr string) (*Part, error) {
return &Part{
ETag: partInfo[2],
PartNumber: num,
Size: int64(size),
Size: size,
}, nil
}
@ -191,26 +191,18 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
return nil, fmt.Errorf("couldn't get versioning settings object: %w", err)
}
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
FilePath: p.Object,
Size: p.Size,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
}
r := p.Reader
if p.Encryption.Enabled() {
p.Header[AttributeDecryptedSize] = strconv.FormatInt(p.Size, 10)
p.Header[AttributeDecryptedSize] = strconv.FormatUint(p.Size, 10)
if err = addEncryptionHeaders(p.Header, p.Encryption); err != nil {
return nil, fmt.Errorf("add encryption header: %w", err)
}
var encSize uint64
if r, encSize, err = encryptionReader(p.Reader, uint64(p.Size), p.Encryption.Key()); err != nil {
if r, encSize, err = encryptionReader(p.Reader, p.Size, p.Encryption.Key()); err != nil {
return nil, fmt.Errorf("create encrypter: %w", err)
}
p.Size = int64(encSize)
p.Size = encSize
}
if r != nil {
@ -230,7 +222,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm := PrmObjectCreate{
Container: p.BktInfo.CID,
Creator: owner,
PayloadSize: uint64(p.Size),
PayloadSize: p.Size,
Filepath: p.Object,
Payload: r,
CreationTime: TimeNow(ctx),
@ -243,7 +235,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
prm.Attributes = append(prm.Attributes, [2]string{k, v})
}
id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
size, id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
if err != nil {
return nil, err
}
@ -254,8 +246,16 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
zap.String("bucket", p.BktInfo.Name), zap.Stringer("cid", p.BktInfo.CID),
zap.String("object", p.Object), zap.Stringer("oid", id))
newVersion.OID = id
newVersion.ETag = hex.EncodeToString(hash)
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: id,
ETag: hex.EncodeToString(hash),
FilePath: p.Object,
Size: size,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
}
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}
@ -286,7 +286,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
Owner: owner,
Bucket: p.BktInfo.Name,
Name: p.Object,
Size: p.Size,
Size: size,
Created: prm.CreationTime,
Headers: p.Header,
ContentType: p.Header[api.ContentType],
@ -405,17 +405,19 @@ func (n *layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idOb
// objectPutAndHash prepare auth parameters and invoke frostfs.CreateObject.
// Returns object ID and payload sha256 hash.
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (oid.ID, []byte, error) {
func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktInfo *data.BucketInfo) (uint64, oid.ID, []byte, error) {
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
var size uint64
hash := sha256.New()
prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) {
size += uint64(len(buf))
hash.Write(buf)
})
id, err := n.frostFS.CreateObject(ctx, prm)
if err != nil {
return oid.ID{}, nil, err
return 0, oid.ID{}, nil, err
}
return id, hash.Sum(nil), nil
return size, id, hash.Sum(nil), nil
}
// ListObjectsV1 returns objects in a bucket for requests of Version 1.