From 8fb383525081facdeab3d7af24716e4d10fc5ae0 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 2 Mar 2022 19:09:02 +0300 Subject: [PATCH] [#346] api: Do not use `io.Pipe` in `CompleteMultipartUpload` Replace `layer.objectWritePayload` method with `initObjectPayloadReader` which returns `io.Reader` of the object payload. Copy payload data to the parameterized `io.Writer` in `layer.GetObject`. Remove `io.Pipe` from `CompleteMultipartUpload` implementation and build analogue of `io.MultiReader` for the part list. Signed-off-by: Leonard Lyubich --- api/layer/layer.go | 19 +++++-- api/layer/multipart_upload.go | 101 ++++++++++++++++++++-------------- api/layer/object.go | 22 ++------ 3 files changed, 79 insertions(+), 63 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index 127f5bf7f..67c875bbd 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -567,7 +567,6 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) { func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { var params getParams - params.w = p.Writer params.oid = p.ObjectInfo.ID params.cid = p.ObjectInfo.CID @@ -580,10 +579,22 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { params.ln = p.Range.End - p.Range.Start + 1 } - err := n.objectWritePayload(ctx, params) + payload, err := n.initObjectPayloadReader(ctx, params) if err != nil { - n.objCache.Delete(p.ObjectInfo.Address()) - return fmt.Errorf("couldn't get object, cid: %s : %w", p.ObjectInfo.CID, err) + return fmt.Errorf("init object payload reader: %w", err) + } + + if params.ln == 0 { + params.ln = 4096 // configure? + } + + // alloc buffer for copying + buf := make([]byte, params.ln) // sync-pool it? + + // copy full payload + _, err = io.CopyBuffer(p.Writer, payload, buf) + if err != nil { + return fmt.Errorf("copy object payload: %w", err) } return nil diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 4bd55acae..8469c70f0 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -2,6 +2,8 @@ package layer import ( "context" + stderrors "errors" + "fmt" "io" "sort" "strconv" @@ -169,6 +171,45 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. return n.CopyObject(ctx, c) } +// implements io.Reader of payloads of the object list stored in the NeoFS network. +type multiObjectReader struct { + ctx context.Context + + layer *layer + + prm getParams + + curReader io.Reader + + parts []*data.ObjectInfo +} + +func (x *multiObjectReader) Read(p []byte) (n int, err error) { + if x.curReader != nil { + n, err = x.curReader.Read(p) + if !stderrors.Is(err, io.EOF) { + return n, err + } + } + + if len(x.parts) == 0 { + return n, io.EOF + } + + x.prm.oid = x.parts[0].ID + + x.curReader, err = x.layer.initObjectPayloadReader(x.ctx, x.prm) + if err != nil { + return n, fmt.Errorf("init payload reader for the next part: %w", err) + } + + x.parts = x.parts[1:] + + next, err := x.Read(p[n:]) + + return n + next, err +} + func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) { var obj *data.ObjectInfo @@ -231,10 +272,6 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar initMetadata[api.ContentType] = objects[0].ContentType } - pr, pw := io.Pipe() - done := make(chan bool) - uploadCompleted := false - /* We will keep "S3-Upload-Id" attribute in completed object to determine is it "common" object or completed object. We will need to differ these objects if something goes wrong during completing multipart upload. I.e. we had completed the object but didn't put tagging/acl for some reason */ @@ -242,46 +279,26 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar delete(initMetadata, UploadKeyAttributeName) delete(initMetadata, attrVersionsIgnore) - go func(done chan bool) { - obj, err = n.objectPut(ctx, p.Info.Bkt, &PutObjectParams{ - Bucket: p.Info.Bkt.Name, - Object: p.Info.Key, - Reader: pr, - Header: initMetadata, - }) - if err != nil { - n.log.Error("could not put a completed object (multipart upload)", - zap.String("uploadID", p.Info.UploadID), - zap.String("uploadKey", p.Info.Key), - zap.Error(err)) - done <- true - return - } - uploadCompleted = true - done <- true - }(done) - - var prmGet getParams - prmGet.w = pw - prmGet.cid = p.Info.Bkt.CID - - for _, part := range parts { - prmGet.oid = part.ID - - err = n.objectWritePayload(ctx, prmGet) - if err != nil { - _ = pw.Close() - n.log.Error("could not download a part of multipart upload", - zap.String("uploadID", p.Info.UploadID), - zap.String("part number", part.Headers[UploadPartNumberAttributeName]), - zap.Error(err)) - return nil, err - } + r := &multiObjectReader{ + ctx: ctx, + layer: n, + parts: parts, } - _ = pw.Close() - <-done - if !uploadCompleted { + r.prm.cid = p.Info.Bkt.CID + + obj, err = n.objectPut(ctx, p.Info.Bkt, &PutObjectParams{ + Bucket: p.Info.Bkt.Name, + Object: p.Info.Key, + Reader: r, + Header: initMetadata, + }) + if err != nil { + n.log.Error("could not put a completed object (multipart upload)", + zap.String("uploadID", p.Info.UploadID), + zap.String("uploadKey", p.Info.Key), + zap.Error(err)) + return nil, errors.GetAPIError(errors.ErrInternalError) } diff --git a/api/layer/object.go b/api/layer/object.go index 105b90f30..f9238c6ec 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -27,8 +27,6 @@ type ( } getParams struct { - w io.Writer - // payload range off, ln uint64 @@ -114,9 +112,9 @@ func (n *layer) objectHead(ctx context.Context, idCnr *cid.ID, idObj *oid.ID) (* return res.Head, nil } -// writes payload part of the NeoFS object to the provided io.Writer. +// initializes payload reader of the NeoFS object. // Zero range corresponds to full payload (panics if only offset is set). -func (n *layer) objectWritePayload(ctx context.Context, p getParams) error { +func (n *layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) { prm := PrmObjectRead{ Container: *p.cid, Object: *p.oid, @@ -127,21 +125,11 @@ func (n *layer) objectWritePayload(ctx context.Context, p getParams) error { n.prepareAuthParameters(ctx, &prm.PrmAuth) res, err := n.neoFS.ReadObject(ctx, prm) - if err == nil { - defer res.Payload.Close() - - if p.ln == 0 { - p.ln = 4096 // configure? - } - - // alloc buffer for copying - buf := make([]byte, p.ln) // sync-pool it? - - // copy full payload - _, err = io.CopyBuffer(p.w, res.Payload, buf) + if err != nil { + return nil, n.transformNeofsError(ctx, err) } - return n.transformNeofsError(ctx, err) + return res.Payload, nil } // objectGet returns an object with payload in the object.