neofs: add pool support for NeoFS operations

Signed-off-by: Ekaterina Pavlova <ekt@morphbits.io>
This commit is contained in:
Ekaterina Pavlova 2024-11-25 23:04:28 +03:00
parent e89f9fe2a4
commit 04516e7d26
2 changed files with 40 additions and 23 deletions

View file

@ -61,6 +61,17 @@ const (
defaultHealthcheckTimeout = 10 * time.Second
)
// poolWrapper wraps a NeoFS pool to adapt its Close method to return an error.
type poolWrapper struct {
*pool.Pool
}
// Close closes the pool and returns nil.
func (p poolWrapper) Close() error {
p.Pool.Close()
return nil
}
func uploadBin(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil {
return err
@ -99,8 +110,8 @@ func uploadBin(ctx *cli.Context) error {
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
}
if err = p.Dial(context.Background()); err != nil {
pWrapper := poolWrapper{p}
if err = pWrapper.Dial(context.Background()); err != nil {
return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
}
defer p.Close()
@ -148,14 +159,14 @@ func uploadBin(ctx *cli.Context) error {
fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
if !ctx.Bool("skip-blocks-uploading") {
err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
err = uploadBlocks(ctx, pWrapper, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
}
oldestMissingBlockIndex = int(currentBlockHeight) + 1
}
err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
err = uploadIndexFiles(ctx, pWrapper, containerID, acc, signer, uint(oldestMissingBlockIndex), attr, homomorphicHashingDisabled, maxParallelSearches, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
}
@ -194,6 +205,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
wg sync.WaitGroup
numBatches = currentHeight / searchBatchSize
emptyBatchFound bool
pWrapper = poolWrapper{p}
)
for batch := numBatches; batch > -maxParallelSearches; batch -= maxParallelSearches {
@ -227,7 +239,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
err error
)
err = retry(func() error {
objectIDs, err = neofs.ObjectSearch(ctx, p, priv, containerID.String(), prm)
objectIDs, err = neofs.ObjectSearch(ctx, pWrapper, priv, containerID.String(), prm)
return err
}, maxRetries)
results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err}
@ -252,7 +264,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
}
// uploadBlocks uploads the blocks to the container using the pool.
func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
func uploadBlocks(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers, maxRetries int, debug bool) error {
if oldestMissingBlockIndex > int(currentBlockHeight) {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
return nil
@ -343,7 +355,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
}
// uploadIndexFiles uploads missing index files to the container.
func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
func uploadIndexFiles(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, oldestMissingBlockIndex uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches, maxRetries int, debug bool) error {
var (
attributeKey = ctx.String("index-attribute")
indexFileSize = ctx.Uint("index-file-size")
@ -505,7 +517,7 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches, maxRetries int, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*searchBatchSize)
go func() {
var wg sync.WaitGroup
@ -567,7 +579,7 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
}
// uploadObj uploads object to the container using provided settings.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) (oid.ID, error) {
var (
ownerID user.ID
hdr object.Object

View file

@ -44,6 +44,16 @@ var (
ErrInvalidCommand = errors.New("invalid command")
)
// Client is a NeoFS client interface.
type Client interface {
ObjectSearchInit(ctx context.Context, containerID cid.ID, s user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
ObjectGetInit(ctx context.Context, container cid.ID, id oid.ID, s user.Signer, get client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectRangeInit(ctx context.Context, container cid.ID, id oid.ID, offset uint64, length uint64, s user.Signer, objectRange client.PrmObjectRange) (*client.ObjectRangeReader, error)
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error)
Close() error
}
// Get returns a neofs object from the provided url.
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
// If Command is not provided, full object is requested.
@ -59,7 +69,7 @@ func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (i
// URI scheme is "neofs:<Container-ID>/<Object-ID/<Command>/<Params>".
// If Command is not provided, full object is requested. If wrapClientCloser is true,
// the client will be closed when the returned ReadCloser is closed.
func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) {
func GetWithClient(ctx context.Context, c Client, priv *keys.PrivateKey, u *url.URL, wrapClientCloser bool) (io.ReadCloser, error) {
objectAddr, ps, err := parseNeoFSURL(u)
if err != nil {
return nil, err
@ -94,7 +104,7 @@ func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey,
type clientCloseWrapper struct {
io.ReadCloser
c *client.Client
c Client
}
func (w clientCloseWrapper) Close() error {
@ -137,7 +147,7 @@ func parseNeoFSURL(u *url.URL) (*oid.Address, []string, error) {
return objAddr, ps[2:], nil
}
func getPayload(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (io.ReadCloser, error) {
func getPayload(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (io.ReadCloser, error) {
var iorc io.ReadCloser
_, rc, err := c.ObjectGetInit(ctx, addr.Container(), addr.Object(), s, client.PrmObjectGet{})
if rc != nil {
@ -146,7 +156,7 @@ func getPayload(ctx context.Context, s user.Signer, c *client.Client, addr *oid.
return iorc, err
}
func getRange(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
func getRange(ctx context.Context, s user.Signer, c Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
var iorc io.ReadCloser
if len(ps) == 0 {
return nil, ErrInvalidRange
@ -163,11 +173,11 @@ func getRange(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Ad
return iorc, err
}
func getObjHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (*object.Object, error) {
func getObjHeader(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (*object.Object, error) {
return c.ObjectHead(ctx, addr.Container(), addr.Object(), s, client.PrmObjectHead{})
}
func getHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address) (io.ReadCloser, error) {
func getHeader(ctx context.Context, s user.Signer, c Client, addr *oid.Address) (io.ReadCloser, error) {
obj, err := getObjHeader(ctx, s, c, addr)
if err != nil {
return nil, err
@ -179,7 +189,7 @@ func getHeader(ctx context.Context, s user.Signer, c *client.Client, addr *oid.A
return io.NopCloser(bytes.NewReader(res)), nil
}
func getHash(ctx context.Context, s user.Signer, c *client.Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
func getHash(ctx context.Context, s user.Signer, c Client, addr *oid.Address, ps ...string) (io.ReadCloser, error) {
if len(ps) == 0 || ps[0] == "" { // hash of the full payload
obj, err := getObjHeader(ctx, s, c, addr)
if err != nil {
@ -236,13 +246,8 @@ func parseRange(s string) (*object.Range, error) {
return r, nil
}
// ObjectSearchInitter defines the interface for initializing object search.
type ObjectSearchInitter interface {
ObjectSearchInit(ctx context.Context, containerID cid.ID, s user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
}
// ObjectSearch returns a list of object IDs from the provided container.
func ObjectSearch(ctx context.Context, initter ObjectSearchInitter, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) {
func ObjectSearch(ctx context.Context, c Client, priv *keys.PrivateKey, containerIDStr string, prm client.PrmObjectSearch) ([]oid.ID, error) {
var (
s = user.NewAutoIDSignerRFC6979(priv.PrivateKey)
objectIDs []oid.ID
@ -252,7 +257,7 @@ func ObjectSearch(ctx context.Context, initter ObjectSearchInitter, priv *keys.P
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrInvalidContainer, err)
}
reader, err := initter.ObjectSearchInit(ctx, containerID, s, prm)
reader, err := c.ObjectSearchInit(ctx, containerID, s, prm)
if err != nil {
return nil, fmt.Errorf("failed to initiate object search: %w", err)
}