From 04516e7d267957f0c0f2f2c085f72d1cc210ca13 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Mon, 25 Nov 2024 23:04:28 +0300 Subject: [PATCH] neofs: add pool support for NeoFS operations Signed-off-by: Ekaterina Pavlova --- cli/util/upload_bin.go | 30 +++++++++++++++++++-------- pkg/services/oracle/neofs/neofs.go | 33 +++++++++++++++++------------- 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index b41b5592f..c2de09afe 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -61,6 +61,17 @@ const ( defaultHealthcheckTimeout = 10 * time.Second ) +// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. +type poolWrapper struct { + *pool.Pool +} + +// Close closes the pool and returns nil. +func (p poolWrapper) Close() error { + p.Pool.Close() + return nil +} + func uploadBin(ctx *cli.Context) error { if err := cmdargs.EnsureNone(ctx); err != nil { return err @@ -99,8 +110,8 @@ func uploadBin(ctx *cli.Context) error { if err != nil { return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1) } - - if err = p.Dial(context.Background()); err != nil { + pWrapper := poolWrapper{p} + if err = pWrapper.Dial(context.Background()); err != nil { return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1) } defer p.Close() @@ -148,14 +159,14 @@ func uploadBin(ctx *cli.Context) error { fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex) if !ctx.Bool("skip-blocks-uploading") { - err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug) + err = uploadBlocks(ctx, pWrapper, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug) if err != nil { return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1) } oldestMissingBlockIndex = int(currentBlockHeight) + 1 } - err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug) + err = uploadIndexFiles(ctx, pWrapper, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug) if err != nil { return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1) } @@ -194,6 +205,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID wg sync.WaitGroup numBatches = currentHeight / searchBatchSize emptyBatchFound bool + pWrapper = poolWrapper{p} ) for batch := numBatches; batch > -maxParallelSearches; batch -= maxParallelSearches { @@ -227,7 +239,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID err error ) err = retry(func() error { - objectIDs, err = neofs.ObjectSearch(ctx, p, priv, containerID.String(), prm) + objectIDs, err = neofs.ObjectSearch(ctx, pWrapper, priv, containerID.String(), prm) return err }, maxRetries) results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err} @@ -252,7 +264,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID } // uploadBlocks uploads the blocks to the container using the pool. -func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error { +func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error { if oldestMissingBlockIndex > int(currentBlockHeight) { fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight) return nil @@ -343,7 +355,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer } // uploadIndexFiles uploads missing index files to the container. -func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error { +func uploadIndexFiles(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error { var ( attributeKey = ctx.String("index-attribute") indexFileSize = ctx.Uint("index-file-size") @@ -505,7 +517,7 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun // searchObjects searches in parallel for objects with attribute GE startIndex and LT // endIndex. It returns a buffered channel of resulting object IDs and closes it once // OID search is finished. Errors are sent to errCh in a non-blocking way. -func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { +func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { var res = make(chan oid.ID, 2*searchBatchSize) go func() { var wg sync.WaitGroup @@ -567,7 +579,7 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun } // uploadObj uploads object to the container using provided settings. -func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) { +func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) { var ( ownerID user.ID hdr object.Object diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 7f2ddd564..6798d78fd 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -44,6 +44,16 @@ var ( ErrInvalidCommand = errors.New("invalid command") ) +// Client is a NeoFS client interface. +type Client interface { + ObjectSearchInit(ctx context.Context, containerID cid.ID, s user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error) + ObjectGetInit(ctx context.Context, container cid.ID, id oid.ID, s user.Signer, get client.PrmObjectGet) (object.Object, *client.PayloadReader, error) + ObjectRangeInit(ctx context.Context, container cid.ID, id oid.ID, offset uint64, length uint64, s user.Signer, objectRange client.PrmObjectRange) (*client.ObjectRangeReader, error) + ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error) + ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error) + Close() error +} + // Get returns a neofs object from the provided url. // URI scheme is "neofs://". // If Command is not provided, full object is requested. @@ -59,7 +69,7 @@ func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (i // URI scheme is "neofs://". // If Command is not provided, full object is requested. If wrapClientCloser is true, // the client will be closed when the returned ReadCloser is closed. -func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) { +func GetWithClient(ctx context.Context, c Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) { objectAddr, ps, err := parseNeoFSURL(u) if err != nil { return nil, err @@ -94,7 +104,7 @@ func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, type clientCloseWrapper struct { io.ReadCloser - c *client.Client + c Client } func (w clientCloseWrapper) Close() error { @@ -137,7 +147,7 @@ func parseNeoFSURL(u *url.URL) (*oid.Address, []string, error) { return objAddr, ps[2:], nil } -func getPayload(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (io.ReadCloser, error) { +func getPayload(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (io.ReadCloser, error) { var iorc io.ReadCloser _, rc, err := c.ObjectGetInit(ctx, addr.Container(), addr.Object(), s, client.PrmObjectGet{}) if rc != nil { @@ -146,7 +156,7 @@ func getPayload(ctx context.Context, s user.Signer, c *client.Client, addr *oid. return iorc, err } -func getRange(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) { +func getRange(ctx context.Context, s user.Signer, c Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) { var iorc io.ReadCloser if len(ps) == 0 { return nil, ErrInvalidRange @@ -163,11 +173,11 @@ func getRange(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Ad return iorc, err } -func getObjHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (*object.Object, error) { +func getObjHeader(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (*object.Object, error) { return c.ObjectHead(ctx, addr.Container(), addr.Object(), s, client.PrmObjectHead{}) } -func getHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (io.ReadCloser, error) { +func getHeader(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (io.ReadCloser, error) { obj, err := getObjHeader(ctx, s, c, addr) if err != nil { return nil, err @@ -179,7 +189,7 @@ func getHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.A return io.NopCloser(bytes.NewReader(res)), nil } -func getHash(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) { +func getHash(ctx context.Context, s user.Signer, c Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) { if len(ps) == 0 || ps[0] == "" { // hash of the full payload obj, err := getObjHeader(ctx, s, c, addr) if err != nil { @@ -236,13 +246,8 @@ func parseRange(s string) (*object.Range, error) { return r, nil } -// ObjectSearchInitter defines the interface for initializing object search. -type ObjectSearchInitter interface { - ObjectSearchInit(ctx context.Context, containerID cid.ID, s user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error) -} - // ObjectSearch returns a list of object IDs from the provided container. -func ObjectSearch(ctx context.Context, initter ObjectSearchInitter, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) { +func ObjectSearch(ctx context.Context, c Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) { var ( s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) objectIDs []oid.ID @@ -252,7 +257,7 @@ func ObjectSearch(ctx context.Context, initter ObjectSearchInitter, priv *keys.P if err != nil { return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err) } - reader, err := initter.ObjectSearchInit(ctx, containerID, s, prm) + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) if err != nil { return nil, fmt.Errorf("failed to initiate object search: %w", err) }