From 5dff3fc13da481348c8930098d2615facdebb53a Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Mon, 9 Sep 2024 12:46:34 +0400 Subject: [PATCH] neofs: add GetSDKClient, ObjectSearch and GetWithClient Could be used for operation with neofs. Signed-off-by: Ekaterina Pavlova --- go.mod | 2 +- pkg/services/oracle/neofs/neofs.go | 118 +++++++++++++++++++++++------ 2 files changed, 95 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index cc0b94faa..6c4f23cde 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( golang.org/x/term v0.23.0 golang.org/x/text v0.17.0 golang.org/x/tools v0.24.0 + google.golang.org/grpc v1.62.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -72,7 +73,6 @@ require ( golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.23.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect google.golang.org/protobuf v1.34.2 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b..af245463e 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" @@ -17,6 +18,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -45,41 +48,48 @@ var ( // URI scheme is "neofs://". // If Command is not provided, full object is requested. func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { + c, err := GetSDKClient(ctx, addr, 0) + if err != nil { + return clientCloseWrapper{c: c}, fmt.Errorf("failed to create client: %w", err) + } + return GetWithClient(ctx, c, priv, u, true) +} + +// GetWithClient returns a neofs object from the provided url using the provided client. +// URI scheme is "neofs://". +// If Command is not provided, full object is requested. If wrapClientCloser is true, +// the client will be closed when the returned ReadCloser is closed. +func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) { objectAddr, ps, err := parseNeoFSURL(u) if err != nil { return nil, err } - - c, err := client.New(client.PrmInit{}) - if err != nil { - return nil, fmt.Errorf("failed to create client: %w", err) - } - var ( - res = clientCloseWrapper{c: c} - prmd client.PrmDial + res io.ReadCloser + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) ) - prmd.SetServerURI(addr) - prmd.SetContext(ctx) - err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter - if err != nil { - return res, err - } - - var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) switch { - case len(ps) == 0 || ps[0] == "": // Get request - res.ReadCloser, err = getPayload(ctx, s, c, objectAddr) + case len(ps) == 0 || ps[0] == "": + res, err = getPayload(ctx, s, c, objectAddr) case ps[0] == rangeCmd: - res.ReadCloser, err = getRange(ctx, s, c, objectAddr, ps[1:]...) + res, err = getRange(ctx, s, c, objectAddr, ps[1:]...) case ps[0] == headerCmd: - res.ReadCloser, err = getHeader(ctx, s, c, objectAddr) + res, err = getHeader(ctx, s, c, objectAddr) case ps[0] == hashCmd: - res.ReadCloser, err = getHash(ctx, s, c, objectAddr, ps[1:]...) + res, err = getHash(ctx, s, c, objectAddr, ps[1:]...) default: - err = ErrInvalidCommand + return nil, ErrInvalidCommand } - return res, err + if err != nil { + return nil, err + } + if wrapClientCloser { + return clientCloseWrapper{ + c: c, + ReadCloser: res, + }, nil + } + return res, nil } type clientCloseWrapper struct { @@ -92,7 +102,12 @@ func (w clientCloseWrapper) Close() error { if w.ReadCloser != nil { res = w.ReadCloser.Close() } - w.c.Close() + if w.c != nil { + closeErr := w.c.Close() + if closeErr != nil && res == nil { + res = closeErr + } + } return res } @@ -220,3 +235,58 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + containerID cid.ID + ) + err := containerID.DecodeString(containerIDStr) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err) + } + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object IDs iteration: %w", err) + } + return objectIDs, nil +} + +// GetSDKClient returns a NeoFS SDK client configured with the specified address and context. +// If timeout is 0, the default timeout will be used. +func GetSDKClient(ctx context.Context, addr string, timeout time.Duration) (*client.Client, error) { + var prmDial client.PrmDial + if addr == "" { + return nil, errors.New("address is empty") + } + prmDial.SetServerURI(addr) + prmDial.SetContext(ctx) + if timeout != 0 { + prmDial.SetTimeout(timeout) + prmDial.SetStreamTimeout(timeout) + } + c, err := client.New(client.PrmInit{}) + if err != nil { + return nil, fmt.Errorf("can't create SDK client: %w", err) + } + + if err := c.Dial(prmDial); err != nil { + if status.Code(err) == codes.Unimplemented { + return c, nil + } + return nil, fmt.Errorf("can't init SDK client: %w", err) + } + + return c, nil +}