From 9b1ccd39bea5e340e6550f09c1bdcb16f95ba56e Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 31 May 2022 15:38:06 +0300 Subject: [PATCH] [#475] Add extra attributes to partInfo Signed-off-by: Denis Kirillov --- api/data/tree.go | 3 ++ api/layer/layer.go | 2 +- api/layer/multipart_upload.go | 91 ++++++++++++++--------------------- api/layer/object.go | 6 +-- internal/neofs/tree.go | 25 +++++++++- 5 files changed, 65 insertions(+), 62 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index b0a7a55..a11095c 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -61,6 +61,9 @@ type PartInfo struct { UploadID string Number int OID oid.ID + Size int64 + ETag string + Created time.Time } // LockInfo is lock information to create appropriate tree node. diff --git a/api/layer/layer.go b/api/layer/layer.go index f56c21c..31f00d9 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -389,7 +389,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) { func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { var params getParams - params.objInfo = p.ObjectInfo + params.oid = p.ObjectInfo.ID params.bktInfo = p.BucketInfo if p.Range != nil { diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index a38e02e..eb5eb6e 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -199,6 +199,9 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf UploadID: p.Info.UploadID, Number: p.PartNumber, OID: *id, + Size: p.Size, + ETag: hex.EncodeToString(hash), + Created: time.Now(), } oldPartID, err := n.treeService.AddPart(ctx, &bktInfo.CID, multipartInfo.ID, partInfo) @@ -220,9 +223,9 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf Owner: bktInfo.Owner, Bucket: bktInfo.Name, - Size: p.Size, - Created: time.Now(), - HashSum: hex.EncodeToString(hash), + Size: partInfo.Size, + Created: partInfo.Created, + HashSum: partInfo.ETag, } if err = n.objCache.PutObject(objInfo); err != nil { @@ -287,7 +290,7 @@ type multiObjectReader struct { curReader io.Reader - parts []*data.ObjectInfo + parts []*data.PartInfo } func (x *multiObjectReader) Read(p []byte) (n int, err error) { @@ -302,7 +305,7 @@ func (x *multiObjectReader) Read(p []byte) (n int, err error) { return n, io.EOF } - x.prm.objInfo = x.parts[0] + x.prm.oid = x.parts[0].OID x.curReader, err = x.layer.initObjectPayloadReader(x.ctx, x.prm) if err != nil { @@ -323,27 +326,27 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar } } - multipartInfo, objects, err := n.getUploadParts(ctx, p.Info) // todo consider avoid heading objects + multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info) if err != nil { return nil, nil, err } - if len(objects) < len(p.Parts) { + if len(partsInfo) < len(p.Parts) { return nil, nil, errors.GetAPIError(errors.ErrInvalidPart) } - parts := make([]*data.ObjectInfo, 0, len(p.Parts)) + parts := make([]*data.PartInfo, 0, len(p.Parts)) for i, part := range p.Parts { - info := objects[part.PartNumber] - if info == nil || part.ETag != info.HashSum { + partInfo := partsInfo[part.PartNumber] + if part.ETag != partInfo.ETag { return nil, nil, errors.GetAPIError(errors.ErrInvalidPart) } // for the last part we have no minimum size limit - if i != len(p.Parts)-1 && info.Size < uploadMinSize { + if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize { return nil, nil, errors.GetAPIError(errors.ErrEntityTooSmall) } - parts = append(parts, info) + parts = append(parts, partInfo) } initMetadata := make(map[string]string, len(multipartInfo.Meta)) @@ -386,17 +389,14 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var addr oid.Address addr.SetContainer(p.Info.Bkt.CID) - for partNum, objInfo := range objects { - if partNum == 0 { - continue - } - if err = n.objectDelete(ctx, p.Info.Bkt, objInfo.ID); err != nil { + for _, partInfo := range partsInfo { + if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { n.log.Warn("could not delete upload part", - zap.Stringer("object id", objInfo.ID), - zap.Stringer("bucket id", p.Info.Bkt.CID), + zap.Stringer("object id", &partInfo.OID), + zap.Stringer("bucket id", &p.Info.Bkt.CID), zap.Error(err)) } - addr.SetObject(objInfo.ID) + addr.SetObject(partInfo.OID) n.objCache.Delete(addr) } @@ -464,14 +464,15 @@ func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload } func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error { - multipartInfo, objects, err := n.getUploadParts(ctx, p) + multipartInfo, parts, err := n.getUploadParts(ctx, p) if err != nil { return err } - for _, info := range objects { - if err = n.objectDelete(ctx, p.Bkt, info.ID); err != nil { - return err + for _, info := range parts { + if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { + n.log.Warn("couldn't delete part", zap.String("cid", p.Bkt.CID.EncodeToString()), + zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number)) } } @@ -480,24 +481,21 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { var res ListPartsInfo - multipartInfo, objs, err := n.getUploadParts(ctx, p.Info) // todo consider listing without head object from NeoFS + multipartInfo, partsInfo, err := n.getUploadParts(ctx, p.Info) if err != nil { return nil, err } res.Owner = multipartInfo.Owner - parts := make([]*Part, 0, len(objs)) + parts := make([]*Part, 0, len(partsInfo)) - for num, objInfo := range objs { - if num == 0 { - continue - } + for _, partInfo := range partsInfo { parts = append(parts, &Part{ - ETag: objInfo.HashSum, - LastModified: objInfo.Created.UTC().Format(time.RFC3339), - PartNumber: num, - Size: objInfo.Size, + ETag: partInfo.ETag, + LastModified: partInfo.Created.UTC().Format(time.RFC3339), + PartNumber: partInfo.Number, + Size: partInfo.Size, }) } @@ -525,7 +523,7 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn return &res, nil } -func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.ObjectInfo, error) { +func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Bkt.CID, p.Key, p.UploadID) if err != nil { if stderrors.Is(err, ErrNodeNotFound) { @@ -539,28 +537,9 @@ func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data. return nil, nil, err } - res := make(map[int]*data.ObjectInfo) - var addr oid.Address - addr.SetContainer(p.Bkt.CID) + res := make(map[int]*data.PartInfo, len(parts)) for _, part := range parts { - addr.SetObject(part.OID) - objInfo := n.objCache.GetObject(addr) - if objInfo == nil { - meta, err := n.objectHead(ctx, p.Bkt, part.OID) - if err != nil { - n.log.Warn("couldn't head a part of upload", - zap.String("object id", part.OID.EncodeToString()), - zap.String("bucket id", p.Bkt.CID.EncodeToString()), - zap.Error(err)) - continue - } - objInfo = objInfoFromMeta(p.Bkt, meta) - } - - res[part.Number] = objInfo - if err = n.objCache.PutObject(objInfo); err != nil { - n.log.Warn("couldn't cache upload part", zap.Error(err)) - } + res[part.Number] = part } return multipartInfo, res, nil diff --git a/api/layer/object.go b/api/layer/object.go index 61acaae..032f9c1 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -30,7 +30,7 @@ type ( // payload range off, ln uint64 - objInfo *data.ObjectInfo + oid oid.ID bktInfo *data.BucketInfo } @@ -93,8 +93,8 @@ func (n *layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj // Zero range corresponds to full payload (panics if only offset is set). func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) { prm := PrmObjectRead{ - Container: p.objInfo.CID, - Object: p.objInfo.ID, + Container: p.bktInfo.CID, + Object: p.oid, WithPayload: true, PayloadRange: [2]uint64{p.off, p.ln}, } diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 26fdcaf..a3e24cc 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -55,6 +55,8 @@ const ( isTagKV = "isTag" uploadIDKV = "UploadId" partNumberKV = "Number" + sizeKV = "Size" + etagKV = "ETag" // keys for lock. isLockKV = "IsLock" @@ -215,16 +217,32 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) { } func newPartInfo(node NodeResponse) (*data.PartInfo, error) { + var err error partInfo := &data.PartInfo{} for _, kv := range node.GetMeta() { + value := string(kv.GetValue()) switch kv.GetKey() { case partNumberKV: - partInfo.Number, _ = strconv.Atoi(string(kv.GetValue())) + if partInfo.Number, err = strconv.Atoi(value); err != nil { + return nil, fmt.Errorf("invalid part number: %w", err) + } case oidKV: - if err := partInfo.OID.DecodeString(string(kv.GetValue())); err != nil { + if err = partInfo.OID.DecodeString(value); err != nil { return nil, fmt.Errorf("invalid oid: %w", err) } + case etagKV: + partInfo.ETag = value + case sizeKV: + if partInfo.Size, err = strconv.ParseInt(value, 10, 64); err != nil { + return nil, fmt.Errorf("invalid part size: %w", err) + } + case createdKV: + var utcMilli int64 + if utcMilli, err = strconv.ParseInt(value, 10, 64); err != nil { + return nil, fmt.Errorf("invalid created timestamp: %w", err) + } + partInfo.Created = time.UnixMilli(utcMilli) } } @@ -802,6 +820,9 @@ func (c *TreeClient) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID meta := map[string]string{ partNumberKV: strconv.Itoa(info.Number), oidKV: info.OID.EncodeToString(), + sizeKV: strconv.FormatInt(info.Size, 10), + createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10), + etagKV: info.ETag, } var foundPartID uint64