[#275] Change logic delete multipart upload

In order not to accidentally take outdated
information about downloaded parts from other
nodes, now when the multipart is abort or complete,
the root node of the multipart upload with the
finish flag remains in the tree.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
This commit is contained in:
Roman Loginov 2023-12-04 09:42:25 +03:00
parent 08019f1574
commit 6f9ee3da76
6 changed files with 33 additions and 8 deletions

View file

@ -27,6 +27,7 @@ This document outlines major changes between releases.
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
- Return `ETag` value in quotes (#219)
- Use tombstone when delete multipart upload (#275)
### Removed
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)

View file

@ -77,6 +77,7 @@ type MultipartInfo struct {
Created time.Time
Meta map[string]string
CopiesNumbers []uint32
Finished bool
}
// PartInfo is upload information about part.

View file

@ -489,7 +489,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
n.cache.DeleteObject(addr)
}
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID)
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
}
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
@ -565,7 +565,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
}
}
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo.ID)
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
}
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

View file

@ -377,7 +377,7 @@ LOOP:
return result, nil
}
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var uploadID string
@ -385,7 +385,7 @@ func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data
LOOP:
for key, multiparts := range cnrMultipartsMap {
for i, multipart := range multiparts {
if multipart.ID == multipartNodeID {
if multipart.ID == multipartInfo.ID {
uploadID = multipart.UploadID
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
break LOOP

View file

@ -64,7 +64,7 @@ type TreeService interface {
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)

View file

@ -82,6 +82,7 @@ const (
sizeKV = "Size"
etagKV = "ETag"
md5KV = "MD5"
finishedKV = "Finished"
// keys for lock.
isLockKV = "IsLock"
@ -245,6 +246,11 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
multipartInfo.Created = time.UnixMilli(utcMilli)
}
finished, _ := treeNode.Get(finishedKV)
if flag, err := strconv.ParseBool(finished); err == nil {
multipartInfo.Finished = flag
}
return multipartInfo, nil
}
@ -266,6 +272,10 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
}
case ownerKV:
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
case finishedKV:
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
multipartInfo.Finished = isFinished
}
default:
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
}
@ -949,7 +959,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
}
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
if err != nil {
if err != nil || multipartInfo.Finished {
continue
}
@ -992,6 +1002,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
continue
}
if info.UploadID == uploadID {
if info.Finished {
break
}
return info, nil
}
}
@ -1055,8 +1068,15 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
return result, nil
}
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
return c.service.RemoveNode(ctx, bktInfo, systemTree, multipartNodeID)
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
if err != nil {
return err
}
multipartInfo.Finished = true
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
}
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
@ -1241,6 +1261,9 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
info.Meta[uploadIDKV] = info.UploadID
info.Meta[ownerKV] = info.Owner.EncodeToString()
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
if info.Finished {
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
}
return info.Meta
}