[#448] multipart: Support removing duplicated parts

Previously after tree split we can have duplicated parts
(several objects and tree node referred to the same part number).
Some of them couldn't be deleted after abort or compete action.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-18 16:40:55 +03:00 committed by Alexey Vanin
parent 9bdfe2a016
commit 056f168d77
9 changed files with 336 additions and 68 deletions

View file

@ -290,16 +290,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
MD5: hex.EncodeToString(createdObj.MD5Sum),
}
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !oldPartIDNotFound {
return nil, err
}
if !oldPartIDNotFound {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString()))
for _, oldPartID := range oldPartIDs {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString()))
}
}
}
@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var multipartObjetSize uint64
var encMultipartObjectSize uint64
parts := make([]*data.PartInfo, 0, len(p.Parts))
parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
var completedPartsHeader strings.Builder
md5Hash := md5.New()
for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber]
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) {
partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
if partInfo == nil {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
}
delete(partsInfo, part.PartNumber)
// for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
@ -475,14 +476,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var addr oid.Address
addr.SetContainer(p.Info.Bkt.CID)
for _, partInfo := range partsInfo {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
zap.Error(err))
for _, prts := range partsInfo {
for _, partInfo := range prts {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
zap.Error(err))
}
addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr)
}
addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr)
}
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
@ -554,10 +557,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
return err
}
for _, info := range parts {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
for _, infos := range parts {
for _, info := range infos {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
}
}
}
@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
parts := make([]*Part, 0, len(partsInfo))
for _, partInfo := range partsInfo {
for _, infos := range partsInfo {
sort.Slice(infos, func(i, j int) bool {
return infos[i].Timestamp < infos[j].Timestamp
})
partInfo := infos[len(infos)-1]
parts = append(parts, &Part{
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
return &res, nil
}
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
type PartsInfo map[int][]*data.PartInfoExtended
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
parts := p[part]
for i, info := range parts {
if info.GetETag(md5Enabled) == etag {
p[part] = append(parts[:i], parts[i+1:]...)
return info
}
}
return nil
}
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return nil, nil, err
}
res := make(map[int]*data.PartInfo, len(parts))
res := make(map[int][]*data.PartInfoExtended, len(parts))
partsNumbers := make([]int, len(parts))
oids := make([]string, len(parts))
for i, part := range parts {
res[part.Number] = part
res[part.Number] = append(res[part.Number], part)
partsNumbers[i] = part.Number
oids[i] = part.OID.EncodeToString()
}