[#275] Change logic abort multipart upload #275

Merged
alexvanin merged 1 commit from :feature/change_logic_abort_multipart into master 2023-12-27 11:08:59 +00:00
6 changed files with 33 additions and 8 deletions

View file

@ -27,6 +27,7 @@ This document outlines major changes between releases.
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
- Return `ETag` value in quotes (#219)
- Use tombstone when delete multipart upload (#275)
### Removed
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)

View file

@ -77,6 +77,7 @@ type MultipartInfo struct {
Created time.Time
Meta map[string]string
CopiesNumbers []uint32
Finished bool
}
// PartInfo is upload information about part.

View file

@ -489,7 +489,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
n.cache.DeleteObject(addr)
}
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID)
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
}
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
@ -565,7 +565,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
}
}
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo.ID)
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
}
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

View file

@ -377,7 +377,7 @@ LOOP:
return result, nil
}
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var uploadID string
@ -385,7 +385,7 @@ func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data
LOOP:
for key, multiparts := range cnrMultipartsMap {
for i, multipart := range multiparts {
if multipart.ID == multipartNodeID {
if multipart.ID == multipartInfo.ID {
uploadID = multipart.UploadID
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
break LOOP

View file

@ -64,7 +64,7 @@ type TreeService interface {
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)

View file

@ -82,6 +82,7 @@ const (
sizeKV = "Size"
etagKV = "ETag"
md5KV = "MD5"
finishedKV = "Finished"
// keys for lock.
isLockKV = "IsLock"
@ -245,6 +246,11 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
multipartInfo.Created = time.UnixMilli(utcMilli)
}
finished, _ := treeNode.Get(finishedKV)
if flag, err := strconv.ParseBool(finished); err == nil {
multipartInfo.Finished = flag
}
return multipartInfo, nil
}
@ -266,6 +272,10 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
}
case ownerKV:
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
case finishedKV:
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
multipartInfo.Finished = isFinished
}
default:
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
}
@ -949,7 +959,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
}
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
if err != nil {
if err != nil || multipartInfo.Finished {
continue
}
@ -992,6 +1002,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
continue
}
if info.UploadID == uploadID {
if info.Finished {
break
}
return info, nil
}
}
@ -1055,8 +1068,15 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
return result, nil
}
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
return c.service.RemoveNode(ctx, bktInfo, systemTree, multipartNodeID)
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
if err != nil {
return err
}
multipartInfo.Finished = true
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
}
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
@ -1241,6 +1261,9 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
info.Meta[uploadIDKV] = info.UploadID
info.Meta[ownerKV] = info.Owner.EncodeToString()
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
if info.Finished {
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
}
return info.Meta
}