forked from TrueCloudLab/frostfs-api-go
c814cc62fa
There is a issue when user sends payload chunk to the neofs node, but node closes connection earlier, e.g. node can return error as soon as it checked ACL permission and denied request. In this case client will receive EOF error and it produces `could not send payload bytes to Put object stream` error, but in fact there is different error. If we ignore EOF there then `stream.CloseAndRecv()` return correct error message later. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
1036 lines
24 KiB
Go
1036 lines
24 KiB
Go
package client
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
signer "github.com/nspcc-dev/neofs-api-go/util/signature"
|
|
"github.com/nspcc-dev/neofs-api-go/v2/client"
|
|
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs"
|
|
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
|
|
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
type PutObjectParams struct {
|
|
obj *object.Object
|
|
|
|
r io.Reader
|
|
}
|
|
|
|
type DeleteObjectParams struct {
|
|
addr *object.Address
|
|
}
|
|
|
|
type GetObjectParams struct {
|
|
addr *object.Address
|
|
|
|
w io.Writer
|
|
}
|
|
|
|
type ObjectHeaderParams struct {
|
|
addr *object.Address
|
|
|
|
short bool
|
|
}
|
|
|
|
type RangeDataParams struct {
|
|
addr *object.Address
|
|
|
|
r *object.Range
|
|
|
|
w io.Writer
|
|
}
|
|
|
|
type RangeChecksumParams struct {
|
|
typ checksumType
|
|
|
|
addr *object.Address
|
|
|
|
rs []*object.Range
|
|
|
|
salt []byte
|
|
}
|
|
|
|
type SearchObjectParams struct {
|
|
cid *container.ID
|
|
|
|
filters object.SearchFilters
|
|
}
|
|
|
|
type putObjectV2Writer struct {
|
|
key *ecdsa.PrivateKey
|
|
|
|
chunkPart *v2object.PutObjectPartChunk
|
|
|
|
req *v2object.PutRequest
|
|
|
|
stream v2object.PutObjectStreamer
|
|
}
|
|
|
|
type checksumType int
|
|
|
|
const (
|
|
_ checksumType = iota
|
|
checksumSHA256
|
|
checksumTZ
|
|
)
|
|
|
|
const chunkSize = 3 * (1 << 20)
|
|
|
|
const TZSize = 64
|
|
|
|
const searchQueryVersion uint32 = 1
|
|
|
|
func rangesToV2(rs []*object.Range) []*v2object.Range {
|
|
r2 := make([]*v2object.Range, 0, len(rs))
|
|
|
|
for i := range rs {
|
|
r2 = append(r2, rs[i].ToV2())
|
|
}
|
|
|
|
return r2
|
|
}
|
|
|
|
func (t checksumType) toV2() v2refs.ChecksumType {
|
|
switch t {
|
|
case checksumSHA256:
|
|
return v2refs.SHA256
|
|
case checksumTZ:
|
|
return v2refs.TillichZemor
|
|
default:
|
|
panic(fmt.Sprintf("invalid checksum type %d", t))
|
|
}
|
|
}
|
|
|
|
func (w *putObjectV2Writer) Write(p []byte) (int, error) {
|
|
w.chunkPart.SetChunk(p)
|
|
|
|
w.req.SetVerificationHeader(nil)
|
|
|
|
if err := signature.SignServiceMessage(w.key, w.req); err != nil {
|
|
return 0, errors.Wrap(err, "could not sign chunk request message")
|
|
}
|
|
|
|
if err := w.stream.Send(w.req); err != nil {
|
|
return 0, errors.Wrap(err, "could not send chunk request message")
|
|
}
|
|
|
|
return len(p), nil
|
|
}
|
|
|
|
func (p *PutObjectParams) WithObject(v *object.Object) *PutObjectParams {
|
|
if p != nil {
|
|
p.obj = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *PutObjectParams) WithPayloadReader(v io.Reader) *PutObjectParams {
|
|
if p != nil {
|
|
p.r = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (c *Client) PutObject(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) {
|
|
// check remote node version
|
|
switch c.remoteNode.Version.GetMajor() {
|
|
case 2:
|
|
return c.putObjectV2(ctx, p, opts...)
|
|
default:
|
|
return nil, unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c *Client) putObjectV2(ctx context.Context, p *PutObjectParams, opts ...CallOption) (*object.ID, error) {
|
|
// create V2 Object client
|
|
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Object V2 client")
|
|
}
|
|
|
|
stream, err := cli.Put(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not open Put object stream")
|
|
}
|
|
|
|
callOpts := defaultCallOptions()
|
|
|
|
for i := range opts {
|
|
if opts[i] != nil {
|
|
opts[i].apply(&callOpts)
|
|
}
|
|
}
|
|
|
|
// create request
|
|
req := new(v2object.PutRequest)
|
|
|
|
// initialize request body
|
|
body := new(v2object.PutRequestBody)
|
|
req.SetBody(body)
|
|
|
|
v2Addr := new(v2refs.Address)
|
|
v2Addr.SetObjectID(p.obj.GetID().ToV2())
|
|
v2Addr.SetContainerID(p.obj.GetContainerID().ToV2())
|
|
|
|
// set meta header
|
|
meta := v2MetaHeaderFromOpts(callOpts)
|
|
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
|
addr: v2Addr,
|
|
verb: v2session.ObjectVerbPut,
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "could not sign session token")
|
|
}
|
|
req.SetMetaHeader(meta)
|
|
|
|
// initialize init part
|
|
initPart := new(v2object.PutObjectPartInit)
|
|
body.SetObjectPart(initPart)
|
|
|
|
obj := p.obj.ToV2()
|
|
|
|
// set init part fields
|
|
initPart.SetObjectID(obj.GetObjectID())
|
|
initPart.SetSignature(obj.GetSignature())
|
|
initPart.SetHeader(obj.GetHeader())
|
|
|
|
// sign the request
|
|
if err := signature.SignServiceMessage(c.key, req); err != nil {
|
|
return nil, errors.Wrapf(err, "could not sign %T", req)
|
|
}
|
|
|
|
// send init part
|
|
if err := stream.Send(req); err != nil {
|
|
return nil, errors.Wrapf(err, "could not send %T", req)
|
|
}
|
|
|
|
// create payload bytes reader
|
|
var rPayload io.Reader = bytes.NewReader(obj.GetPayload())
|
|
if p.r != nil {
|
|
rPayload = io.MultiReader(rPayload, p.r)
|
|
}
|
|
|
|
// create v2 payload stream writer
|
|
chunkPart := new(v2object.PutObjectPartChunk)
|
|
body.SetObjectPart(chunkPart)
|
|
|
|
w := &putObjectV2Writer{
|
|
key: c.key,
|
|
chunkPart: chunkPart,
|
|
req: req,
|
|
stream: stream,
|
|
}
|
|
|
|
// copy payload from reader to stream writer
|
|
_, err = io.CopyBuffer(w, rPayload, make([]byte, chunkSize))
|
|
if err != nil && !errors.Is(errors.Cause(err), io.EOF) {
|
|
return nil, errors.Wrap(err, "could not send payload bytes to Put object stream")
|
|
}
|
|
|
|
// close object stream and receive response from remote node
|
|
resp, err := stream.CloseAndRecv()
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "could not close %T", stream)
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
|
}
|
|
|
|
// convert object identifier
|
|
id := object.NewIDFromV2(resp.GetBody().GetObjectID())
|
|
|
|
return id, nil
|
|
}
|
|
|
|
func (p *DeleteObjectParams) WithAddress(v *object.Address) *DeleteObjectParams {
|
|
if p != nil {
|
|
p.addr = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error {
|
|
// check remote node version
|
|
switch c.remoteNode.Version.GetMajor() {
|
|
case 2:
|
|
return c.deleteObjectV2(ctx, p, opts...)
|
|
default:
|
|
return unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c *Client) deleteObjectV2(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) error {
|
|
// create V2 Object client
|
|
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
|
if err != nil {
|
|
return errors.Wrap(err, "could not create Object V2 client")
|
|
}
|
|
|
|
callOpts := defaultCallOptions()
|
|
|
|
for i := range opts {
|
|
if opts[i] != nil {
|
|
opts[i].apply(&callOpts)
|
|
}
|
|
}
|
|
|
|
// create request
|
|
req := new(v2object.DeleteRequest)
|
|
|
|
// initialize request body
|
|
body := new(v2object.DeleteRequestBody)
|
|
req.SetBody(body)
|
|
|
|
// set meta header
|
|
meta := v2MetaHeaderFromOpts(callOpts)
|
|
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
|
addr: p.addr.ToV2(),
|
|
verb: v2session.ObjectVerbDelete,
|
|
}); err != nil {
|
|
return errors.Wrap(err, "could not sign session token")
|
|
}
|
|
req.SetMetaHeader(meta)
|
|
|
|
// fill body fields
|
|
body.SetAddress(p.addr.ToV2())
|
|
|
|
// sign the request
|
|
if err := signature.SignServiceMessage(c.key, req); err != nil {
|
|
return errors.Wrapf(err, "could not sign %T", req)
|
|
}
|
|
|
|
// send request
|
|
resp, err := cli.Delete(ctx, req)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "could not send %T", req)
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return errors.Wrapf(err, "could not verify %T", resp)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *GetObjectParams) WithAddress(v *object.Address) *GetObjectParams {
|
|
if p != nil {
|
|
p.addr = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *GetObjectParams) WithPayloadWriter(w io.Writer) *GetObjectParams {
|
|
if p != nil {
|
|
p.w = w
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (c *Client) GetObject(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) {
|
|
// check remote node version
|
|
switch c.remoteNode.Version.GetMajor() {
|
|
case 2:
|
|
return c.getObjectV2(ctx, p, opts...)
|
|
default:
|
|
return nil, unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c *Client) getObjectV2(ctx context.Context, p *GetObjectParams, opts ...CallOption) (*object.Object, error) {
|
|
// create V2 Object client
|
|
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Object V2 client")
|
|
}
|
|
|
|
callOpts := defaultCallOptions()
|
|
|
|
for i := range opts {
|
|
if opts[i] != nil {
|
|
opts[i].apply(&callOpts)
|
|
}
|
|
}
|
|
|
|
// create request
|
|
req := new(v2object.GetRequest)
|
|
|
|
// initialize request body
|
|
body := new(v2object.GetRequestBody)
|
|
req.SetBody(body)
|
|
|
|
// set meta header
|
|
meta := v2MetaHeaderFromOpts(callOpts)
|
|
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
|
addr: p.addr.ToV2(),
|
|
verb: v2session.ObjectVerbGet,
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "could not sign session token")
|
|
}
|
|
req.SetMetaHeader(meta)
|
|
|
|
// fill body fields
|
|
body.SetAddress(p.addr.ToV2())
|
|
|
|
// sign the request
|
|
if err := signature.SignServiceMessage(c.key, req); err != nil {
|
|
return nil, errors.Wrapf(err, "could not sign %T", req)
|
|
}
|
|
|
|
// create Get object stream
|
|
stream, err := cli.Get(ctx, req)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Get object stream")
|
|
}
|
|
|
|
var (
|
|
payload []byte
|
|
obj = new(v2object.Object)
|
|
)
|
|
|
|
for {
|
|
// receive message from server stream
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if errors.Is(errors.Cause(err), io.EOF) {
|
|
break
|
|
}
|
|
|
|
return nil, errors.Wrap(err, "could not receive Get response")
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
|
}
|
|
|
|
switch v := resp.GetBody().GetObjectPart().(type) {
|
|
case nil:
|
|
return nil, errors.New("received nil object part")
|
|
case *v2object.GetObjectPartInit:
|
|
obj.SetObjectID(v.GetObjectID())
|
|
obj.SetSignature(v.GetSignature())
|
|
|
|
hdr := v.GetHeader()
|
|
obj.SetHeader(hdr)
|
|
|
|
if p.w == nil {
|
|
payload = make([]byte, 0, hdr.GetPayloadLength())
|
|
}
|
|
case *v2object.GetObjectPartChunk:
|
|
if p.w != nil {
|
|
if _, err := p.w.Write(v.GetChunk()); err != nil {
|
|
return nil, errors.Wrap(err, "could not write payload chunk")
|
|
}
|
|
} else {
|
|
payload = append(payload, v.GetChunk()...)
|
|
}
|
|
default:
|
|
panic(fmt.Sprintf("unexpected Get object part type %T", v))
|
|
}
|
|
}
|
|
|
|
obj.SetPayload(payload)
|
|
|
|
// convert the object
|
|
return object.NewFromV2(obj), nil
|
|
}
|
|
|
|
func (p *ObjectHeaderParams) WithAddress(v *object.Address) *ObjectHeaderParams {
|
|
if p != nil {
|
|
p.addr = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *ObjectHeaderParams) WithAllFields() *ObjectHeaderParams {
|
|
if p != nil {
|
|
p.short = false
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *ObjectHeaderParams) WithMainFields() *ObjectHeaderParams {
|
|
if p != nil {
|
|
p.short = true
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (c *Client) GetObjectHeader(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
|
// check remote node version
|
|
switch c.remoteNode.Version.GetMajor() {
|
|
case 2:
|
|
return c.getObjectHeaderV2(ctx, p, opts...)
|
|
default:
|
|
return nil, unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c *Client) getObjectHeaderV2(ctx context.Context, p *ObjectHeaderParams, opts ...CallOption) (*object.Object, error) {
|
|
// create V2 Object client
|
|
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Object V2 client")
|
|
}
|
|
|
|
callOpts := defaultCallOptions()
|
|
|
|
for i := range opts {
|
|
if opts[i] != nil {
|
|
opts[i].apply(&callOpts)
|
|
}
|
|
}
|
|
|
|
// create request
|
|
req := new(v2object.HeadRequest)
|
|
|
|
// initialize request body
|
|
body := new(v2object.HeadRequestBody)
|
|
req.SetBody(body)
|
|
|
|
// set meta header
|
|
meta := v2MetaHeaderFromOpts(callOpts)
|
|
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
|
addr: p.addr.ToV2(),
|
|
verb: v2session.ObjectVerbHead,
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "could not sign session token")
|
|
}
|
|
req.SetMetaHeader(meta)
|
|
|
|
// fill body fields
|
|
body.SetAddress(p.addr.ToV2())
|
|
body.SetMainOnly(p.short)
|
|
|
|
// sign the request
|
|
if err := signature.SignServiceMessage(c.key, req); err != nil {
|
|
return nil, errors.Wrapf(err, "could not sign %T", req)
|
|
}
|
|
|
|
// send Head request
|
|
resp, err := cli.Head(ctx, req)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "could not send %T", req)
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
|
}
|
|
|
|
var hdr *v2object.Header
|
|
|
|
switch v := resp.GetBody().GetHeaderPart().(type) {
|
|
case nil:
|
|
return nil, errors.New("received nil object header part")
|
|
case *v2object.GetHeaderPartShort:
|
|
if !p.short {
|
|
return nil, errors.Errorf("wrong header part type: expected %T, received %T",
|
|
(*v2object.GetHeaderPartFull)(nil), (*v2object.GetHeaderPartShort)(nil),
|
|
)
|
|
}
|
|
|
|
h := v.GetShortHeader()
|
|
|
|
hdr = new(v2object.Header)
|
|
hdr.SetPayloadLength(h.GetPayloadLength())
|
|
hdr.SetVersion(h.GetVersion())
|
|
hdr.SetOwnerID(h.GetOwnerID())
|
|
hdr.SetObjectType(h.GetObjectType())
|
|
hdr.SetCreationEpoch(h.GetCreationEpoch())
|
|
case *v2object.GetHeaderPartFull:
|
|
if p.short {
|
|
return nil, errors.Errorf("wrong header part type: expected %T, received %T",
|
|
(*v2object.GetHeaderPartShort)(nil), (*v2object.GetHeaderPartFull)(nil),
|
|
)
|
|
}
|
|
|
|
hdrWithSig := v.GetHeaderWithSignature()
|
|
if hdrWithSig == nil {
|
|
return nil, errors.New("got nil instead of header with signature")
|
|
}
|
|
hdr = hdrWithSig.GetHeader()
|
|
// todo: check signature there
|
|
default:
|
|
panic(fmt.Sprintf("unexpected Head object type %T", v))
|
|
}
|
|
|
|
obj := new(v2object.Object)
|
|
obj.SetHeader(hdr)
|
|
|
|
// convert the object
|
|
return object.NewFromV2(obj), nil
|
|
}
|
|
|
|
func (p *RangeDataParams) WithAddress(v *object.Address) *RangeDataParams {
|
|
if p != nil {
|
|
p.addr = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *RangeDataParams) WithRange(v *object.Range) *RangeDataParams {
|
|
if p != nil {
|
|
p.r = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *RangeDataParams) WithDataWriter(v io.Writer) *RangeDataParams {
|
|
if p != nil {
|
|
p.w = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (c *Client) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) {
|
|
// check remote node version
|
|
switch c.remoteNode.Version.GetMajor() {
|
|
case 2:
|
|
return c.objectPayloadRangeV2(ctx, p, opts...)
|
|
default:
|
|
return nil, unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c *Client) objectPayloadRangeV2(ctx context.Context, p *RangeDataParams, opts ...CallOption) ([]byte, error) {
|
|
// create V2 Object client
|
|
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Object V2 client")
|
|
}
|
|
|
|
callOpts := defaultCallOptions()
|
|
|
|
for i := range opts {
|
|
if opts[i] != nil {
|
|
opts[i].apply(&callOpts)
|
|
}
|
|
}
|
|
|
|
// create request
|
|
req := new(v2object.GetRangeRequest)
|
|
|
|
// initialize request body
|
|
body := new(v2object.GetRangeRequestBody)
|
|
req.SetBody(body)
|
|
|
|
// set meta header
|
|
meta := v2MetaHeaderFromOpts(callOpts)
|
|
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
|
addr: p.addr.ToV2(),
|
|
verb: v2session.ObjectVerbRange,
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "could not sign session token")
|
|
}
|
|
req.SetMetaHeader(meta)
|
|
|
|
// fill body fields
|
|
body.SetAddress(p.addr.ToV2())
|
|
body.SetRange(p.r.ToV2())
|
|
|
|
// sign the request
|
|
if err := signature.SignServiceMessage(c.key, req); err != nil {
|
|
return nil, errors.Wrapf(err, "could not sign %T", req)
|
|
}
|
|
|
|
// create Get payload range stream
|
|
stream, err := cli.GetRange(ctx, req)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Get payload range stream")
|
|
}
|
|
|
|
var payload []byte
|
|
if p.w != nil {
|
|
payload = make([]byte, p.r.GetLength())
|
|
}
|
|
|
|
for {
|
|
// receive message from server stream
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if errors.Is(errors.Cause(err), io.EOF) {
|
|
break
|
|
}
|
|
|
|
return nil, errors.Wrap(err, "could not receive Get payload range response")
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
|
}
|
|
|
|
chunk := resp.GetBody().GetChunk()
|
|
|
|
if p.w != nil {
|
|
if _, err := p.w.Write(chunk); err != nil {
|
|
return nil, errors.Wrap(err, "could not write payload chunk")
|
|
}
|
|
} else {
|
|
payload = append(payload, chunk...)
|
|
}
|
|
}
|
|
|
|
return payload, nil
|
|
}
|
|
|
|
func (p *RangeChecksumParams) WithAddress(v *object.Address) *RangeChecksumParams {
|
|
if p != nil {
|
|
p.addr = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *RangeChecksumParams) WithRangeList(rs ...*object.Range) *RangeChecksumParams {
|
|
if p != nil {
|
|
p.rs = rs
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *RangeChecksumParams) WithSalt(v []byte) *RangeChecksumParams {
|
|
if p != nil {
|
|
p.salt = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *RangeChecksumParams) withChecksumType(t checksumType) *RangeChecksumParams {
|
|
if p != nil {
|
|
p.typ = t
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (c *Client) ObjectPayloadRangeSHA256(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][sha256.Size]byte, error) {
|
|
res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumSHA256), opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res.([][sha256.Size]byte), nil
|
|
}
|
|
|
|
func (c *Client) ObjectPayloadRangeTZ(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) ([][TZSize]byte, error) {
|
|
res, err := c.objectPayloadRangeHash(ctx, p.withChecksumType(checksumTZ), opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return res.([][TZSize]byte), nil
|
|
}
|
|
|
|
func (c *Client) objectPayloadRangeHash(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) {
|
|
// check remote node version
|
|
switch c.remoteNode.Version.GetMajor() {
|
|
case 2:
|
|
return c.objectPayloadRangeHashV2(ctx, p, opts...)
|
|
default:
|
|
return nil, unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c *Client) objectPayloadRangeHashV2(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (interface{}, error) {
|
|
// create V2 Object client
|
|
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Object V2 client")
|
|
}
|
|
|
|
callOpts := defaultCallOptions()
|
|
|
|
for i := range opts {
|
|
if opts[i] != nil {
|
|
opts[i].apply(&callOpts)
|
|
}
|
|
}
|
|
|
|
// create request
|
|
req := new(v2object.GetRangeHashRequest)
|
|
|
|
// initialize request body
|
|
body := new(v2object.GetRangeHashRequestBody)
|
|
req.SetBody(body)
|
|
|
|
// set meta header
|
|
meta := v2MetaHeaderFromOpts(callOpts)
|
|
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
|
addr: p.addr.ToV2(),
|
|
verb: v2session.ObjectVerbRangeHash,
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "could not sign session token")
|
|
}
|
|
req.SetMetaHeader(meta)
|
|
|
|
// fill body fields
|
|
body.SetAddress(p.addr.ToV2())
|
|
body.SetSalt(p.salt)
|
|
|
|
typV2 := p.typ.toV2()
|
|
body.SetType(typV2)
|
|
|
|
rsV2 := rangesToV2(p.rs)
|
|
body.SetRanges(rsV2)
|
|
|
|
// sign the request
|
|
if err := signature.SignServiceMessage(c.key, req); err != nil {
|
|
return nil, errors.Wrapf(err, "could not sign %T", req)
|
|
}
|
|
|
|
// send request
|
|
resp, err := cli.GetRangeHash(ctx, req)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "could not send %T", req)
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
|
}
|
|
|
|
respBody := resp.GetBody()
|
|
respType := respBody.GetType()
|
|
respHashes := respBody.GetHashList()
|
|
|
|
if t := p.typ.toV2(); respType != t {
|
|
return nil, errors.Errorf("invalid checksum type: expected %v, received %v", t, respType)
|
|
} else if reqLn, respLn := len(rsV2), len(respHashes); reqLn != respLn {
|
|
return nil, errors.Errorf("wrong checksum number: expected %d, received %d", reqLn, respLn)
|
|
}
|
|
|
|
var res interface{}
|
|
|
|
switch p.typ {
|
|
case checksumSHA256:
|
|
r := make([][sha256.Size]byte, 0, len(respHashes))
|
|
|
|
for i := range respHashes {
|
|
if ln := len(respHashes[i]); ln != sha256.Size {
|
|
return nil, errors.Errorf("invalid checksum length: expected %d, received %d", sha256.Size, ln)
|
|
}
|
|
|
|
cs := [sha256.Size]byte{}
|
|
copy(cs[:], respHashes[i])
|
|
|
|
r = append(r, cs)
|
|
}
|
|
|
|
res = r
|
|
case checksumTZ:
|
|
r := make([][TZSize]byte, 0, len(respHashes))
|
|
|
|
for i := range respHashes {
|
|
if ln := len(respHashes[i]); ln != TZSize {
|
|
return nil, errors.Errorf("invalid checksum length: expected %d, received %d", TZSize, ln)
|
|
}
|
|
|
|
cs := [TZSize]byte{}
|
|
copy(cs[:], respHashes[i])
|
|
|
|
r = append(r, cs)
|
|
}
|
|
|
|
res = r
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (p *SearchObjectParams) WithContainerID(v *container.ID) *SearchObjectParams {
|
|
if p != nil {
|
|
p.cid = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *SearchObjectParams) WithSearchFilters(v object.SearchFilters) *SearchObjectParams {
|
|
if p != nil {
|
|
p.filters = v
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (c *Client) SearchObject(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
|
// check remote node version
|
|
switch c.remoteNode.Version.GetMajor() {
|
|
case 2:
|
|
return c.searchObjectV2(ctx, p, opts...)
|
|
default:
|
|
return nil, unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c *Client) searchObjectV2(ctx context.Context, p *SearchObjectParams, opts ...CallOption) ([]*object.ID, error) {
|
|
// create V2 Object client
|
|
cli, err := v2ObjectClient(c.remoteNode.Protocol, c.opts)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create Object V2 client")
|
|
}
|
|
|
|
callOpts := defaultCallOptions()
|
|
|
|
for i := range opts {
|
|
if opts[i] != nil {
|
|
opts[i].apply(&callOpts)
|
|
}
|
|
}
|
|
|
|
// create request
|
|
req := new(v2object.SearchRequest)
|
|
|
|
// initialize request body
|
|
body := new(v2object.SearchRequestBody)
|
|
req.SetBody(body)
|
|
|
|
v2Addr := new(v2refs.Address)
|
|
v2Addr.SetContainerID(p.cid.ToV2())
|
|
|
|
// set meta header
|
|
meta := v2MetaHeaderFromOpts(callOpts)
|
|
if err = c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
|
addr: v2Addr,
|
|
verb: v2session.ObjectVerbSearch,
|
|
}); err != nil {
|
|
return nil, errors.Wrap(err, "could not sign session token")
|
|
}
|
|
req.SetMetaHeader(meta)
|
|
|
|
// fill body fields
|
|
body.SetContainerID(v2Addr.GetContainerID())
|
|
body.SetVersion(searchQueryVersion)
|
|
body.SetFilters(p.filters.ToV2())
|
|
|
|
// sign the request
|
|
if err := signature.SignServiceMessage(c.key, req); err != nil {
|
|
return nil, errors.Wrapf(err, "could not sign %T", req)
|
|
}
|
|
|
|
// create search stream
|
|
stream, err := cli.Search(ctx, req)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "could not create search stream")
|
|
}
|
|
|
|
var searchResult []*object.ID
|
|
|
|
for {
|
|
// receive message from server stream
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if errors.Is(errors.Cause(err), io.EOF) {
|
|
break
|
|
}
|
|
|
|
return nil, errors.Wrap(err, "could not receive search response")
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return nil, errors.Wrapf(err, "could not verify %T", resp)
|
|
}
|
|
|
|
chunk := resp.GetBody().GetIDList()
|
|
for i := range chunk {
|
|
searchResult = append(searchResult, object.NewIDFromV2(chunk[i]))
|
|
}
|
|
}
|
|
|
|
return searchResult, nil
|
|
}
|
|
|
|
func v2ObjectClient(proto TransportProtocol, opts *clientOptions) (*v2object.Client, error) {
|
|
switch proto {
|
|
case GRPC:
|
|
var err error
|
|
|
|
if opts.grpcOpts.objectClientV2 == nil {
|
|
var optsV2 []v2object.Option
|
|
|
|
if opts.grpcOpts.conn != nil {
|
|
optsV2 = []v2object.Option{
|
|
v2object.WithGlobalOpts(
|
|
client.WithGRPCConn(opts.grpcOpts.conn),
|
|
),
|
|
}
|
|
} else {
|
|
optsV2 = []v2object.Option{
|
|
v2object.WithGlobalOpts(
|
|
client.WithNetworkAddress(opts.addr),
|
|
),
|
|
}
|
|
}
|
|
|
|
opts.grpcOpts.objectClientV2, err = v2object.NewClient(optsV2...)
|
|
}
|
|
|
|
return opts.grpcOpts.objectClientV2, err
|
|
default:
|
|
return nil, unsupportedProtocolErr
|
|
}
|
|
}
|
|
|
|
func (c Client) attachV2SessionToken(opts callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error {
|
|
if opts.session == nil {
|
|
return nil
|
|
}
|
|
|
|
token := opts.session.ToV2()
|
|
|
|
opCtx := new(v2session.ObjectSessionContext)
|
|
opCtx.SetAddress(info.addr)
|
|
opCtx.SetVerb(info.verb)
|
|
|
|
lt := new(v2session.TokenLifetime)
|
|
lt.SetIat(info.iat)
|
|
lt.SetNbf(info.nbf)
|
|
lt.SetExp(info.exp)
|
|
|
|
body := token.GetBody()
|
|
body.SetSessionKey(opts.session.SessionKey())
|
|
body.SetContext(opCtx)
|
|
body.SetLifetime(lt)
|
|
|
|
signWrapper := signature.StableMarshalerWrapper{SM: token.GetBody()}
|
|
err := signer.SignDataWithHandler(c.key, signWrapper, func(key []byte, sig []byte) {
|
|
sessionTokenSignature := new(v2refs.Signature)
|
|
sessionTokenSignature.SetKey(key)
|
|
sessionTokenSignature.SetSign(sig)
|
|
token.SetSignature(sessionTokenSignature)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hdr.SetSessionToken(token)
|
|
|
|
return nil
|
|
}
|