[#275] Change logic abort multipart upload #275

Merged
alexvanin merged 1 commit from :feature/change_logic_abort_multipart into master 2023-12-27 11:08:59 +00:00
6 changed files with 33 additions and 8 deletions

View file

@ -27,6 +27,7 @@ This document outlines major changes between releases.
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221) - Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
- Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220) - Set server IdleTimeout and ReadHeaderTimeout to `30s` and allow to configure them (#220)
- Return `ETag` value in quotes (#219) - Return `ETag` value in quotes (#219)
- Use tombstone when delete multipart upload (#275)
### Removed ### Removed
- Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227) - Drop sending whitespace characters during complete multipart upload and related config param `kludge.complete_multipart_keepalive` (#227)

View file

@ -77,6 +77,7 @@ type MultipartInfo struct {
Created time.Time Created time.Time
Meta map[string]string Meta map[string]string
CopiesNumbers []uint32 CopiesNumbers []uint32
Finished bool
} }
// PartInfo is upload information about part. // PartInfo is upload information about part.

View file

@ -489,7 +489,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
n.cache.DeleteObject(addr) n.cache.DeleteObject(addr)
} }
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID) return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
} }
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) { func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
@ -565,7 +565,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
} }
} }
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo.ID) return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
} }
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

View file

@ -377,7 +377,7 @@ LOOP:
return result, nil return result, nil
} }
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var uploadID string var uploadID string
@ -385,7 +385,7 @@ func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data
LOOP: LOOP:
for key, multiparts := range cnrMultipartsMap { for key, multiparts := range cnrMultipartsMap {
for i, multipart := range multiparts { for i, multipart := range multiparts {
if multipart.ID == multipartNodeID { if multipart.ID == multipartInfo.ID {
uploadID = multipart.UploadID uploadID = multipart.UploadID
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...) cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
break LOOP break LOOP

View file

@ -64,7 +64,7 @@ type TreeService interface {
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error) GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)

View file

@ -82,6 +82,7 @@ const (
sizeKV = "Size" sizeKV = "Size"
etagKV = "ETag" etagKV = "ETag"
md5KV = "MD5" md5KV = "MD5"
finishedKV = "Finished"
// keys for lock. // keys for lock.
isLockKV = "IsLock" isLockKV = "IsLock"
@ -245,6 +246,11 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
multipartInfo.Created = time.UnixMilli(utcMilli) multipartInfo.Created = time.UnixMilli(utcMilli)
} }
finished, _ := treeNode.Get(finishedKV)
if flag, err := strconv.ParseBool(finished); err == nil {
multipartInfo.Finished = flag
}
alexvanin marked this conversation as resolved Outdated

What happens when this code is running during multipart upload started by previous version of the gateway?

Previous version won't set finishedKV flag, so ParseBool returns error and multipartInfo.Finished stays false. Shouldn't it be true in such cases?

What happens when this code is running during multipart upload started by previous version of the gateway? Previous version won't set `finishedKV` flag, so `ParseBool` returns error and `multipartInfo.Finished` stays `false`. Shouldn't it be `true` in such cases?

As we found out, this code isn't triggered when accessing multipart upload finished by previous version of the gateway, so it works as expected.

As we found out, this code isn't triggered when accessing multipart upload finished by previous version of the gateway, so it works as expected.
return multipartInfo, nil return multipartInfo, nil
} }
@ -266,6 +272,10 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
} }
case ownerKV: case ownerKV:
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue())) _ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
case finishedKV:
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
multipartInfo.Finished = isFinished
}
default: default:
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
} }
@ -949,7 +959,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
} }
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode) multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
if err != nil { if err != nil || multipartInfo.Finished {
continue continue
} }
@ -992,6 +1002,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
continue continue
} }
if info.UploadID == uploadID { if info.UploadID == uploadID {
if info.Finished {
break
}
return info, nil return info, nil
} }
} }
@ -1055,8 +1068,15 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
return result, nil return result, nil
} }
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
return c.service.RemoveNode(ctx, bktInfo, systemTree, multipartNodeID) err := c.service.RemoveNode(ctx, bktInfo, systemTree, multipartInfo.ID)
if err != nil {
return err
}
multipartInfo.Finished = true
return c.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
} }
func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error { func (c *Tree) PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error {
@ -1241,6 +1261,9 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
info.Meta[uploadIDKV] = info.UploadID info.Meta[uploadIDKV] = info.UploadID
info.Meta[ownerKV] = info.Owner.EncodeToString() info.Meta[ownerKV] = info.Owner.EncodeToString()
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10) info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
if info.Finished {
info.Meta[finishedKV] = strconv.FormatBool(info.Finished)
}
return info.Meta return info.Meta
} }