[#275] Change logic abort multipart upload #275
6 changed files with 33 additions and 8 deletions
|
@ -27,6 +27,7 @@ This document outlines major changes between releases.
|
|||
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
|
||||
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
|
||||
- Return `ETag` value in quotes (#219)
|
||||
- Use tombstone when delete multipart upload (#275)
|
||||
|
||||
### Removed
|
||||
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)
|
||||
|
|
|
@ -77,6 +77,7 @@ type MultipartInfo struct {
|
|||
Created time.Time
|
||||
Meta map[string]string
|
||||
CopiesNumbers []uint32
|
||||
Finished bool
|
||||
}
|
||||
|
||||
// PartInfo is upload information about part.
|
||||
|
|
|
@ -489,7 +489,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
n.cache.DeleteObject(addr)
|
||||
}
|
||||
|
||||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID)
|
||||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
||||
}
|
||||
|
||||
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
|
||||
|
@ -565,7 +565,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
|||
}
|
||||
}
|
||||
|
||||
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo.ID)
|
||||
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
|
||||
}
|
||||
|
||||
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||
|
|
|
@ -377,7 +377,7 @@ LOOP:
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
|
||||
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
|
||||
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
||||
|
||||
var uploadID string
|
||||
|
@ -385,7 +385,7 @@ func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data
|
|||
LOOP:
|
||||
for key, multiparts := range cnrMultipartsMap {
|
||||
for i, multipart := range multiparts {
|
||||
if multipart.ID == multipartNodeID {
|
||||
if multipart.ID == multipartInfo.ID {
|
||||
uploadID = multipart.UploadID
|
||||
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
|
||||
break LOOP
|
||||
|
|
|
@ -64,7 +64,7 @@ type TreeService interface {
|
|||
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)
|
||||
|
||||
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error
|
||||
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
|
||||
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
|
||||
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
||||
|
||||
|
|
|
@ -82,6 +82,7 @@ const (
|
|||
sizeKV = "Size"
|
||||
etagKV = "ETag"
|
||||
md5KV = "MD5"
|
||||
finishedKV = "Finished"
|
||||
|
||||
// keys for lock.
|
||||
isLockKV = "IsLock"
|
||||
|
@ -245,6 +246,11 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
|
|||
multipartInfo.Created = time.UnixMilli(utcMilli)
|
||||
}
|
||||
|
||||
finished, _ := treeNode.Get(finishedKV)
|
||||
if flag, err := strconv.ParseBool(finished); err == nil {
|
||||
multipartInfo.Finished = flag
|
||||
}
|
||||
|
||||
return multipartInfo, nil
|
||||
}
|
||||
|
||||
|
@ -266,6 +272,10 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
|
|||
}
|
||||
case ownerKV:
|
||||
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
|
||||
case finishedKV:
|
||||
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
|
||||
multipartInfo.Finished = isFinished
|
||||
}
|
||||
default:
|
||||
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
|
||||
}
|
||||
|
@ -949,7 +959,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
|
|||
}
|
||||
|
||||
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
|
||||
if err != nil {
|
||||
if err != nil || multipartInfo.Finished {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -992,6 +1002,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
|
|||
continue
|
||||
}
|
||||
if info.UploadID == uploadID {
|
||||
if info.Finished {
|
||||
break
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
}
|
||||
|
@ -1055,8 +1068,15 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
|
||||
return c.service.RemoveNode(ctx, bktInfo, systemTree, multipartNodeID)
|
||||
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
|
||||
err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
multipartInfo.Finished = true
|
||||
|
||||
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
|
||||
}
|
||||
|
||||
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
|
||||
|
@ -1241,6 +1261,9 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
|
|||
info.Meta[uploadIDKV] = info.UploadID
|
||||
info.Meta[ownerKV] = info.Owner.EncodeToString()
|
||||
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
|
||||
if info.Finished {
|
||||
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
|
||||
}
|
||||
|
||||
return info.Meta
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue