[#623] Fix using copy numbers during multipart
All checks were successful
/ DCO (pull_request) Successful in 34s
/ Vulncheck (pull_request) Successful in 1m2s
/ Builds (pull_request) Successful in 1m12s
/ OCI image (pull_request) Successful in 2m1s
/ Lint (pull_request) Successful in 2m24s
/ Tests (pull_request) Successful in 1m22s
/ Vulncheck (push) Successful in 1m3s
/ Builds (push) Successful in 1m17s
/ Lint (push) Successful in 2m17s
/ Tests (push) Successful in 1m22s
/ OCI image (push) Successful in 2m36s

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2025-02-06 14:32:44 +03:00
parent a53e50b324
commit da9703ab63
5 changed files with 72 additions and 20 deletions

View file

@ -109,7 +109,6 @@ type MultipartInfo struct {
Owner user.ID Owner user.ID
Created time.Time Created time.Time
Meta map[string]string Meta map[string]string
CopiesNumbers []uint32
Finished bool Finished bool
CreationEpoch uint64 CreationEpoch uint64
} }

View file

@ -152,12 +152,6 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
p.Header[api.ContentLanguage] = contentLanguage p.Header[api.ContentLanguage] = contentLanguage
} }
p.CopiesNumbers, err = h.pickCopiesNumbers(p.Header, reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(ctx, w, "invalid copies number", reqInfo, err, additional...)
return
}
if err = h.obj.CreateMultipartUpload(ctx, p); err != nil { if err = h.obj.CreateMultipartUpload(ctx, p); err != nil {
h.logAndSendError(ctx, w, "could create multipart upload", reqInfo, err, additional...) h.logAndSendError(ctx, w, "could create multipart upload", reqInfo, err, additional...)
return return
@ -229,6 +223,12 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(ctx, w, "invalid copies number", reqInfo, err, additional...)
return
}
hash, err := h.obj.UploadPart(ctx, p) hash, err := h.obj.UploadPart(ctx, p)
if err != nil { if err != nil {
h.logAndSendError(ctx, w, "could not upload a part", reqInfo, err, additional...) h.logAndSendError(ctx, w, "could not upload a part", reqInfo, err, additional...)
@ -354,6 +354,12 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
return return
} }
p.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(ctx, w, "invalid copies number", reqInfo, err, additional...)
return
}
info, err := h.obj.UploadPartCopy(ctx, p) info, err := h.obj.UploadPartCopy(ctx, p)
if err != nil { if err != nil {
h.logAndSendError(ctx, w, "could not upload part copy", reqInfo, err, additional...) h.logAndSendError(ctx, w, "could not upload part copy", reqInfo, err, additional...)
@ -416,6 +422,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
Parts: reqBody.Parts, Parts: reqBody.Parts,
} }
c.CopiesNumbers, err = h.pickCopiesNumbers(parseMetadata(r), reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(ctx, w, "invalid copies number", reqInfo, err, additional...)
return
}
// Start complete multipart upload which may take some time to fetch object // Start complete multipart upload which may take some time to fetch object
// and re-upload it part by part. // and re-upload it part by part.
objInfo, err := h.completeMultipartUpload(r, c, bktInfo) objInfo, err := h.completeMultipartUpload(r, c, bktInfo)

View file

@ -81,6 +81,26 @@ func TestDeleteMultipartAllParts(t *testing.T) {
require.Empty(t, hc.tp.Objects()) require.Empty(t, hc.tp.Objects())
} }
func TestMultipartCopiesNumber(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket", "object"
createTestBucket(hc, bktName)
copies := []uint32{2, 0}
hc.config.copiesNumbers = map[string][]uint32{"default": copies}
multipartInfo := createMultipartUpload(hc, bktName, objName, nil)
uploadPart(hc, bktName, objName, multipartInfo.UploadID, 1, layer.UploadMinSize)
objs := hc.tp.Objects()
require.Len(t, objs, 1)
require.EqualValues(t, copies, hc.tp.CopiesNumbers(addrFromObject(objs[0]).EncodeToString()))
}
func TestSpecialMultipartName(t *testing.T) { func TestSpecialMultipartName(t *testing.T) {
hc := prepareHandlerContextWithMinCache(t) hc := prepareHandlerContextWithMinCache(t)
@ -792,3 +812,14 @@ func listPartsBase(hc *handlerContext, bktName, objName string, encrypted bool,
return listPartsResponse return listPartsResponse
} }
func addrFromObject(obj *object.Object) oid.Address {
var addr oid.Address
cnrID, _ := obj.ContainerID()
objID, _ := obj.ID()
addr.SetContainer(cnrID)
addr.SetObject(objID)
return addr
}

View file

@ -76,6 +76,7 @@ var _ frostfs.FrostFS = (*TestFrostFS)(nil)
type TestFrostFS struct { type TestFrostFS struct {
objects map[string]*object.Object objects map[string]*object.Object
copiesNumbers map[string][]uint32
objectErrors map[string]error objectErrors map[string]error
objectPutErrors map[string]error objectPutErrors map[string]error
containers map[string]*container.Container containers map[string]*container.Container
@ -88,6 +89,7 @@ type TestFrostFS struct {
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS { func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
return &TestFrostFS{ return &TestFrostFS{
objects: make(map[string]*object.Object), objects: make(map[string]*object.Object),
copiesNumbers: make(map[string][]uint32),
objectErrors: make(map[string]error), objectErrors: make(map[string]error),
objectPutErrors: make(map[string]error), objectPutErrors: make(map[string]error),
containers: make(map[string]*container.Container), containers: make(map[string]*container.Container),
@ -126,6 +128,10 @@ func (t *TestFrostFS) Objects() []*object.Object {
return res return res
} }
func (t *TestFrostFS) CopiesNumbers(addr string) []uint32 {
return t.copiesNumbers[addr]
}
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool { func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
for _, obj := range t.objects { for _, obj := range t.objects {
if id, _ := obj.ID(); id.Equals(objID) { if id, _ := obj.ID(); id.Equals(objID) {
@ -346,6 +352,8 @@ func (t *TestFrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCre
addr := newAddress(cnrID, objID) addr := newAddress(cnrID, objID)
t.objects[addr.EncodeToString()] = obj t.objects[addr.EncodeToString()] = obj
t.copiesNumbers[addr.EncodeToString()] = prm.CopiesNumber
return &frostfs.CreateObjectResult{ return &frostfs.CreateObjectResult{
ObjectID: objID, ObjectID: objID,
CreationEpoch: t.currentEpoch - 1, CreationEpoch: t.currentEpoch - 1,

View file

@ -61,7 +61,6 @@ type (
Info *UploadInfoParams Info *UploadInfoParams
Header map[string]string Header map[string]string
Data *UploadData Data *UploadData
CopiesNumbers []uint32
} }
UploadData struct { UploadData struct {
@ -75,6 +74,7 @@ type (
Reader io.Reader Reader io.Reader
ContentMD5 string ContentMD5 string
ContentSHA256Hash string ContentSHA256Hash string
CopiesNumbers []uint32
} }
UploadCopyParams struct { UploadCopyParams struct {
@ -85,11 +85,13 @@ type (
SrcEncryption encryption.Params SrcEncryption encryption.Params
PartNumber int PartNumber int
Range *RangeParams Range *RangeParams
CopiesNumbers []uint32
} }
CompleteMultipartParams struct { CompleteMultipartParams struct {
Info *UploadInfoParams Info *UploadInfoParams
Parts []*CompletedPart Parts []*CompletedPart
CopiesNumbers []uint32
} }
CompletedPart struct { CompletedPart struct {
@ -165,7 +167,6 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
Owner: n.gateOwner, Owner: n.gateOwner,
Created: TimeNow(ctx), Created: TimeNow(ctx),
Meta: make(map[string]string, metaSize), Meta: make(map[string]string, metaSize),
CopiesNumbers: p.CopiesNumbers,
CreationEpoch: networkInfo.CurrentEpoch(), CreationEpoch: networkInfo.CurrentEpoch(),
} }
@ -222,7 +223,7 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Attributes: make([][2]string, 2), Attributes: make([][2]string, 2),
Payload: p.Reader, Payload: p.Reader,
CreationTime: TimeNow(ctx), CreationTime: TimeNow(ctx),
CopiesNumber: multipartInfo.CopiesNumbers, CopiesNumber: p.CopiesNumbers,
} }
decSize := p.Size decSize := p.Size
@ -376,6 +377,7 @@ func (n *Layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
PartNumber: p.PartNumber, PartNumber: p.PartNumber,
Size: size, Size: size,
Reader: objPayload, Reader: objPayload,
CopiesNumbers: p.CopiesNumbers,
} }
return n.uploadPart(ctx, multipartInfo, params) return n.uploadPart(ctx, multipartInfo, params)
@ -472,7 +474,7 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
Header: initMetadata, Header: initMetadata,
Size: &multipartObjetSize, Size: &multipartObjetSize,
Encryption: p.Info.Encryption, Encryption: p.Info.Encryption,
CopiesNumbers: multipartInfo.CopiesNumbers, CopiesNumbers: p.CopiesNumbers,
CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)), CompleteMD5Hash: hex.EncodeToString(md5Hash.Sum(nil)) + "-" + strconv.Itoa(len(p.Parts)),
}) })
if err != nil { if err != nil {