[#323] client: Refactor object.Get

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-08-25 14:45:50 +03:00 committed by fyrchik
parent 1f593d0fb2
commit 724d30db1a
2 changed files with 55 additions and 60 deletions

View file

@ -95,6 +95,8 @@ func (x *prmObjectRead) ByID(id oid.ID) {
// PrmObjectGet groups parameters of ObjectGetInit operation. // PrmObjectGet groups parameters of ObjectGetInit operation.
type PrmObjectGet struct { type PrmObjectGet struct {
prmObjectRead prmObjectRead
key *ecdsa.PrivateKey
} }
// ResObjectGet groups the final result values of ObjectGetInit operation. // ResObjectGet groups the final result values of ObjectGetInit operation.
@ -109,10 +111,13 @@ type ResObjectGet struct {
type ObjectReader struct { type ObjectReader struct {
cancelCtxStream context.CancelFunc cancelCtxStream context.CancelFunc
ctxCall contextCall client *Client
stream interface {
Read(resp *v2object.GetResponse) error
}
// initially bound to contextCall res ResObjectGet
bodyResp v2object.GetResponseBody err error
tailPayload []byte tailPayload []byte
@ -121,29 +126,32 @@ type ObjectReader struct {
// UseKey specifies private key to sign the requests. // UseKey specifies private key to sign the requests.
// If key is not provided, then Client default key is used. // If key is not provided, then Client default key is used.
func (x *ObjectReader) UseKey(key ecdsa.PrivateKey) { func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) {
x.ctxCall.key = key x.key = &key
}
func handleSplitInfo(ctx *contextCall, i *v2object.SplitInfo) {
ctx.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(i))
} }
// ReadHeader reads header of the object. Result means success. // ReadHeader reads header of the object. Result means success.
// Failure reason can be received via Close. // Failure reason can be received via Close.
func (x *ObjectReader) ReadHeader(dst *object.Object) bool { func (x *ObjectReader) ReadHeader(dst *object.Object) bool {
if !x.ctxCall.writeRequest() || !x.ctxCall.readResponse() { var resp v2object.GetResponse
x.err = x.stream.Read(&resp)
if x.err != nil {
return false
}
x.res.st, x.err = x.client.processResponse(&resp)
if x.err != nil || !apistatus.IsSuccessful(x.res.st) {
return false return false
} }
var partInit *v2object.GetObjectPartInit var partInit *v2object.GetObjectPartInit
switch v := x.bodyResp.GetObjectPart().(type) { switch v := resp.GetBody().GetObjectPart().(type) {
default: default:
x.ctxCall.err = fmt.Errorf("unexpected message instead of heading part: %T", v) x.err = fmt.Errorf("unexpected message instead of heading part: %T", v)
return false return false
case *v2object.SplitInfo: case *v2object.SplitInfo:
handleSplitInfo(&x.ctxCall, v) x.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v))
return false return false
case *v2object.GetObjectPartInit: case *v2object.GetObjectPartInit:
partInit = v partInit = v
@ -174,26 +182,25 @@ func (x *ObjectReader) readChunk(buf []byte) (int, bool) {
return read, true return read, true
} }
var ok bool
var part v2object.GetObjectPart
var chunk []byte var chunk []byte
var lastRead int var lastRead int
for { for {
// receive next message var resp v2object.GetResponse
ok = x.ctxCall.readResponse() x.err = x.stream.Read(&resp)
if !ok { if x.err != nil {
return read, false return read, false
} }
// get chunk part message x.res.st, x.err = x.client.processResponse(&resp)
part = x.bodyResp.GetObjectPart() if x.err != nil || !apistatus.IsSuccessful(x.res.st) {
return read, false
}
var partChunk *v2object.GetObjectPartChunk part := resp.GetBody().GetObjectPart()
partChunk, ok := part.(*v2object.GetObjectPartChunk)
partChunk, ok = part.(*v2object.GetObjectPartChunk)
if !ok { if !ok {
x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part) x.err = fmt.Errorf("unexpected message instead of chunk part: %T", part)
return read, false return read, false
} }
@ -228,9 +235,9 @@ func (x *ObjectReader) ReadChunk(buf []byte) (int, bool) {
func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) { func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) {
defer x.cancelCtxStream() defer x.cancelCtxStream()
if x.ctxCall.err != nil { if x.err != nil {
if !errors.Is(x.ctxCall.err, io.EOF) { if !errors.Is(x.err, io.EOF) {
return nil, x.ctxCall.err return nil, x.err
} else if !ignoreEOF { } else if !ignoreEOF {
if x.remainingPayloadLen > 0 { if x.remainingPayloadLen > 0 {
return nil, io.ErrUnexpectedEOF return nil, io.ErrUnexpectedEOF
@ -240,7 +247,7 @@ func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) {
} }
} }
return x.ctxCall.statusRes.(*ResObjectGet), nil return &x.res, nil
} }
// Close ends reading the object and returns the result of the operation // Close ends reading the object and returns the result of the operation
@ -316,38 +323,26 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe
var req v2object.GetRequest var req v2object.GetRequest
req.SetBody(&body) req.SetBody(&body)
req.SetMetaHeader(&prm.meta) c.prepareRequest(&req, &prm.meta)
// init reader err := signature.SignServiceMessage(&c.prm.key, &req)
var ( if err != nil {
r ObjectReader return nil, fmt.Errorf("sign request: %w", err)
resp v2object.GetResponse
stream *rpcapi.GetResponseReader
)
ctx, r.cancelCtxStream = context.WithCancel(ctx)
resp.SetBody(&r.bodyResp)
// init call context
c.initCallContext(&r.ctxCall)
r.ctxCall.req = &req
r.ctxCall.statusRes = new(ResObjectGet)
r.ctxCall.resp = &resp
r.ctxCall.wReq = func() error {
var err error
stream, err = rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
if err != nil {
return fmt.Errorf("open stream: %w", err)
}
return nil
} }
r.ctxCall.rResp = func() error {
return stream.Read(&resp) ctx, cancel := context.WithCancel(ctx)
stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx))
if err != nil {
cancel()
return nil, fmt.Errorf("open stream: %w", err)
} }
var r ObjectReader
r.cancelCtxStream = cancel
r.stream = stream
r.client = c
return &r, nil return &r, nil
} }

View file

@ -597,6 +597,10 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
cliPrm.WithBearerToken(*prm.btoken) cliPrm.WithBearerToken(*prm.btoken)
} }
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
var res ResGetObject var res ResGetObject
rObj, err := c.client.ObjectGetInit(ctx, cliPrm) rObj, err := c.client.ObjectGetInit(ctx, cliPrm)
@ -604,10 +608,6 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err) return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
} }
if prm.key != nil {
rObj.UseKey(*prm.key)
}
start := time.Now() start := time.Now()
successReadHeader := rObj.ReadHeader(&res.Header) successReadHeader := rObj.ReadHeader(&res.Header)
c.incRequests(time.Since(start), methodObjectGet) c.incRequests(time.Since(start), methodObjectGet)