frostfs-s3-gw/api/layer/patch.go

270 lines
8.2 KiB
Go
Raw Normal View History

package layer
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
)
// PatchObjectParams groups the inputs of Layer.PatchObject.
type PatchObjectParams struct {
// Object is the object being patched. PatchObject reads its ObjectInfo and
// NodeVersion, updates them in place on success and returns this pointer.
Object *data.ExtendedObjectInfo
// BktInfo describes the bucket holding the object; its CID is used as the
// container and its Owner is passed to prepareAuthParameters.
BktInfo *data.BucketInfo
// NewBytes supplies the replacement payload for the patched range.
NewBytes io.Reader
// Range is the byte range to patch. The patch length is computed as
// End - Start + 1, i.e. End is treated as inclusive.
Range *RangeParams
// VersioningEnabled controls the IsUnversioned flag of the new node version
// (the version is marked unversioned when this is false).
VersioningEnabled bool
// CopiesNumbers is forwarded as CopiesNumber when new objects are created
// while patching a multipart (combined) object.
CopiesNumbers []uint32
}
// PatchObject overwrites the byte range p.Range of the object p.Object with
// the data read from p.NewBytes and records the result as a new version in
// the tree service.
//
// An object carrying the AttributeDecryptedSize header is rejected with an
// error; an object carrying the MultipartObjectSize header is delegated to
// patchMultipartObject. On success p.Object is updated in place (ID, Size,
// HashSum and NodeVersion are replaced, MD5Sum is cleared) and returned.
func (n *Layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "layer.PatchObject")
	defer span.End()

	target := p.Object

	switch {
	case target.ObjectInfo.Headers[AttributeDecryptedSize] != "":
		return nil, fmt.Errorf("patch encrypted object")
	case target.ObjectInfo.Headers[MultipartObjectSize] != "":
		return n.patchMultipartObject(ctx, p)
	}

	var prmPatch frostfs.PrmObjectPatch
	prmPatch.Container = p.BktInfo.CID
	prmPatch.Object = target.ObjectInfo.ID
	prmPatch.Payload = p.NewBytes
	prmPatch.Offset = p.Range.Start
	// Range.End is inclusive, hence the +1.
	prmPatch.Length = p.Range.End - p.Range.Start + 1
	prmPatch.ObjectSize = target.ObjectInfo.Size
	n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)

	patched, err := n.patchObject(ctx, prmPatch)
	if err != nil {
		return nil, fmt.Errorf("patch object: %w", err)
	}

	// The hex-encoded payload hash serves both as the version ETag and as the
	// object's HashSum.
	hashHex := hex.EncodeToString(patched.HashSum)

	version := &data.NodeVersion{
		BaseNodeVersion: data.BaseNodeVersion{
			OID:           patched.ID,
			ETag:          hashHex,
			FilePath:      target.ObjectInfo.Name,
			Size:          patched.Size,
			Created:       &target.ObjectInfo.Created,
			Owner:         &n.gateOwner,
			CreationEpoch: target.NodeVersion.CreationEpoch,
		},
		IsUnversioned: !p.VersioningEnabled,
		IsCombined:    target.ObjectInfo.Headers[MultipartObjectSize] != "",
	}

	version.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, version)
	if err != nil {
		return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
	}

	target.ObjectInfo.ID = patched.ID
	target.ObjectInfo.Size = patched.Size
	target.ObjectInfo.MD5Sum = ""
	target.ObjectInfo.HashSum = hashHex
	target.NodeVersion = version

	return target, nil
}
// patchObject sends the patch request p to the storage and then heads the
// resulting object to collect its payload size and payload checksum.
//
// It returns the ID reported by the patch call together with the size and
// checksum value taken from the head response.
func (n *Layer) patchObject(ctx context.Context, p frostfs.PrmObjectPatch) (*data.CreatedObjectInfo, error) {
	patchedID, err := n.frostFS.PatchObject(ctx, p)
	if err != nil {
		return nil, fmt.Errorf("patch object: %w", err)
	}

	// Head the patched object with the same credentials and container as the
	// patch request itself.
	header, err := n.frostFS.HeadObject(ctx, frostfs.PrmObjectHead{
		PrmAuth:   p.PrmAuth,
		Container: p.Container,
		Object:    patchedID,
	})
	if err != nil {
		return nil, fmt.Errorf("head object: %w", err)
	}

	// The second result of PayloadChecksum is deliberately ignored; the value
	// of whatever checksum is returned is used as is.
	checksum, _ := header.PayloadChecksum()

	info := &data.CreatedObjectInfo{
		ID:      patchedID,
		Size:    header.PayloadSize(),
		HashSum: checksum.Value(),
	}

	return info, nil
}
// patchMultipartObject patches an object that is stored as a combined
// (multipart) object. It fetches the combined object, decodes its payload as
// a JSON list of parts, applies the requested range to the affected parts one
// by one and stores a new combined object through updateCombinedObject.
func (n *Layer) patchMultipartObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
	combined, err := n.objectGet(ctx, p.BktInfo, p.Object.ObjectInfo.ID)
	if err != nil {
		return nil, fmt.Errorf("get combined object '%s': %w", p.Object.ObjectInfo.ID.EncodeToString(), err)
	}

	var parts []*data.PartInfo
	decoder := json.NewDecoder(combined.Payload)
	if err = decoder.Decode(&parts); err != nil {
		return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
	}

	// One patch request is reused for every part; patchPart fills in the
	// per-part fields.
	var prmPatch frostfs.PrmObjectPatch
	prmPatch.Container = p.BktInfo.CID
	n.prepareAuthParameters(ctx, &prmPatch.PrmAuth, p.BktInfo.Owner)

	var (
		// offset is the patch start relative to the current part.
		offset = p.Range.Start
		// remaining is the number of bytes still to be patched (End is inclusive).
		remaining = p.Range.End - p.Range.Start + 1
		// totalSize accumulates the size of the whole object after patching.
		totalSize uint64
		lastIdx   = len(parts) - 1
	)

	for idx, part := range parts {
		isLast := idx == lastIdx

		// A part is left untouched when the range starts beyond it, when the
		// range starts exactly at its end (unless it is the last part, which
		// may then be extended), or when nothing is left to patch.
		untouched := offset > part.Size || (offset == part.Size && !isLast) || remaining == 0
		if untouched {
			totalSize += part.Size
			if remaining != 0 {
				// Range has not started yet: make the offset relative to the next part.
				offset -= part.Size
			}
			continue
		}

		var created *data.CreatedObjectInfo
		created, offset, remaining, err = n.patchPart(ctx, part, p, &prmPatch, offset, remaining, isLast)
		if err != nil {
			return nil, fmt.Errorf("patch part: %w", err)
		}

		// part points at the same PartInfo as parts[idx], so the list that is
		// stored in the new combined object sees these updates.
		part.OID = created.ID
		part.Size = created.Size
		part.MD5 = ""
		part.ETag = hex.EncodeToString(created.HashSum)

		totalSize += created.Size
	}

	return n.updateCombinedObject(ctx, parts, totalSize, p)
}
// patchPart applies the pending range (off, ln) to a single part, reading the
// new bytes from p.NewBytes. It returns the info of the created object plus
// the offset and length that remain for the following parts.
//
// If the range starts at the beginning of the part and covers at least the
// whole part, a new object is put in place of the part. Otherwise the
// existing part object is patched through patchObject. For a non-last part
// at most the bytes up to the end of the part are consumed; for the last
// part the whole remaining length is consumed. The returned offset is always
// zero.
func (n *Layer) patchPart(ctx context.Context, part *data.PartInfo, p *PatchObjectParams, prmPatch *frostfs.PrmObjectPatch, off, ln uint64, lastPart bool) (*data.CreatedObjectInfo, uint64, uint64, error) {
	replaceWhole := off == 0 && ln >= part.Size

	if !replaceWhole {
		// Partial overwrite: patch the existing part object in place.
		chunk := ln
		if !lastPart && off+chunk > part.Size {
			// The range spills into the next part; stop at the end of this one.
			chunk = part.Size - off
		}

		prmPatch.Object = part.OID
		prmPatch.ObjectSize = part.Size
		prmPatch.Offset = off
		prmPatch.Length = chunk
		prmPatch.Payload = io.LimitReader(p.NewBytes, int64(prmPatch.Length))

		patched, err := n.patchObject(ctx, *prmPatch)
		if err != nil {
			return nil, 0, 0, fmt.Errorf("patch part object '%s': %w", part.OID.EncodeToString(), err)
		}

		return patched, 0, ln - chunk, nil
	}

	// Whole part is overwritten: put a fresh object instead of patching.
	chunk := part.Size
	if lastPart {
		// The last part absorbs everything that is left.
		chunk = ln
	}

	created, err := n.objectPutAndHash(ctx, frostfs.PrmObjectCreate{
		Container:    p.BktInfo.CID,
		Payload:      io.LimitReader(p.NewBytes, int64(chunk)),
		CreationTime: part.Created,
		CopiesNumber: p.CopiesNumbers,
	}, p.BktInfo)
	if err != nil {
		return nil, 0, 0, fmt.Errorf("put new part object '%s': %w", part.OID.EncodeToString(), err)
	}

	// off is zero in this branch, so zero is handed back unchanged.
	return created, 0, ln - chunk, nil
}
// updateCombinedObject stores a new combined object whose payload is the JSON
// encoding of parts and registers it as a new version in the tree service.
//
// The attributes of the new object are copied from the headers of p.Object,
// except that MultipartObjectSize is replaced with fullObjSize,
// UploadCompletedParts is replaced with the comma-separated header strings of
// parts, and the content type is taken from ObjectInfo.ContentType. On
// success p.Object is updated in place and returned.
func (n *Layer) updateCombinedObject(ctx context.Context, parts []*data.PartInfo, fullObjSize uint64, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
	payload, err := json.Marshal(parts)
	if err != nil {
		return nil, fmt.Errorf("marshal parts for combined object: %w", err)
	}

	partHeaders := make([]string, 0, len(parts))
	for _, part := range parts {
		partHeaders = append(partHeaders, part.ToHeaderString())
	}
	completedParts := strings.Join(partHeaders, ",")
	sizeAttr := strconv.FormatUint(fullObjSize, 10)

	target := p.Object

	attrs := make([][2]string, 0, len(target.ObjectInfo.Headers)+1)
	for key, val := range target.ObjectInfo.Headers {
		switch key {
		case api.ContentType:
			// Dropped here; appended once below from ObjectInfo.ContentType.
			continue
		case MultipartObjectSize:
			val = sizeAttr
		case UploadCompletedParts:
			val = completedParts
		}
		attrs = append(attrs, [2]string{key, val})
	}
	attrs = append(attrs, [2]string{api.ContentType, target.ObjectInfo.ContentType})

	prm := frostfs.PrmObjectCreate{
		Container:    p.BktInfo.CID,
		PayloadSize:  fullObjSize,
		Filepath:     target.ObjectInfo.Name,
		Payload:      bytes.NewReader(payload),
		CreationTime: target.ObjectInfo.Created,
		CopiesNumber: p.CopiesNumbers,
		Attributes:   attrs,
	}

	created, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
	if err != nil {
		return nil, fmt.Errorf("put new combined object: %w", err)
	}

	hashHex := hex.EncodeToString(created.HashSum)
	// MD5 of the stored object suffixed with "-<number of parts>".
	md5WithParts := hex.EncodeToString(created.MD5Sum) + "-" + strconv.Itoa(len(parts))

	version := &data.NodeVersion{
		BaseNodeVersion: data.BaseNodeVersion{
			OID:           created.ID,
			ETag:          hashHex,
			MD5:           md5WithParts,
			FilePath:      target.ObjectInfo.Name,
			Size:          fullObjSize,
			Created:       &target.ObjectInfo.Created,
			Owner:         &n.gateOwner,
			CreationEpoch: target.NodeVersion.CreationEpoch,
		},
		IsUnversioned: !p.VersioningEnabled,
		IsCombined:    target.ObjectInfo.Headers[MultipartObjectSize] != "",
	}

	version.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, version)
	if err != nil {
		return nil, fmt.Errorf("couldn't add new version to tree service: %w", err)
	}

	target.ObjectInfo.ID = created.ID
	// NOTE(review): ObjectInfo.Size takes the size reported for the created
	// object, while the node version above records fullObjSize — confirm this
	// difference is intended.
	target.ObjectInfo.Size = created.Size
	target.ObjectInfo.MD5Sum = md5WithParts
	target.ObjectInfo.HashSum = hashHex
	target.ObjectInfo.Headers[MultipartObjectSize] = sizeAttr
	target.ObjectInfo.Headers[UploadCompletedParts] = completedParts
	target.NodeVersion = version

	return target, nil
}