[#323] client: Refactor object.GetRange

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-08-25 14:32:37 +03:00 committed by fyrchik
parent f543ba68d3
commit 1f593d0fb2
2 changed files with 45 additions and 62 deletions

View file

@ -480,6 +480,8 @@ func (c *Client) ObjectHead(ctx context.Context, prm PrmObjectHead) (*ResObjectH
type PrmObjectRange struct {
prmObjectRead
key *ecdsa.PrivateKey
rng v2object.Range
}
@ -495,6 +497,12 @@ func (x *PrmObjectRange) SetLength(ln uint64) {
x.rng.SetLength(ln)
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *PrmObjectRange) UseKey(key ecdsa.PrivateKey) {
x.key = &key
}
// ResObjectRange groups the final result values of ObjectRange operation.
type ResObjectRange struct {
statusRes
@ -508,33 +516,21 @@ type ResObjectRange struct {
type ObjectRangeReader struct {
cancelCtxStream context.CancelFunc
ctxCall contextCall
client *Client
reqWritten bool
res ResObjectRange
err error
// initially bound to contextCall
bodyResp v2object.GetRangeResponseBody
stream interface {
Read(resp *v2object.GetRangeResponse) error
}
tailPayload []byte
remainingPayloadLen int
}
// UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used.
func (x *ObjectRangeReader) UseKey(key ecdsa.PrivateKey) {
x.ctxCall.key = key
}
func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) {
if !x.reqWritten {
if !x.ctxCall.writeRequest() {
return 0, false
}
x.reqWritten = true
}
var read int
// read remaining tail
@ -546,25 +542,29 @@ func (x *ObjectRangeReader) readChunk(buf []byte) (int, bool) {
return read, true
}
var ok bool
var partChunk *v2object.GetRangePartChunk
var chunk []byte
var lastRead int
for {
// receive next message
ok = x.ctxCall.readResponse()
if !ok {
var resp v2object.GetRangeResponse
x.err = x.stream.Read(&resp)
if x.err != nil {
return read, false
}
x.res.st, x.err = x.client.processResponse(&resp)
if x.err != nil || !apistatus.IsSuccessful(x.res.st) {
return read, false
}
// get chunk message
switch v := x.bodyResp.GetRangePart().(type) {
switch v := resp.GetBody().GetRangePart().(type) {
default:
x.ctxCall.err = fmt.Errorf("unexpected message received: %T", v)
x.err = fmt.Errorf("unexpected message received: %T", v)
return read, false
case *v2object.SplitInfo:
handleSplitInfo(&x.ctxCall, v)
x.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v))
return read, false
case *v2object.GetRangePartChunk:
partChunk = v
@ -600,9 +600,9 @@ func (x *ObjectRangeReader) ReadChunk(buf []byte) (int, bool) {
func (x *ObjectRangeReader) close(ignoreEOF bool) (*ResObjectRange, error) {
defer x.cancelCtxStream()
if x.ctxCall.err != nil {
if !errors.Is(x.ctxCall.err, io.EOF) {
return nil, x.ctxCall.err
if x.err != nil {
if !errors.Is(x.err, io.EOF) {
return nil, x.err
} else if !ignoreEOF {
if x.remainingPayloadLen > 0 {
return nil, io.ErrUnexpectedEOF
@ -612,7 +612,7 @@ func (x *ObjectRangeReader) close(ignoreEOF bool) (*ResObjectRange, error) {
}
}
return x.ctxCall.statusRes.(*ResObjectRange), nil
return &x.res, nil
}
// Close ends reading the payload range and returns the result of the operation
@ -693,39 +693,21 @@ func (c *Client) ObjectRangeInit(ctx context.Context, prm PrmObjectRange) (*Obje
var req v2object.GetRangeRequest
req.SetBody(&body)
req.SetMetaHeader(&prm.meta)
c.prepareRequest(&req, &prm.meta)
// init reader
var (
r ObjectRangeReader
resp v2object.GetRangeResponse
stream *rpcapi.ObjectRangeResponseReader
)
ctx, cancel := context.WithCancel(ctx)
r.remainingPayloadLen = int(prm.rng.GetLength())
ctx, r.cancelCtxStream = context.WithCancel(ctx)
resp.SetBody(&r.bodyResp)
// init call context
c.initCallContext(&r.ctxCall)
r.ctxCall.req = &req
r.ctxCall.statusRes = new(ResObjectRange)
r.ctxCall.resp = &resp
r.ctxCall.wReq = func() error {
var err error
stream, err = rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
stream, err := rpcapi.GetObjectRange(&c.c, &req, client.WithContext(ctx))
if err != nil {
return fmt.Errorf("open stream: %w", err)
cancel()
return nil, fmt.Errorf("open stream: %w", err)
}
return nil
}
r.ctxCall.rResp = func() error {
return stream.Read(&resp)
}
var r ObjectRangeReader
r.remainingPayloadLen = int(prm.rng.GetLength())
r.cancelCtxStream = cancel
r.stream = stream
r.client = c
return &r, nil
}

View file

@ -684,15 +684,16 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
cliPrm.WithBearerToken(*prm.btoken)
}
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
start := time.Now()
res, err := c.client.ObjectRangeInit(ctx, cliPrm)
c.incRequests(time.Since(start), methodObjectRange)
if err = c.handleError(nil, err); err != nil {
return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
}
if prm.key != nil {
res.UseKey(*prm.key)
}
return ResObjectRange{
payload: res,