forked from TrueCloudLab/frostfs-sdk-go
[#131] client: Remove no longer needed code
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
1130c1f5a6
commit
d90fe8fbab
6 changed files with 12 additions and 979 deletions
|
@ -33,65 +33,6 @@ func (x statusRes) Status() apistatus.Status {
|
|||
return x.st
|
||||
}
|
||||
|
||||
// checks response signature and write client error if it is not correct (in this case returns true).
|
||||
func isInvalidSignatureV2(res *processResponseV2Res, resp responseV2) bool {
|
||||
err := signature.VerifyServiceMessage(resp)
|
||||
|
||||
isErr := err != nil
|
||||
if isErr {
|
||||
res.cliErr = fmt.Errorf("invalid response signature: %w", err)
|
||||
}
|
||||
|
||||
return isErr
|
||||
}
|
||||
|
||||
type processResponseV2Prm struct {
|
||||
callOpts *callOptions
|
||||
|
||||
resp responseV2
|
||||
}
|
||||
|
||||
type processResponseV2Res struct {
|
||||
statusRes resCommon
|
||||
|
||||
cliErr error
|
||||
}
|
||||
|
||||
// performs common actions of response processing and writes any problem as a result status or client error
|
||||
// (in both cases returns true).
|
||||
//
|
||||
// Actions:
|
||||
// * verify signature (internal);
|
||||
// * call response callback (internal);
|
||||
// * unwrap status error (optional).
|
||||
func (c *Client) processResponseV2(res *processResponseV2Res, prm processResponseV2Prm) bool {
|
||||
// verify response structure
|
||||
if isInvalidSignatureV2(res, prm.resp) {
|
||||
return true
|
||||
}
|
||||
|
||||
// handle response meta info
|
||||
if err := c.handleResponseInfoV2(prm.callOpts, prm.resp); err != nil {
|
||||
res.cliErr = err
|
||||
return true
|
||||
}
|
||||
|
||||
// get result status
|
||||
st := apistatus.FromStatusV2(prm.resp.GetMetaHeader().GetStatus())
|
||||
|
||||
// unwrap unsuccessful status and return it
|
||||
// as error if client has been configured so
|
||||
unsuccessfulStatus := !apistatus.IsSuccessful(st)
|
||||
if unsuccessfulStatus && c.opts.parseNeoFSErrors {
|
||||
res.cliErr = apistatus.ErrFromStatus(st)
|
||||
return true
|
||||
}
|
||||
|
||||
res.statusRes.setStatus(st)
|
||||
|
||||
return unsuccessfulStatus
|
||||
}
|
||||
|
||||
type prmSession struct {
|
||||
tokenSessionSet bool
|
||||
tokenSession session.Token
|
||||
|
|
794
client/object.go
794
client/object.go
|
@ -1,794 +0,0 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
v2refs "github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||
rpcapi "github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
|
||||
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
signer "github.com/nspcc-dev/neofs-sdk-go/util/signature"
|
||||
)
|
||||
|
||||
// ObjectAddressWriter is an interface of the
|
||||
// component that writes the object address.
|
||||
type ObjectAddressWriter interface {
|
||||
SetAddress(*address.Address)
|
||||
}
|
||||
|
||||
type DeleteObjectParams struct {
|
||||
addr *address.Address
|
||||
|
||||
tombTgt ObjectAddressWriter
|
||||
}
|
||||
|
||||
type RangeDataParams struct {
|
||||
addr *address.Address
|
||||
|
||||
raw bool
|
||||
|
||||
r *object.Range
|
||||
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
type RangeChecksumParams struct {
|
||||
tz bool
|
||||
|
||||
addr *address.Address
|
||||
|
||||
rs []*object.Range
|
||||
|
||||
salt []byte
|
||||
}
|
||||
|
||||
type SearchObjectParams struct {
|
||||
cid *cid.ID
|
||||
|
||||
filters object.SearchFilters
|
||||
}
|
||||
|
||||
type checksumType int
|
||||
|
||||
const (
|
||||
_ checksumType = iota
|
||||
checksumSHA256
|
||||
checksumTZ
|
||||
)
|
||||
|
||||
const TZSize = 64
|
||||
|
||||
const searchQueryVersion uint32 = 1
|
||||
|
||||
func rangesToV2(rs []*object.Range) []*v2object.Range {
|
||||
r2 := make([]*v2object.Range, 0, len(rs))
|
||||
|
||||
for i := range rs {
|
||||
r2 = append(r2, rs[i].ToV2())
|
||||
}
|
||||
|
||||
return r2
|
||||
}
|
||||
|
||||
func (t checksumType) toV2() v2refs.ChecksumType {
|
||||
switch t {
|
||||
case checksumSHA256:
|
||||
return v2refs.SHA256
|
||||
case checksumTZ:
|
||||
return v2refs.TillichZemor
|
||||
default:
|
||||
panic(fmt.Sprintf("invalid checksum type %d", t))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *DeleteObjectParams) WithAddress(v *address.Address) *DeleteObjectParams {
|
||||
if p != nil {
|
||||
p.addr = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *DeleteObjectParams) Address() *address.Address {
|
||||
if p != nil {
|
||||
return p.addr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithTombstoneAddressTarget sets target component to write tombstone address.
|
||||
func (p *DeleteObjectParams) WithTombstoneAddressTarget(v ObjectAddressWriter) *DeleteObjectParams {
|
||||
if p != nil {
|
||||
p.tombTgt = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// TombstoneAddressTarget returns target component to write tombstone address.
|
||||
func (p *DeleteObjectParams) TombstoneAddressTarget() ObjectAddressWriter {
|
||||
if p != nil {
|
||||
return p.tombTgt
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ObjectDeleteRes struct {
|
||||
statusRes
|
||||
|
||||
tombAddr *address.Address
|
||||
}
|
||||
|
||||
func (x ObjectDeleteRes) TombstoneAddress() *address.Address {
|
||||
return x.tombAddr
|
||||
}
|
||||
|
||||
func (x *ObjectDeleteRes) setTombstoneAddress(addr *address.Address) {
|
||||
x.tombAddr = addr
|
||||
}
|
||||
|
||||
// DeleteObject removes object by address.
|
||||
//
|
||||
// If target of tombstone address is not set, the address is ignored.
|
||||
//
|
||||
// Any client's internal or transport errors are returned as `error`.
|
||||
// If WithNeoFSErrorParsing option has been provided, unsuccessful
|
||||
// NeoFS status codes are returned as `error`, otherwise, are included
|
||||
// in the returned result structure.
|
||||
func (c *Client) DeleteObject(ctx context.Context, p *DeleteObjectParams, opts ...CallOption) (*ObjectDeleteRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
// create request
|
||||
req := new(v2object.DeleteRequest)
|
||||
|
||||
// initialize request body
|
||||
body := new(v2object.DeleteRequestBody)
|
||||
req.SetBody(body)
|
||||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbDelete,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("could not attach session token: %w", err)
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
||||
// fill body fields
|
||||
body.SetAddress(p.addr.ToV2())
|
||||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, fmt.Errorf("signing the request failed: %w", err)
|
||||
}
|
||||
|
||||
// send request
|
||||
resp, err := rpcapi.DeleteObject(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
res = new(ObjectDeleteRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
addrv2 := resp.GetBody().GetTombstone()
|
||||
|
||||
res.setTombstoneAddress(address.NewAddressFromV2(addrv2))
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
|
||||
type ObjectGetRes struct {
|
||||
statusRes
|
||||
objectRes
|
||||
}
|
||||
|
||||
type objectRes struct {
|
||||
obj *object.Object
|
||||
}
|
||||
|
||||
func (x *objectRes) setObject(obj *object.Object) {
|
||||
x.obj = obj
|
||||
}
|
||||
|
||||
func (x objectRes) Object() *object.Object {
|
||||
return x.obj
|
||||
}
|
||||
|
||||
func writeUnexpectedMessageTypeErr(res resCommon, val interface{}) {
|
||||
var st apistatus.ServerInternal // specific API status should be used
|
||||
|
||||
apistatus.WriteInternalServerErr(&st, fmt.Errorf("unexpected message type %T", val))
|
||||
|
||||
res.setStatus(st)
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) WithAddress(v *address.Address) *RangeDataParams {
|
||||
if p != nil {
|
||||
p.addr = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) Address() *address.Address {
|
||||
if p != nil {
|
||||
return p.addr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) WithRaw(v bool) *RangeDataParams {
|
||||
if p != nil {
|
||||
p.raw = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) Raw() bool {
|
||||
if p != nil {
|
||||
return p.raw
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) WithRange(v *object.Range) *RangeDataParams {
|
||||
if p != nil {
|
||||
p.r = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) Range() *object.Range {
|
||||
if p != nil {
|
||||
return p.r
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) WithDataWriter(v io.Writer) *RangeDataParams {
|
||||
if p != nil {
|
||||
p.w = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RangeDataParams) DataWriter() io.Writer {
|
||||
if p != nil {
|
||||
return p.w
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ObjectRangeRes struct {
|
||||
statusRes
|
||||
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (x *ObjectRangeRes) setData(data []byte) {
|
||||
x.data = data
|
||||
}
|
||||
|
||||
func (x ObjectRangeRes) Data() []byte {
|
||||
return x.data
|
||||
}
|
||||
|
||||
// ObjectPayloadRangeData receives object's range payload data through NeoFS API call.
|
||||
//
|
||||
// Any client's internal or transport errors are returned as `error`.
|
||||
// If WithNeoFSErrorParsing option has been provided, unsuccessful
|
||||
// NeoFS status codes are returned as `error`, otherwise, are included
|
||||
// in the returned result structure.
|
||||
func (c *Client) ObjectPayloadRangeData(ctx context.Context, p *RangeDataParams, opts ...CallOption) (*ObjectRangeRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
// create request
|
||||
req := new(v2object.GetRangeRequest)
|
||||
|
||||
// initialize request body
|
||||
body := new(v2object.GetRangeRequestBody)
|
||||
req.SetBody(body)
|
||||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbRange,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("could not attach session token: %w", err)
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
||||
// fill body fields
|
||||
body.SetAddress(p.addr.ToV2())
|
||||
body.SetRange(p.r.ToV2())
|
||||
body.SetRaw(p.raw)
|
||||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, fmt.Errorf("signing the request failed: %w", err)
|
||||
}
|
||||
|
||||
// open stream
|
||||
stream, err := rpcapi.GetObjectRange(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
|
||||
}
|
||||
|
||||
var payload []byte
|
||||
if p.w != nil {
|
||||
payload = make([]byte, 0, p.r.GetLength())
|
||||
}
|
||||
|
||||
var (
|
||||
resp = new(v2object.GetRangeResponse)
|
||||
|
||||
chunkWas, messageWas bool
|
||||
|
||||
res = new(ObjectRangeRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !messageWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
messageWas = true
|
||||
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
writeUnexpectedMessageTypeErr(res, v)
|
||||
return res, nil
|
||||
case *v2object.GetRangePartChunk:
|
||||
chunkWas = true
|
||||
|
||||
if p.w != nil {
|
||||
if _, err = p.w.Write(v.GetChunk()); err != nil {
|
||||
return nil, fmt.Errorf("could not write payload chunk: %w", err)
|
||||
}
|
||||
} else {
|
||||
payload = append(payload, v.GetChunk()...)
|
||||
}
|
||||
case *v2object.SplitInfo:
|
||||
if chunkWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
si := object.NewSplitInfoFromV2(v)
|
||||
|
||||
return nil, object.NewSplitInfoError(si)
|
||||
}
|
||||
}
|
||||
|
||||
res.setData(payload)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) WithAddress(v *address.Address) *RangeChecksumParams {
|
||||
if p != nil {
|
||||
p.addr = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) Address() *address.Address {
|
||||
if p != nil {
|
||||
return p.addr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) WithRangeList(rs ...*object.Range) *RangeChecksumParams {
|
||||
if p != nil {
|
||||
p.rs = rs
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) RangeList() []*object.Range {
|
||||
if p != nil {
|
||||
return p.rs
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) WithSalt(v []byte) *RangeChecksumParams {
|
||||
if p != nil {
|
||||
p.salt = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) Salt() []byte {
|
||||
if p != nil {
|
||||
return p.salt
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *RangeChecksumParams) TZ() *RangeChecksumParams {
|
||||
p.tz = true
|
||||
return p
|
||||
}
|
||||
|
||||
type ObjectRangeHashRes struct {
|
||||
statusRes
|
||||
|
||||
hashes [][]byte
|
||||
}
|
||||
|
||||
func (x *ObjectRangeHashRes) setHashes(v [][]byte) {
|
||||
x.hashes = v
|
||||
}
|
||||
|
||||
func (x ObjectRangeHashRes) Hashes() [][]byte {
|
||||
return x.hashes
|
||||
}
|
||||
|
||||
// HashObjectPayloadRanges receives range hash of the object
|
||||
// payload data through NeoFS API call.
|
||||
//
|
||||
// Any client's internal or transport errors are returned as `error`.
|
||||
// If WithNeoFSErrorParsing option has been provided, unsuccessful
|
||||
// NeoFS status codes are returned as `error`, otherwise, are included
|
||||
// in the returned result structure.
|
||||
func (c *Client) HashObjectPayloadRanges(ctx context.Context, p *RangeChecksumParams, opts ...CallOption) (*ObjectRangeHashRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
// create request
|
||||
req := new(v2object.GetRangeHashRequest)
|
||||
|
||||
// initialize request body
|
||||
body := new(v2object.GetRangeHashRequestBody)
|
||||
req.SetBody(body)
|
||||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: p.addr.ToV2(),
|
||||
verb: v2session.ObjectVerbRangeHash,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("could not attach session token: %w", err)
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
||||
// fill body fields
|
||||
body.SetAddress(p.addr.ToV2())
|
||||
body.SetSalt(p.salt)
|
||||
|
||||
typ := checksumSHA256
|
||||
if p.tz {
|
||||
typ = checksumTZ
|
||||
}
|
||||
|
||||
typV2 := typ.toV2()
|
||||
body.SetType(typV2)
|
||||
|
||||
rsV2 := rangesToV2(p.rs)
|
||||
body.SetRanges(rsV2)
|
||||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, fmt.Errorf("signing the request failed: %w", err)
|
||||
}
|
||||
|
||||
// send request
|
||||
resp, err := rpcapi.HashObjectRange(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
res = new(ObjectRangeHashRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
res.setHashes(resp.GetBody().GetHashList())
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (p *SearchObjectParams) WithContainerID(v *cid.ID) *SearchObjectParams {
|
||||
if p != nil {
|
||||
p.cid = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *SearchObjectParams) ContainerID() *cid.ID {
|
||||
if p != nil {
|
||||
return p.cid
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SearchObjectParams) WithSearchFilters(v object.SearchFilters) *SearchObjectParams {
|
||||
if p != nil {
|
||||
p.filters = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *SearchObjectParams) SearchFilters() object.SearchFilters {
|
||||
if p != nil {
|
||||
return p.filters
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ObjectSearchRes struct {
|
||||
statusRes
|
||||
|
||||
ids []*oid.ID
|
||||
}
|
||||
|
||||
func (x *ObjectSearchRes) setIDList(v []*oid.ID) {
|
||||
x.ids = v
|
||||
}
|
||||
|
||||
func (x ObjectSearchRes) IDList() []*oid.ID {
|
||||
return x.ids
|
||||
}
|
||||
|
||||
// SearchObjects searches for the objects through NeoFS API call.
|
||||
//
|
||||
// Any client's internal or transport errors are returned as `error`.
|
||||
// If WithNeoFSErrorParsing option has been provided, unsuccessful
|
||||
// NeoFS status codes are returned as `error`, otherwise, are included
|
||||
// in the returned result structure.
|
||||
func (c *Client) SearchObjects(ctx context.Context, p *SearchObjectParams, opts ...CallOption) (*ObjectSearchRes, error) {
|
||||
callOpts := c.defaultCallOptions()
|
||||
|
||||
for i := range opts {
|
||||
if opts[i] != nil {
|
||||
opts[i](callOpts)
|
||||
}
|
||||
}
|
||||
|
||||
// create request
|
||||
req := new(v2object.SearchRequest)
|
||||
|
||||
// initialize request body
|
||||
body := new(v2object.SearchRequestBody)
|
||||
req.SetBody(body)
|
||||
|
||||
v2Addr := new(v2refs.Address)
|
||||
v2Addr.SetContainerID(p.cid.ToV2())
|
||||
|
||||
// set meta header
|
||||
meta := v2MetaHeaderFromOpts(callOpts)
|
||||
|
||||
if err := c.attachV2SessionToken(callOpts, meta, v2SessionReqInfo{
|
||||
addr: v2Addr,
|
||||
verb: v2session.ObjectVerbSearch,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("could not attach session token: %w", err)
|
||||
}
|
||||
|
||||
req.SetMetaHeader(meta)
|
||||
|
||||
// fill body fields
|
||||
body.SetContainerID(v2Addr.GetContainerID())
|
||||
body.SetVersion(searchQueryVersion)
|
||||
body.SetFilters(p.filters.ToV2())
|
||||
|
||||
// sign the request
|
||||
if err := signature.SignServiceMessage(callOpts.key, req); err != nil {
|
||||
return nil, fmt.Errorf("signing the request failed: %w", err)
|
||||
}
|
||||
|
||||
// create search stream
|
||||
stream, err := rpcapi.SearchObjects(c.Raw(), req, client.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stream opening failed: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
searchResult []*oid.ID
|
||||
resp = new(v2object.SearchResponse)
|
||||
|
||||
messageWas bool
|
||||
|
||||
res = new(ObjectSearchRes)
|
||||
procPrm processResponseV2Prm
|
||||
procRes processResponseV2Res
|
||||
)
|
||||
|
||||
procPrm.callOpts = callOpts
|
||||
procPrm.resp = resp
|
||||
|
||||
procRes.statusRes = res
|
||||
|
||||
for {
|
||||
// receive message from server stream
|
||||
err := stream.Read(resp)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
if !messageWas {
|
||||
return nil, errWrongMessageSeq
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
messageWas = true
|
||||
|
||||
// process response in general
|
||||
if c.processResponseV2(&procRes, procPrm) {
|
||||
if procRes.cliErr != nil {
|
||||
return nil, procRes.cliErr
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
chunk := resp.GetBody().GetIDList()
|
||||
for i := range chunk {
|
||||
searchResult = append(searchResult, oid.NewIDFromV2(chunk[i]))
|
||||
}
|
||||
}
|
||||
|
||||
res.setIDList(searchResult)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (c *Client) attachV2SessionToken(opts *callOptions, hdr *v2session.RequestMetaHeader, info v2SessionReqInfo) error {
|
||||
if opts.session == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Do not resign already prepared session token
|
||||
if opts.session.Signature() != nil {
|
||||
hdr.SetSessionToken(opts.session.ToV2())
|
||||
return nil
|
||||
}
|
||||
|
||||
opCtx := new(v2session.ObjectSessionContext)
|
||||
opCtx.SetAddress(info.addr)
|
||||
opCtx.SetVerb(info.verb)
|
||||
|
||||
body := new(v2session.SessionTokenBody)
|
||||
body.SetID(opts.session.ID())
|
||||
body.SetOwnerID(opts.session.OwnerID().ToV2())
|
||||
body.SetSessionKey(opts.session.SessionKey())
|
||||
body.SetContext(opCtx)
|
||||
|
||||
token := new(v2session.SessionToken)
|
||||
token.SetBody(body)
|
||||
|
||||
signWrapper := signature.StableMarshalerWrapper{SM: token.GetBody()}
|
||||
|
||||
err := signer.SignDataWithHandler(opts.key, signWrapper, func(key []byte, sig []byte) {
|
||||
sessionTokenSignature := new(v2refs.Signature)
|
||||
sessionTokenSignature.SetKey(key)
|
||||
sessionTokenSignature.SetSign(sig)
|
||||
token.SetSignature(sessionTokenSignature)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hdr.SetSessionToken(token)
|
||||
|
||||
return nil
|
||||
}
|
|
@ -5,33 +5,13 @@ import (
|
|||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
|
||||
v2session "github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/token"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/version"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type (
|
||||
CallOption func(*callOptions)
|
||||
|
||||
Option func(*clientOptions)
|
||||
|
||||
callOptions struct {
|
||||
version *version.Version
|
||||
xHeaders []*session.XHeader
|
||||
ttl uint32
|
||||
epoch uint64
|
||||
key *ecdsa.PrivateKey
|
||||
session *session.Token
|
||||
bearer *token.BearerToken
|
||||
// network magic is a client config, but it's convenient to copy it here (see v2MetaHeaderFromOpts)
|
||||
// the value remains the same between calls
|
||||
netMagic uint64
|
||||
}
|
||||
|
||||
clientOptions struct {
|
||||
key *ecdsa.PrivateKey
|
||||
|
||||
|
@ -47,84 +27,8 @@ type (
|
|||
|
||||
netMagic uint64
|
||||
}
|
||||
|
||||
v2SessionReqInfo struct {
|
||||
addr *refs.Address
|
||||
verb v2session.ObjectSessionVerb
|
||||
}
|
||||
)
|
||||
|
||||
func (c *Client) defaultCallOptions() *callOptions {
|
||||
return &callOptions{
|
||||
version: version.Current(),
|
||||
ttl: 2,
|
||||
key: c.opts.key,
|
||||
// copy client's static value is a bit overhead, but it is the easiest way at the time of feature intro
|
||||
netMagic: c.opts.netMagic,
|
||||
}
|
||||
}
|
||||
|
||||
func WithXHeader(x *session.XHeader) CallOption {
|
||||
return func(opts *callOptions) {
|
||||
opts.xHeaders = append(opts.xHeaders, x)
|
||||
}
|
||||
}
|
||||
|
||||
func WithTTL(ttl uint32) CallOption {
|
||||
return func(opts *callOptions) {
|
||||
opts.ttl = ttl
|
||||
}
|
||||
}
|
||||
|
||||
// WithKey sets client's key for the next request.
|
||||
func WithKey(key *ecdsa.PrivateKey) CallOption {
|
||||
return func(opts *callOptions) {
|
||||
opts.key = key
|
||||
}
|
||||
}
|
||||
|
||||
func WithEpoch(epoch uint64) CallOption {
|
||||
return func(opts *callOptions) {
|
||||
opts.epoch = epoch
|
||||
}
|
||||
}
|
||||
|
||||
func WithSession(token *session.Token) CallOption {
|
||||
return func(opts *callOptions) {
|
||||
opts.session = token
|
||||
}
|
||||
}
|
||||
|
||||
func WithBearer(token *token.BearerToken) CallOption {
|
||||
return func(opts *callOptions) {
|
||||
opts.bearer = token
|
||||
}
|
||||
}
|
||||
|
||||
func v2MetaHeaderFromOpts(options *callOptions) *v2session.RequestMetaHeader {
|
||||
meta := new(v2session.RequestMetaHeader)
|
||||
meta.SetVersion(options.version.ToV2())
|
||||
meta.SetTTL(options.ttl)
|
||||
meta.SetEpoch(options.epoch)
|
||||
|
||||
xhdrs := make([]*v2session.XHeader, len(options.xHeaders))
|
||||
for i := range options.xHeaders {
|
||||
xhdrs[i] = options.xHeaders[i].ToV2()
|
||||
}
|
||||
|
||||
meta.SetXHeaders(xhdrs)
|
||||
|
||||
if options.bearer != nil {
|
||||
meta.SetBearerToken(options.bearer.ToV2())
|
||||
}
|
||||
|
||||
meta.SetSessionToken(options.session.ToV2())
|
||||
|
||||
meta.SetNetworkMagic(options.netMagic)
|
||||
|
||||
return meta
|
||||
}
|
||||
|
||||
func defaultClientOptions() *clientOptions {
|
||||
return &clientOptions{
|
||||
rawOpts: make([]client.Option, 0, 4),
|
||||
|
|
|
@ -26,13 +26,3 @@ func WithResponseInfoHandler(f func(ResponseMetaInfo) error) Option {
|
|||
opts.cbRespInfo = f
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) handleResponseInfoV2(opts *callOptions, resp responseV2) error {
|
||||
if c.opts.cbRespInfo == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return c.opts.cbRespInfo(ResponseMetaInfo{
|
||||
key: resp.GetVerificationHeader().GetBodySignature().GetKey(),
|
||||
})
|
||||
}
|
||||
|
|
30
pool/pool.go
30
pool/pool.go
|
@ -484,19 +484,16 @@ func formCacheKey(address string, key *ecdsa.PrivateKey) string {
|
|||
return address + k.String()
|
||||
}
|
||||
|
||||
func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client.CallOption, error) {
|
||||
func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, error) {
|
||||
cp, err := p.connection()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clientCallOptions := make([]client.CallOption, 0, 3)
|
||||
|
||||
key := p.key
|
||||
if cfg.key != nil {
|
||||
key = cfg.key
|
||||
}
|
||||
clientCallOptions = append(clientCallOptions, client.WithKey(key))
|
||||
|
||||
sessionToken := cfg.stoken
|
||||
if sessionToken == nil && cfg.useDefaultSession {
|
||||
|
@ -505,7 +502,7 @@ func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client
|
|||
if sessionToken == nil {
|
||||
cliRes, err := createSessionTokenForDuration(ctx, cp.client, p.stokenDuration)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ownerID := owner.NewIDFromPublicKey(&key.PublicKey)
|
||||
|
@ -516,13 +513,8 @@ func (p *pool) conn(ctx context.Context, cfg *callConfig) (*clientPack, []client
|
|||
_ = p.cache.Put(cacheKey, sessionToken)
|
||||
}
|
||||
}
|
||||
clientCallOptions = append(clientCallOptions, client.WithSession(sessionToken))
|
||||
|
||||
if cfg.btoken != nil {
|
||||
clientCallOptions = append(clientCallOptions, client.WithBearer(cfg.btoken))
|
||||
}
|
||||
|
||||
return cp, clientCallOptions, nil
|
||||
return cp, nil
|
||||
}
|
||||
|
||||
func (p *pool) checkSessionTokenErr(err error, address string) bool {
|
||||
|
@ -1067,7 +1059,7 @@ func (p *pool) SearchObjects(ctx context.Context, idCnr cid.ID, filters object.S
|
|||
|
||||
func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts ...CallOption) (*cid.ID, error) {
|
||||
cfg := cfgFromOpts(opts...)
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1094,7 +1086,7 @@ func (p *pool) PutContainer(ctx context.Context, cnr *container.Container, opts
|
|||
|
||||
func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) (*container.Container, error) {
|
||||
cfg := cfgFromOpts(opts...)
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1121,7 +1113,7 @@ func (p *pool) GetContainer(ctx context.Context, cid *cid.ID, opts ...CallOption
|
|||
|
||||
func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...CallOption) ([]*cid.ID, error) {
|
||||
cfg := cfgFromOpts(opts...)
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1148,7 +1140,7 @@ func (p *pool) ListContainers(ctx context.Context, ownerID *owner.ID, opts ...Ca
|
|||
|
||||
func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOption) error {
|
||||
cfg := cfgFromOpts(opts...)
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1177,7 +1169,7 @@ func (p *pool) DeleteContainer(ctx context.Context, cid *cid.ID, opts ...CallOpt
|
|||
|
||||
func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*eacl.Table, error) {
|
||||
cfg := cfgFromOpts(opts...)
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1204,7 +1196,7 @@ func (p *pool) GetEACL(ctx context.Context, cid *cid.ID, opts ...CallOption) (*e
|
|||
|
||||
func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOption) error {
|
||||
cfg := cfgFromOpts(opts...)
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1233,7 +1225,7 @@ func (p *pool) SetEACL(ctx context.Context, table *eacl.Table, opts ...CallOptio
|
|||
|
||||
func (p *pool) Balance(ctx context.Context, o *owner.ID, opts ...CallOption) (*accounting.Decimal, error) {
|
||||
cfg := cfgFromOpts(opts...)
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -529,7 +529,7 @@ func TestSessionTokenOwner(t *testing.T) {
|
|||
anonOwner := owner.NewIDFromPublicKey(&anonKey.PublicKey)
|
||||
|
||||
cfg := cfgFromOpts(WithKey(anonKey), useDefaultSession())
|
||||
cp, _, err := p.conn(ctx, cfg)
|
||||
cp, err := p.conn(ctx, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
tkn := p.cache.Get(formCacheKey(cp.address, anonKey))
|
||||
|
|
Loading…
Reference in a new issue