[#275] Change logic abort multipart upload #275

Merged
alexvanin merged 1 commit from :feature/change_logic_abort_multipart into master 2023-12-27 11:08:59 +00:00
6 changed files with 33 additions and 8 deletions

View file

@ -27,6 +27,7 @@ This document outlines major changes between releases.
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221) - Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220) - Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
- Return `ETag` value in quotes (#219) - Return `ETag` value in quotes (#219)
- Use tombstone when delete multipart upload (#275)
### Removed ### Removed
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227) - Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)

View file

@ -77,6 +77,7 @@ type MultipartInfo struct {
Created time.Time Created time.Time
Meta map[string]string Meta map[string]string
CopiesNumbers []uint32 CopiesNumbers []uint32
Finished bool
} }
// PartInfo is upload information about part. // PartInfo is upload information about part.

View file

@ -489,7 +489,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
n.cache.DeleteObject(addr) n.cache.DeleteObject(addr)
} }
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID) return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
} }
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) { func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
@ -565,7 +565,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
} }
} }
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo.ID) return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
} }
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

View file

@ -377,7 +377,7 @@ LOOP:
return result, nil return result, nil
} }
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var uploadID string var uploadID string
@ -385,7 +385,7 @@ func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data
LOOP: LOOP:
for key, multiparts := range cnrMultipartsMap { for key, multiparts := range cnrMultipartsMap {
for i, multipart := range multiparts { for i, multipart := range multiparts {
if multipart.ID == multipartNodeID { if multipart.ID == multipartInfo.ID {
uploadID = multipart.UploadID uploadID = multipart.UploadID
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...) cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
break LOOP break LOOP

View file

@ -64,7 +64,7 @@ type TreeService interface {
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)

View file

@ -82,6 +82,7 @@ const (
sizeKV = "Size" sizeKV = "Size"
etagKV = "ETag" etagKV = "ETag"
md5KV = "MD5" md5KV = "MD5"
finishedKV = "Finished"
// keys for lock. // keys for lock.
isLockKV = "IsLock" isLockKV = "IsLock"
@ -245,6 +246,11 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
multipartInfo.Created = time.UnixMilli(utcMilli) multipartInfo.Created = time.UnixMilli(utcMilli)
} }
finished, _ := treeNode.Get(finishedKV)
if flag, err := strconv.ParseBool(finished); err == nil {
multipartInfo.Finished = flag
}
return multipartInfo, nil return multipartInfo, nil
} }
@ -266,6 +272,10 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
} }
case ownerKV: case ownerKV:
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue())) _ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
case finishedKV:
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
multipartInfo.Finished = isFinished
}
default: default:
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
} }
@ -949,7 +959,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
} }
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode) multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
if err != nil { if err != nil || multipartInfo.Finished {
continue continue
} }
@ -992,6 +1002,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
continue continue
} }
if info.UploadID == uploadID { if info.UploadID == uploadID {
if info.Finished {
break
}
return info, nil return info, nil
} }
} }
@ -1055,8 +1068,15 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
return result, nil return result, nil
} }
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
return c.service.RemoveNode(ctx, bktInfo, systemTree, multipartNodeID) err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
if err != nil {
return err
}
multipartInfo.Finished = true
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
} }
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error { func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
@ -1241,6 +1261,9 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
info.Meta[uploadIDKV] = info.UploadID info.Meta[uploadIDKV] = info.UploadID
info.Meta[ownerKV] = info.Owner.EncodeToString() info.Meta[ownerKV] = info.Owner.EncodeToString()
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10) info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
if info.Finished {
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
}
return info.Meta return info.Meta
} }