forked from TrueCloudLab/neoneo-go
Merge pull request #3037 from nspcc-dev/sdk-upd
oracle: update NeoFS SDK to 1.0.0-rc.9
This commit is contained in:
commit
a2daad6ba6
5 changed files with 89 additions and 100 deletions
6
go.mod
6
go.mod
|
@ -14,7 +14,7 @@ require (
|
|||
github.com/nspcc-dev/dbft v0.0.0-20230515113611-25db6ba61d5c
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230420112658-c50ab951645a
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.8
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.9
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0
|
||||
github.com/pierrec/lz4 v2.6.1+incompatible
|
||||
github.com/pmezard/go-difflib v1.0.0
|
||||
|
@ -49,7 +49,7 @@ require (
|
|||
github.com/nspcc-dev/hrw v1.0.9 // indirect
|
||||
github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 // indirect
|
||||
github.com/nspcc-dev/neofs-crypto v0.4.0 // indirect
|
||||
github.com/nspcc-dev/tzhash v1.6.1 // indirect
|
||||
github.com/nspcc-dev/tzhash v1.7.0 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.37.0 // indirect
|
||||
|
@ -62,7 +62,7 @@ require (
|
|||
golang.org/x/mod v0.6.0 // indirect
|
||||
golang.org/x/net v0.7.0 // indirect
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
|
||||
golang.org/x/sys v0.5.0 // indirect
|
||||
golang.org/x/sys v0.8.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
|
||||
google.golang.org/grpc v1.48.0 // indirect
|
||||
google.golang.org/protobuf v1.28.1 // indirect
|
||||
|
|
13
go.sum
13
go.sum
|
@ -223,12 +223,12 @@ github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 h1:jhuN8Ldqz7WApvUJRFY0bjRXE1R3iCkb
|
|||
github.com/nspcc-dev/neofs-api-go/v2 v2.14.0/go.mod h1:DRIr0Ic1s+6QgdqmNFNLIqMqd7lNMJfYwkczlm1hDtM=
|
||||
github.com/nspcc-dev/neofs-crypto v0.4.0 h1:5LlrUAM5O0k1+sH/sktBtrgfWtq1pgpDs09fZo+KYi4=
|
||||
github.com/nspcc-dev/neofs-crypto v0.4.0/go.mod h1:6XJ8kbXgOfevbI2WMruOtI+qUJXNwSGM/E9eClXxPHs=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.8 h1:bsg3o7Oiae2xHYAs1M5yg8GDOs46x/IW5jCh/4dt8uo=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.8/go.mod h1:kq/KoRhj/Ye8b7ctykiXej42Kq09lUg2E5FXGCbLOWs=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.9 h1:uIQlWUUo5n/e8rLFGm14zIValcpXU1HWuwaoXUAHt5Q=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.9/go.mod h1:fTsdTU/M9rvv/f9jlp7vHOm3DRp+NSfjfTv9NohrKTE=
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE=
|
||||
github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso=
|
||||
github.com/nspcc-dev/tzhash v1.6.1 h1:8dUrWFpjkmoHF+7GxuGUmarj9LLHWFcuyF3CTrqq9JE=
|
||||
github.com/nspcc-dev/tzhash v1.6.1/go.mod h1:BoflzCVp+DO/f1mvbcsJQWoFzidIFBhWFZMglbUW648=
|
||||
github.com/nspcc-dev/tzhash v1.7.0 h1:/+aL33NC7y5OIGnY2kYgjZt8mg7LVGFMdj/KAJLndnk=
|
||||
github.com/nspcc-dev/tzhash v1.7.0/go.mod h1:Dnx9LUlOLr5paL2Rtc96x0PPs8D9eIkUtowt1n+KQus=
|
||||
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
|
@ -468,8 +468,8 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
|
||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
|
||||
|
@ -635,7 +635,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package neofs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -18,10 +19,6 @@ import (
|
|||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// ResultReader is a function that reads required amount of data and
|
||||
// checks it.
|
||||
type ResultReader func(io.Reader) ([]byte, error)
|
||||
|
||||
const (
|
||||
// URIScheme is the name of neofs URI scheme.
|
||||
URIScheme = "neofs"
|
||||
|
@ -47,39 +44,60 @@ var (
|
|||
// Get returns a neofs object from the provided url.
|
||||
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
|
||||
// If Command is not provided, full object is requested.
|
||||
func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string, resReader ResultReader) ([]byte, error) {
|
||||
func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) {
|
||||
objectAddr, ps, err := parseNeoFSURL(u)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var c = new(client.Client)
|
||||
var prmi client.PrmInit
|
||||
prmi.ResolveNeoFSFailures()
|
||||
var (
|
||||
prmi client.PrmInit
|
||||
c *client.Client
|
||||
)
|
||||
prmi.SetDefaultSigner(neofsecdsa.Signer(priv.PrivateKey))
|
||||
c.Init(prmi)
|
||||
c, err = client.New(prmi)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create client: %w", err)
|
||||
}
|
||||
|
||||
var prmd client.PrmDial
|
||||
var (
|
||||
res = clientCloseWrapper{c: c}
|
||||
prmd client.PrmDial
|
||||
)
|
||||
prmd.SetServerURI(addr)
|
||||
prmd.SetContext(ctx)
|
||||
err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return res, err
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
switch {
|
||||
case len(ps) == 0 || ps[0] == "": // Get request
|
||||
return getPayload(ctx, c, objectAddr, resReader)
|
||||
res.ReadCloser, err = getPayload(ctx, c, objectAddr)
|
||||
case ps[0] == rangeCmd:
|
||||
return getRange(ctx, c, objectAddr, resReader, ps[1:]...)
|
||||
res.ReadCloser, err = getRange(ctx, c, objectAddr, ps[1:]...)
|
||||
case ps[0] == headerCmd:
|
||||
return getHeader(ctx, c, objectAddr)
|
||||
res.ReadCloser, err = getHeader(ctx, c, objectAddr)
|
||||
case ps[0] == hashCmd:
|
||||
return getHash(ctx, c, objectAddr, ps[1:]...)
|
||||
res.ReadCloser, err = getHash(ctx, c, objectAddr, ps[1:]...)
|
||||
default:
|
||||
return nil, ErrInvalidCommand
|
||||
err = ErrInvalidCommand
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
type clientCloseWrapper struct {
|
||||
io.ReadCloser
|
||||
c *client.Client
|
||||
}
|
||||
|
||||
func (w clientCloseWrapper) Close() error {
|
||||
var res error
|
||||
if w.ReadCloser != nil {
|
||||
res = w.ReadCloser.Close()
|
||||
}
|
||||
w.c.Close()
|
||||
return res
|
||||
}
|
||||
|
||||
// parseNeoFSURL returns parsed neofs address.
|
||||
|
@ -108,28 +126,11 @@ func parseNeoFSURL(u *url.URL) (*oid.Address, []string, error) {
|
|||
return objAddr, ps[2:], nil
|
||||
}
|
||||
|
||||
func getPayload(ctx context.Context, c *client.Client, addr *oid.Address, resReader ResultReader) ([]byte, error) {
|
||||
var getPrm client.PrmObjectGet
|
||||
getPrm.FromContainer(addr.Container())
|
||||
getPrm.ByID(addr.Object())
|
||||
|
||||
objR, err := c.ObjectGetInit(ctx, getPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := resReader(objR)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = objR.Close() // Using ResolveNeoFSFailures.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
func getPayload(ctx context.Context, c *client.Client, addr *oid.Address) (io.ReadCloser, error) {
|
||||
return c.ObjectGetInit(ctx, addr.Container(), addr.Object(), client.PrmObjectGet{})
|
||||
}
|
||||
|
||||
func getRange(ctx context.Context, c *client.Client, addr *oid.Address, resReader ResultReader, ps ...string) ([]byte, error) {
|
||||
func getRange(ctx context.Context, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
|
||||
if len(ps) == 0 {
|
||||
return nil, ErrInvalidRange
|
||||
}
|
||||
|
@ -137,34 +138,12 @@ func getRange(ctx context.Context, c *client.Client, addr *oid.Address, resReade
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var rangePrm client.PrmObjectRange
|
||||
rangePrm.FromContainer(addr.Container())
|
||||
rangePrm.ByID(addr.Object())
|
||||
rangePrm.SetLength(r.GetLength())
|
||||
rangePrm.SetOffset(r.GetOffset())
|
||||
|
||||
rangeR, err := c.ObjectRangeInit(ctx, rangePrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := resReader(rangeR)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = rangeR.Close() // Using ResolveNeoFSFailures.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
return c.ObjectRangeInit(ctx, addr.Container(), addr.Object(), r.GetOffset(), r.GetLength(), client.PrmObjectRange{})
|
||||
}
|
||||
|
||||
func getObjHeader(ctx context.Context, c *client.Client, addr *oid.Address) (*object.Object, error) {
|
||||
var headPrm client.PrmObjectHead
|
||||
headPrm.FromContainer(addr.Container())
|
||||
headPrm.ByID(addr.Object())
|
||||
|
||||
res, err := c.ObjectHead(ctx, headPrm)
|
||||
res, err := c.ObjectHead(ctx, addr.Container(), addr.Object(), client.PrmObjectHead{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -175,15 +154,19 @@ func getObjHeader(ctx context.Context, c *client.Client, addr *oid.Address) (*ob
|
|||
return obj, nil
|
||||
}
|
||||
|
||||
func getHeader(ctx context.Context, c *client.Client, addr *oid.Address) ([]byte, error) {
|
||||
func getHeader(ctx context.Context, c *client.Client, addr *oid.Address) (io.ReadCloser, error) {
|
||||
obj, err := getObjHeader(ctx, c, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj.MarshalHeaderJSON()
|
||||
res, err := obj.MarshalHeaderJSON()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return io.NopCloser(bytes.NewReader(res)), nil
|
||||
}
|
||||
|
||||
func getHash(ctx context.Context, c *client.Client, addr *oid.Address, ps ...string) ([]byte, error) {
|
||||
func getHash(ctx context.Context, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
|
||||
if len(ps) == 0 || ps[0] == "" { // hash of the full payload
|
||||
obj, err := getObjHeader(ctx, c, addr)
|
||||
if err != nil {
|
||||
|
@ -193,22 +176,19 @@ func getHash(ctx context.Context, c *client.Client, addr *oid.Address, ps ...str
|
|||
if !flag {
|
||||
return nil, errors.New("missing checksum in the reply")
|
||||
}
|
||||
return sum.Value(), nil
|
||||
return io.NopCloser(bytes.NewReader(sum.Value())), nil
|
||||
}
|
||||
r, err := parseRange(ps[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var hashPrm client.PrmObjectHash
|
||||
hashPrm.FromContainer(addr.Container())
|
||||
hashPrm.ByID(addr.Object())
|
||||
hashPrm.SetRangeList(r.GetOffset(), r.GetLength())
|
||||
|
||||
res, err := c.ObjectHash(ctx, hashPrm)
|
||||
hashes, err := c.ObjectHash(ctx, addr.Container(), addr.Object(), hashPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hashes := res.Checksums() // Using ResolveNeoFSFailures.
|
||||
if len(hashes) == 0 {
|
||||
return nil, fmt.Errorf("%w: empty response", ErrInvalidRange)
|
||||
}
|
||||
|
@ -216,7 +196,11 @@ func getHash(ctx context.Context, c *client.Client, addr *oid.Address, ps ...str
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("decode Uint256: %w", err)
|
||||
}
|
||||
return u256.MarshalJSON()
|
||||
res, err := u256.MarshalJSON()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return io.NopCloser(bytes.NewReader(res)), nil
|
||||
}
|
||||
|
||||
func parseRange(s string) (*object.Range, error) {
|
||||
|
|
|
@ -146,16 +146,7 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error {
|
|||
break
|
||||
}
|
||||
|
||||
resp.Result, err = readResponse(r.Body)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrResponseTooLarge) {
|
||||
resp.Code = transaction.ResponseTooLarge
|
||||
} else {
|
||||
resp.Code = transaction.Error
|
||||
}
|
||||
o.Log.Warn("failed to read data for oracle request", zap.String("url", req.Req.URL), zap.Error(err))
|
||||
break
|
||||
}
|
||||
resp.Result, resp.Code = o.readResponse(r.Body, req.Req.URL)
|
||||
case http.StatusForbidden:
|
||||
resp.Code = transaction.Forbidden
|
||||
case http.StatusNotFound:
|
||||
|
@ -169,15 +160,17 @@ func (o *Oracle) processRequest(priv *keys.PrivateKey, req request) error {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), o.MainCfg.NeoFS.Timeout)
|
||||
defer cancel()
|
||||
index := (int(req.ID) + incTx.attempts) % len(o.MainCfg.NeoFS.Nodes)
|
||||
resp.Result, err = neofs.Get(ctx, priv, u, o.MainCfg.NeoFS.Nodes[index], readResponse)
|
||||
rc, err := neofs.Get(ctx, priv, u, o.MainCfg.NeoFS.Nodes[index])
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrResponseTooLarge) {
|
||||
resp.Code = transaction.ResponseTooLarge
|
||||
} else {
|
||||
resp.Code = transaction.Error
|
||||
resp.Code = transaction.Error
|
||||
o.Log.Warn("failed to perform oracle request", zap.String("url", req.Req.URL), zap.Error(err))
|
||||
if rc != nil {
|
||||
rc.Close() // intentionally skip the closing error, make it unified with Oracle `https` protocol.
|
||||
}
|
||||
o.Log.Warn("oracle request failed", zap.String("url", req.Req.URL), zap.Error(err))
|
||||
break
|
||||
}
|
||||
resp.Result, resp.Code = o.readResponse(rc, req.Req.URL)
|
||||
rc.Close() // intentionally skip the closing error, make it unified with Oracle `https` protocol.
|
||||
default:
|
||||
resp.Code = transaction.ProtocolNotSupported
|
||||
o.Log.Warn("unknown oracle request scheme", zap.String("url", req.Req.URL))
|
||||
|
|
|
@ -68,17 +68,30 @@ func (o *Oracle) AddResponse(pub *keys.PublicKey, reqID uint64, txSig []byte) {
|
|||
// ErrResponseTooLarge is returned when a response exceeds the max allowed size.
|
||||
var ErrResponseTooLarge = errors.New("too big response")
|
||||
|
||||
func readResponse(rc gio.Reader) ([]byte, error) {
|
||||
func (o *Oracle) readResponse(rc gio.Reader, url string) ([]byte, transaction.OracleResponseCode) {
|
||||
const limit = transaction.MaxOracleResultSize
|
||||
buf := make([]byte, limit+1)
|
||||
n, err := gio.ReadFull(rc, buf)
|
||||
if errors.Is(err, gio.ErrUnexpectedEOF) && n <= limit {
|
||||
return checkUTF8(buf[:n])
|
||||
res, err := checkUTF8(buf[:n])
|
||||
return o.handleResponseError(res, err, url)
|
||||
}
|
||||
if err == nil || n > limit {
|
||||
return nil, ErrResponseTooLarge
|
||||
return o.handleResponseError(nil, ErrResponseTooLarge, url)
|
||||
}
|
||||
return nil, err
|
||||
|
||||
return o.handleResponseError(nil, err, url)
|
||||
}
|
||||
|
||||
func (o *Oracle) handleResponseError(data []byte, err error, url string) ([]byte, transaction.OracleResponseCode) {
|
||||
if err != nil {
|
||||
o.Log.Warn("failed to read data for oracle request", zap.String("url", url), zap.Error(err))
|
||||
if errors.Is(err, ErrResponseTooLarge) {
|
||||
return nil, transaction.ResponseTooLarge
|
||||
}
|
||||
return nil, transaction.Error
|
||||
}
|
||||
return data, transaction.Success
|
||||
}
|
||||
|
||||
func checkUTF8(v []byte) ([]byte, error) {
|
||||
|
|
Loading…
Reference in a new issue