[#275] Change logic abort multipart upload #275
6 changed files with 33 additions and 8 deletions
|
@ -27,6 +27,7 @@ This document outlines major changes between releases.
|
||||||
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
|
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
|
||||||
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
|
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
|
||||||
- Return `ETag` value in quotes (#219)
|
- Return `ETag` value in quotes (#219)
|
||||||
|
- Use tombstone when delete multipart upload (#275)
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)
|
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)
|
||||||
|
|
|
@ -77,6 +77,7 @@ type MultipartInfo struct {
|
||||||
Created time.Time
|
Created time.Time
|
||||||
Meta map[string]string
|
Meta map[string]string
|
||||||
CopiesNumbers []uint32
|
CopiesNumbers []uint32
|
||||||
|
Finished bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartInfo is upload information about part.
|
// PartInfo is upload information about part.
|
||||||
|
|
|
@ -489,7 +489,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
n.cache.DeleteObject(addr)
|
n.cache.DeleteObject(addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID)
|
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
|
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
|
||||||
|
@ -565,7 +565,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo.ID)
|
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||||
|
|
|
@ -377,7 +377,7 @@ LOOP:
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
|
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
|
||||||
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
||||||
|
|
||||||
var uploadID string
|
var uploadID string
|
||||||
|
@ -385,7 +385,7 @@ func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data
|
||||||
LOOP:
|
LOOP:
|
||||||
for key, multiparts := range cnrMultipartsMap {
|
for key, multiparts := range cnrMultipartsMap {
|
||||||
for i, multipart := range multiparts {
|
for i, multipart := range multiparts {
|
||||||
if multipart.ID == multipartNodeID {
|
if multipart.ID == multipartInfo.ID {
|
||||||
uploadID = multipart.UploadID
|
uploadID = multipart.UploadID
|
||||||
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
|
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
|
||||||
break LOOP
|
break LOOP
|
||||||
|
|
|
@ -64,7 +64,7 @@ type TreeService interface {
|
||||||
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)
|
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)
|
||||||
|
|
||||||
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||||
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error
|
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||||
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
|
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
|
||||||
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
||||||
|
|
||||||
|
|
|
@ -82,6 +82,7 @@ const (
|
||||||
sizeKV = "Size"
|
sizeKV = "Size"
|
||||||
etagKV = "ETag"
|
etagKV = "ETag"
|
||||||
md5KV = "MD5"
|
md5KV = "MD5"
|
||||||
|
finishedKV = "Finished"
|
||||||
|
|
||||||
// keys for lock.
|
// keys for lock.
|
||||||
isLockKV = "IsLock"
|
isLockKV = "IsLock"
|
||||||
|
@ -245,6 +246,11 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
|
||||||
multipartInfo.Created = time.UnixMilli(utcMilli)
|
multipartInfo.Created = time.UnixMilli(utcMilli)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
finished, _ := treeNode.Get(finishedKV)
|
||||||
|
if flag, err := strconv.ParseBool(finished); err == nil {
|
||||||
|
multipartInfo.Finished = flag
|
||||||
|
}
|
||||||
alexvanin marked this conversation as resolved
Outdated
|
|||||||
|
|
||||||
return multipartInfo, nil
|
return multipartInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,6 +272,10 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
|
||||||
}
|
}
|
||||||
case ownerKV:
|
case ownerKV:
|
||||||
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
|
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
|
||||||
|
case finishedKV:
|
||||||
|
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
|
||||||
|
multipartInfo.Finished = isFinished
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
|
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
|
||||||
}
|
}
|
||||||
|
@ -949,7 +959,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
|
||||||
}
|
}
|
||||||
|
|
||||||
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
|
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
|
||||||
if err != nil {
|
if err != nil || multipartInfo.Finished {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -992,6 +1002,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if info.UploadID == uploadID {
|
if info.UploadID == uploadID {
|
||||||
|
if info.Finished {
|
||||||
|
break
|
||||||
|
}
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1055,8 +1068,15 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
|
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
|
||||||
return c.service.RemoveNode(ctx, bktInfo, systemTree, multipartNodeID)
|
err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
multipartInfo.Finished = true
|
||||||
|
|
||||||
|
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
|
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
|
||||||
|
@ -1241,6 +1261,9 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
|
||||||
info.Meta[uploadIDKV] = info.UploadID
|
info.Meta[uploadIDKV] = info.UploadID
|
||||||
info.Meta[ownerKV] = info.Owner.EncodeToString()
|
info.Meta[ownerKV] = info.Owner.EncodeToString()
|
||||||
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
|
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
|
||||||
|
if info.Finished {
|
||||||
|
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
|
||||||
|
}
|
||||||
|
|
||||||
return info.Meta
|
return info.Meta
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue
What happens when this code is running during multipart upload started by previous version of the gateway?
Previous version won't set
finishedKV
flag, soParseBool
returns error andmultipartInfo.Finished
staysfalse
. Shouldn't it betrue
in such cases?As we found out, this code isn't triggered when accessing multipart upload finished by previous version of the gateway, so it works as expected.