From 1f593d0fb24d57016c78edff9dd79b6edcafd844 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 25 Aug 2022 14:32:37 +0300 Subject: [PATCH] [#323] client: Refactor `object.GetRange` Signed-off-by: Evgenii Stratonikov --- client/object_get.go | 100 ++++++++++++++++++------------------------- pool/pool.go | 7 +-- 2 files changed, 45 insertions(+), 62 deletions(-) diff --git a/client/object_get.go b/client/object_get.go index b832cf2a..5fc68035 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -480,6 +480,8 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH type PrmObjectRange struct { prmObjectRead + key *ecdsa.PrivateKey + rng v2object.Range } @@ -495,6 +497,12 @@ func (x *PrmObjectRange) SetLength(ln uint64) { x.rng.SetLength(ln) } +// UseKey specifies private key to sign the requests. +// If key is not provided, then Client default key is used. +func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) { + x.key = &key +} + // ResObjectRange groups the final result values of ObjectRange operation. type ResObjectRange struct { statusRes @@ -508,33 +516,21 @@ type ResObjectRange struct { type ObjectRangeReader struct { cancelCtxStream context.CancelFunc - ctxCall contextCall + client *Client - reqWritten bool + res ResObjectRange + err error - // initially bound to contextCall - bodyResp v2object.GetRangeResponseBody + stream interface { + Read(resp *v2object.GetRangeResponse) error + } tailPayload []byte remainingPayloadLen int } -// UseKey specifies private key to sign the requests. -// If key is not provided, then Client default key is used. -func (x *ObjectRangeReader) UseKey(key ecdsa.PrivateKey) { - x.ctxCall.key = key -} - func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) { - if !x.reqWritten { - if !x.ctxCall.writeRequest() { - return 0, false - } - - x.reqWritten = true - } - var read int // read remaining tail @@ -546,25 +542,29 @@ func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) { return read, true } - var ok bool var partChunk *v2object.GetRangePartChunk var chunk []byte var lastRead int for { - // receive next message - ok = x.ctxCall.readResponse() - if !ok { + var resp v2object.GetRangeResponse + x.err = x.stream.Read(&resp) + if x.err != nil { + return read, false + } + + x.res.st, x.err = x.client.processResponse(&resp) + if x.err != nil || !apistatus.IsSuccessful(x.res.st) { return read, false } // get chunk message - switch v := x.bodyResp.GetRangePart().(type) { + switch v := resp.GetBody().GetRangePart().(type) { default: - x.ctxCall.err = fmt.Errorf("unexpected message received: %T", v) + x.err = fmt.Errorf("unexpected message received: %T", v) return read, false case *v2object.SplitInfo: - handleSplitInfo(&x.ctxCall, v) + x.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) return read, false case *v2object.GetRangePartChunk: partChunk = v @@ -600,9 +600,9 @@ func (x *ObjectRangeReader) ReadChunk(buf []byte) (int, bool) { func (x *ObjectRangeReader) close(ignoreEOF bool) (*ResObjectRange, error) { defer x.cancelCtxStream() - if x.ctxCall.err != nil { - if !errors.Is(x.ctxCall.err, io.EOF) { - return nil, x.ctxCall.err + if x.err != nil { + if !errors.Is(x.err, io.EOF) { + return nil, x.err } else if !ignoreEOF { if x.remainingPayloadLen > 0 { return nil, io.ErrUnexpectedEOF @@ -612,7 +612,7 @@ func (x *ObjectRangeReader) close(ignoreEOF bool) (*ResObjectRange, error) { } } - return x.ctxCall.statusRes.(*ResObjectRange), nil + return &x.res, nil } // Close ends reading the payload range and returns the result of the operation @@ -693,39 +693,21 @@ func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*Obje var req v2object.GetRangeRequest req.SetBody(&body) - req.SetMetaHeader(&prm.meta) + c.prepareRequest(&req, &prm.meta) - // init reader - var ( - r ObjectRangeReader - resp v2object.GetRangeResponse - stream *rpcapi.ObjectRangeResponseReader - ) + ctx, cancel := context.WithCancel(ctx) + stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx)) + if err != nil { + cancel() + return nil, fmt.Errorf("open stream: %w", err) + } + + var r ObjectRangeReader r.remainingPayloadLen = int(prm.rng.GetLength()) - - ctx, r.cancelCtxStream = context.WithCancel(ctx) - - resp.SetBody(&r.bodyResp) - - // init call context - c.initCallContext(&r.ctxCall) - r.ctxCall.req = &req - r.ctxCall.statusRes = new(ResObjectRange) - r.ctxCall.resp = &resp - r.ctxCall.wReq = func() error { - var err error - - stream, err = rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx)) - if err != nil { - return fmt.Errorf("open stream: %w", err) - } - - return nil - } - r.ctxCall.rResp = func() error { - return stream.Read(&resp) - } + r.cancelCtxStream = cancel + r.stream = stream + r.client = c return &r, nil } diff --git a/pool/pool.go b/pool/pool.go index cde5fbf6..13c92d5d 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -684,15 +684,16 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re cliPrm.WithBearerToken(*prm.btoken) } + if prm.key != nil { + cliPrm.UseKey(*prm.key) + } + start := time.Now() res, err := c.client.ObjectRangeInit(ctx, cliPrm) c.incRequests(time.Since(start), methodObjectRange) if err = c.handleError(nil, err); err != nil { return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err) } - if prm.key != nil { - res.UseKey(*prm.key) - } return ResObjectRange{ payload: res,