diff --git a/client/object_get.go b/client/object_get.go index 5fc6803..1bc0705 100644 --- a/client/object_get.go +++ b/client/object_get.go @@ -95,6 +95,8 @@ func (x *prmObjectRead) ByID(id oid.ID) { // PrmObjectGet groups parameters of ObjectGetInit operation. type PrmObjectGet struct { prmObjectRead + + key *ecdsa.PrivateKey } // ResObjectGet groups the final result values of ObjectGetInit operation. @@ -109,10 +111,13 @@ type ResObjectGet struct { type ObjectReader struct { cancelCtxStream context.CancelFunc - ctxCall contextCall + client *Client + stream interface { + Read(resp *v2object.GetResponse) error + } - // initially bound to contextCall - bodyResp v2object.GetResponseBody + res ResObjectGet + err error tailPayload []byte @@ -121,29 +126,32 @@ type ObjectReader struct { // UseKey specifies private key to sign the requests. // If key is not provided, then Client default key is used. -func (x *ObjectReader) UseKey(key ecdsa.PrivateKey) { - x.ctxCall.key = key -} - -func handleSplitInfo(ctx *contextCall, i *v2object.SplitInfo) { - ctx.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(i)) +func (x *PrmObjectGet) UseKey(key ecdsa.PrivateKey) { + x.key = &key } // ReadHeader reads header of the object. Result means success. // Failure reason can be received via Close. func (x *ObjectReader) ReadHeader(dst *object.Object) bool { - if !x.ctxCall.writeRequest() || !x.ctxCall.readResponse() { + var resp v2object.GetResponse + x.err = x.stream.Read(&resp) + if x.err != nil { + return false + } + + x.res.st, x.err = x.client.processResponse(&resp) + if x.err != nil || !apistatus.IsSuccessful(x.res.st) { return false } var partInit *v2object.GetObjectPartInit - switch v := x.bodyResp.GetObjectPart().(type) { + switch v := resp.GetBody().GetObjectPart().(type) { default: - x.ctxCall.err = fmt.Errorf("unexpected message instead of heading part: %T", v) + x.err = fmt.Errorf("unexpected message instead of heading part: %T", v) return false case *v2object.SplitInfo: - handleSplitInfo(&x.ctxCall, v) + x.err = object.NewSplitInfoError(object.NewSplitInfoFromV2(v)) return false case *v2object.GetObjectPartInit: partInit = v @@ -174,26 +182,25 @@ func (x *ObjectReader) readChunk(buf []byte) (int, bool) { return read, true } - var ok bool - var part v2object.GetObjectPart var chunk []byte var lastRead int for { - // receive next message - ok = x.ctxCall.readResponse() - if !ok { + var resp v2object.GetResponse + x.err = x.stream.Read(&resp) + if x.err != nil { return read, false } - // get chunk part message - part = x.bodyResp.GetObjectPart() + x.res.st, x.err = x.client.processResponse(&resp) + if x.err != nil || !apistatus.IsSuccessful(x.res.st) { + return read, false + } - var partChunk *v2object.GetObjectPartChunk - - partChunk, ok = part.(*v2object.GetObjectPartChunk) + part := resp.GetBody().GetObjectPart() + partChunk, ok := part.(*v2object.GetObjectPartChunk) if !ok { - x.ctxCall.err = fmt.Errorf("unexpected message instead of chunk part: %T", part) + x.err = fmt.Errorf("unexpected message instead of chunk part: %T", part) return read, false } @@ -228,9 +235,9 @@ func (x *ObjectReader) ReadChunk(buf []byte) (int, bool) { func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) { defer x.cancelCtxStream() - if x.ctxCall.err != nil { - if !errors.Is(x.ctxCall.err, io.EOF) { - return nil, x.ctxCall.err + if x.err != nil { + if !errors.Is(x.err, io.EOF) { + return nil, x.err } else if !ignoreEOF { if x.remainingPayloadLen > 0 { return nil, io.ErrUnexpectedEOF @@ -240,7 +247,7 @@ func (x *ObjectReader) close(ignoreEOF bool) (*ResObjectGet, error) { } } - return x.ctxCall.statusRes.(*ResObjectGet), nil + return &x.res, nil } // Close ends reading the object and returns the result of the operation @@ -316,38 +323,26 @@ func (c *Client) ObjectGetInit(ctx context.Context, prm PrmObjectGet) (*ObjectRe var req v2object.GetRequest req.SetBody(&body) - req.SetMetaHeader(&prm.meta) + c.prepareRequest(&req, &prm.meta) - // init reader - var ( - r ObjectReader - resp v2object.GetResponse - stream *rpcapi.GetResponseReader - ) - - ctx, r.cancelCtxStream = context.WithCancel(ctx) - - resp.SetBody(&r.bodyResp) - - // init call context - c.initCallContext(&r.ctxCall) - r.ctxCall.req = &req - r.ctxCall.statusRes = new(ResObjectGet) - r.ctxCall.resp = &resp - r.ctxCall.wReq = func() error { - var err error - - stream, err = rpcapi.GetObject(&c.c, &req, client.WithContext(ctx)) - if err != nil { - return fmt.Errorf("open stream: %w", err) - } - - return nil + err := signature.SignServiceMessage(&c.prm.key, &req) + if err != nil { + return nil, fmt.Errorf("sign request: %w", err) } - r.ctxCall.rResp = func() error { - return stream.Read(&resp) + + ctx, cancel := context.WithCancel(ctx) + + stream, err := rpcapi.GetObject(&c.c, &req, client.WithContext(ctx)) + if err != nil { + cancel() + return nil, fmt.Errorf("open stream: %w", err) } + var r ObjectReader + r.cancelCtxStream = cancel + r.stream = stream + r.client = c + return &r, nil } diff --git a/pool/pool.go b/pool/pool.go index 13c92d5..6cfaaef 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -597,6 +597,10 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet cliPrm.WithBearerToken(*prm.btoken) } + if prm.key != nil { + cliPrm.UseKey(*prm.key) + } + var res ResGetObject rObj, err := c.client.ObjectGetInit(ctx, cliPrm) @@ -604,10 +608,6 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err) } - if prm.key != nil { - rObj.UseKey(*prm.key) - } - start := time.Now() successReadHeader := rObj.ReadHeader(&res.Header) c.incRequests(time.Since(start), methodObjectGet)