diff --git a/cli/util/convert.go b/cli/util/convert.go index 886c56d6e..0117dd853 100644 --- a/cli/util/convert.go +++ b/cli/util/convert.go @@ -11,6 +11,7 @@ import ( "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/cli/txctx" vmcli "github.com/nspcc-dev/neo-go/cli/vm" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/urfave/cli/v2" ) @@ -74,7 +75,7 @@ func NewCommands() []*cli.Command { &cli.UintFlag{ Name: "index-file-size", Usage: "Size of index file", - Value: 128000, + Value: neofs.DefaultIndexFileSize, }, &cli.UintFlag{ Name: "workers", @@ -89,7 +90,7 @@ func NewCommands() []*cli.Command { &cli.UintFlag{ Name: "retries", Usage: "Maximum number of Neo/NeoFS node request retries", - Value: 5, + Value: neofs.MaxRetries, Action: func(context *cli.Context, u uint) error { if u < 1 { return cli.Exit("retries should be greater than 0", 1) diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index 8e23226eb..990ce52de 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -2,7 +2,6 @@ package util import ( "context" - "crypto/sha256" "fmt" "slices" "strconv" @@ -14,7 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/rpcclient" - "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-sdk-go/checksum" @@ -30,35 +29,6 @@ import ( "github.com/urfave/cli/v2" ) -const ( - // Number of objects to search in a batch. We need to search with EQ filter to - // avoid partially-completed SEARCH responses. If EQ search haven't found object - // the object will be uploaded one more time which may lead to duplicating objects. - // We will have a risk of duplicates until #3645 is resolved (NeoFS guarantees - // search results). - searchBatchSize = 1 - // Size of object ID. - oidSize = sha256.Size -) - -// Constants related to retry mechanism. -const ( - // Initial backoff duration. - initialBackoff = 500 * time.Millisecond - // Backoff multiplier. - backoffFactor = 2 - // Maximum backoff duration. - maxBackoff = 20 * time.Second -) - -// Constants related to NeoFS pool request timeouts. -// Such big values are used to avoid NeoFS pool timeouts during block search and upload. -const ( - defaultDialTimeout = 10 * time.Minute - defaultStreamTimeout = 10 * time.Minute - defaultHealthcheckTimeout = 10 * time.Second -) - // poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. type poolWrapper struct { *pool.Pool @@ -103,9 +73,9 @@ func uploadBin(ctx *cli.Context) error { signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey) params := pool.DefaultOptions() - params.SetHealthcheckTimeout(defaultHealthcheckTimeout) - params.SetNodeDialTimeout(defaultDialTimeout) - params.SetNodeStreamTimeout(defaultStreamTimeout) + params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout) + params.SetNodeDialTimeout(neofs.DefaultDialTimeout) + params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout) p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params) if err != nil { return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1) @@ -166,15 +136,15 @@ func uploadBin(ctx *cli.Context) error { // retry function with exponential backoff. func retry(action func() error, maxRetries uint) error { var err error - backoff := initialBackoff + backoff := neofs.InitialBackoff for range maxRetries { if err = action(); err == nil { return nil // Success, no retry needed. } time.Sleep(backoff) // Backoff before retrying. - backoff *= time.Duration(backoffFactor) - if backoff > maxBackoff { - backoff = maxBackoff + backoff *= time.Duration(neofs.BackoffFactor) + if backoff > neofs.MaxBackoff { + backoff = neofs.MaxBackoff } } return err // Return the last error after exhausting retries. @@ -193,7 +163,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C errCh = make(chan error) doneCh = make(chan struct{}) wg sync.WaitGroup - emptyOID = make([]byte, oidSize) + emptyOID = make([]byte, neofs.OIDSize) ) fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1) wg.Add(int(numWorkers)) @@ -201,7 +171,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C go func(i uint) { defer wg.Done() for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers { - if slices.Compare(buf[blockIndex%indexFileSize*oidSize:blockIndex%indexFileSize*oidSize+oidSize], emptyOID) != 0 { + if slices.Compare(buf[blockIndex%indexFileSize*neofs.OIDSize:blockIndex%indexFileSize*neofs.OIDSize+neofs.OIDSize], emptyOID) != 0 { if debug { fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex) } @@ -263,7 +233,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C } return } - resOid.Encode(buf[blockIndex%indexFileSize*oidSize:]) + resOid.Encode(buf[blockIndex%indexFileSize*neofs.OIDSize:]) } }(i) } @@ -281,9 +251,9 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1) // Additional check for empty OIDs in the buffer. - for k := uint(0); k < (indexFileEnd-indexFileStart)*oidSize; k += oidSize { - if slices.Compare(buf[k:k+oidSize], emptyOID) == 0 { - return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oidSize, indexFileStart/indexFileSize*indexFileSize+k/oidSize) + for k := uint(0); k < (indexFileEnd-indexFileStart)*neofs.OIDSize; k += neofs.OIDSize { + if slices.Compare(buf[k:k+neofs.OIDSize], emptyOID) == 0 { + return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/neofs.OIDSize, indexFileStart/indexFileSize*indexFileSize+k/neofs.OIDSize) } } if indexFileEnd-indexFileStart == indexFileSize { @@ -310,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) { var ( // buf is used to store OIDs of the uploaded blocks. - buf = make([]byte, indexFileSize*oidSize) + buf = make([]byte, indexFileSize*neofs.OIDSize) doneCh = make(chan struct{}) errCh = make(chan error) @@ -377,7 +347,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun } pos := uint(blockIndex) % indexFileSize if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok { - id.Encode(buf[pos*oidSize:]) + id.Encode(buf[pos*neofs.OIDSize:]) } } }() @@ -404,15 +374,15 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun // endIndex. It returns a buffered channel of resulting object IDs and closes it once // OID search is finished. Errors are sent to errCh in a non-blocking way. func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { - var res = make(chan oid.ID, 2*searchBatchSize) + var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize) go func() { var wg sync.WaitGroup defer close(res) - for i := startIndex; i < endIndex; i += searchBatchSize * maxParallelSearches { + for i := startIndex; i < endIndex; i += neofs.DefaultSearchBatchSize * maxParallelSearches { for j := range maxParallelSearches { - start := i + j*searchBatchSize - end := start + searchBatchSize + start := i + j*neofs.DefaultSearchBatchSize + end := start + neofs.DefaultSearchBatchSize if start >= endIndex { break diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index d6f73426d..41aa7b698 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -2,7 +2,6 @@ package blockfetcher import ( "context" - "crypto/sha256" "errors" "fmt" "io" @@ -16,7 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" gio "github.com/nspcc-dev/neo-go/pkg/io" - "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -29,46 +28,8 @@ import ( ) const ( - // oidSize is the size of the object ID in NeoFS. - oidSize = sha256.Size - // defaultTimeout is the default timeout for NeoFS requests. - defaultTimeout = 10 * time.Minute // DefaultQueueCacheSize is the default size of the queue cache. DefaultQueueCacheSize = 16000 - // defaultDownloaderWorkersCount is the default number of workers downloading blocks. - defaultDownloaderWorkersCount = 500 - // defaultIndexFileSize is the default size of the index file. - defaultIndexFileSize = 128000 - // DefaultBlockAttribute is the default attribute name for block objects. - defaultBlockAttribute = "Block" - // defaultIndexFileAttribute is the default attribute name for index file objects. - defaultIndexFileAttribute = "Index" -) - -// Constants related to NeoFS pool request timeouts. -const ( - // defaultDialTimeout is a default timeout used to establish connection with - // NeoFS storage nodes. - defaultDialTimeout = 30 * time.Second - // defaultStreamTimeout is a default timeout used for NeoFS streams processing. - // It has significantly large value to reliably avoid timeout problems with heavy - // SEARCH requests. - defaultStreamTimeout = 10 * time.Minute - // defaultHealthcheckTimeout is a timeout for request to NeoFS storage node to - // decide if it is alive. - defaultHealthcheckTimeout = 10 * time.Second -) - -// Constants related to retry mechanism. -const ( - // maxRetries is the maximum number of retries for a single operation. - maxRetries = 5 - // initialBackoff is the initial backoff duration. - initialBackoff = 500 * time.Millisecond - // backoffFactor is the factor by which the backoff duration is multiplied. - backoffFactor = 2 - // maxBackoff is the maximum backoff duration. - maxBackoff = 20 * time.Second ) // Ledger is an interface to Blockchain sufficient for Service. @@ -149,28 +110,28 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc } } if cfg.Timeout <= 0 { - cfg.Timeout = defaultTimeout + cfg.Timeout = neofs.DefaultTimeout } if cfg.OIDBatchSize <= 0 { cfg.OIDBatchSize = cfg.BQueueSize / 2 } if cfg.DownloaderWorkersCount <= 0 { - cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount + cfg.DownloaderWorkersCount = neofs.DefaultDownloaderWorkersCount } if cfg.IndexFileSize <= 0 { - cfg.IndexFileSize = defaultIndexFileSize + cfg.IndexFileSize = neofs.DefaultIndexFileSize } if cfg.BlockAttribute == "" { - cfg.BlockAttribute = defaultBlockAttribute + cfg.BlockAttribute = neofs.DefaultBlockAttribute } if cfg.IndexFileAttribute == "" { - cfg.IndexFileAttribute = defaultIndexFileAttribute + cfg.IndexFileAttribute = neofs.DefaultIndexFileAttribute } params := pool.DefaultOptions() - params.SetHealthcheckTimeout(defaultHealthcheckTimeout) - params.SetNodeDialTimeout(defaultDialTimeout) - params.SetNodeStreamTimeout(defaultStreamTimeout) + params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout) + params.SetNodeDialTimeout(neofs.DefaultDialTimeout) + params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout) p, err := pool.New(pool.NewFlatNodeParams(cfg.Addresses), user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey), params) if err != nil { return nil, err @@ -369,7 +330,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { // streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { defer rc.Close() - oidBytes := make([]byte, oidSize) + oidBytes := make([]byte, neofs.OIDSize) oidsProcessed := 0 for { @@ -409,7 +370,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { func (bfs *Service) fetchOIDsBySearch() error { startIndex := bfs.chain.BlockHeight() //We need to search with EQ filter to avoid partially-completed SEARCH responses. - batchSize := uint32(1) + batchSize := uint32(neofs.DefaultSearchBatchSize) for { select { @@ -525,7 +486,7 @@ func (bfs *Service) IsActive() bool { func (bfs *Service) retry(action func() error) error { var ( err error - backoff = initialBackoff + backoff = neofs.InitialBackoff timer = time.NewTimer(0) ) defer func() { @@ -537,11 +498,11 @@ func (bfs *Service) retry(action func() error) error { } }() - for i := range maxRetries { + for i := range neofs.MaxRetries { if err = action(); err == nil { return nil } - if i == maxRetries-1 { + if i == neofs.MaxRetries-1 { break } timer.Reset(backoff) @@ -551,16 +512,16 @@ func (bfs *Service) retry(action func() error) error { case <-bfs.ctx.Done(): return bfs.ctx.Err() } - backoff *= time.Duration(backoffFactor) - if backoff > maxBackoff { - backoff = maxBackoff + backoff *= time.Duration(neofs.BackoffFactor) + if backoff > neofs.MaxBackoff { + backoff = neofs.MaxBackoff } } return err } func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { - u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid)) if err != nil { return nil, err } diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index 1ecd6744d..3c5c5a687 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -73,9 +74,9 @@ func TestServiceConstructor(t *testing.T) { require.NotNil(t, service) require.Equal(t, service.IsActive(), false) - require.Equal(t, service.cfg.Timeout, defaultTimeout) + require.Equal(t, service.cfg.Timeout, neofs.DefaultTimeout) require.Equal(t, service.cfg.OIDBatchSize, DefaultQueueCacheSize/2) - require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount) + require.Equal(t, service.cfg.DownloaderWorkersCount, neofs.DefaultDownloaderWorkersCount) require.Equal(t, service.IsActive(), false) }) diff --git a/pkg/services/helpers/neofs/blockstorage.go b/pkg/services/helpers/neofs/blockstorage.go new file mode 100644 index 000000000..be3214c5b --- /dev/null +++ b/pkg/services/helpers/neofs/blockstorage.go @@ -0,0 +1,55 @@ +package neofs + +import ( + "crypto/sha256" + "time" +) + +// Constants related to NeoFS block storage. +const ( + // OIDSize is the size of the object ID in NeoFS. + OIDSize = sha256.Size + // DefaultTimeout is the default timeout for NeoFS requests. + DefaultTimeout = 10 * time.Minute + // DefaultDownloaderWorkersCount is the default number of workers downloading blocks. + DefaultDownloaderWorkersCount = 500 + // DefaultIndexFileSize is the default size of the index file. + DefaultIndexFileSize = 128000 + // DefaultBlockAttribute is the default attribute name for block objects. + DefaultBlockAttribute = "Block" + // DefaultIndexFileAttribute is the default attribute name for index file objects. + DefaultIndexFileAttribute = "Index" + + // DefaultSearchBatchSize is a number of objects to search in a batch. We need to + // search with EQ filter to avoid partially-completed SEARCH responses. If EQ search + // hasn't found object the object will be uploaded one more time which may lead to + // duplicating objects. We will have a risk of duplicates until #3645 is resolved + // (NeoFS guarantees search results). + DefaultSearchBatchSize = 1 +) + +// Constants related to NeoFS pool request timeouts. +const ( + // DefaultDialTimeout is a default timeout used to establish connection with + // NeoFS storage nodes. + DefaultDialTimeout = 30 * time.Second + // DefaultStreamTimeout is a default timeout used for NeoFS streams processing. + // It has significantly large value to reliably avoid timeout problems with heavy + // SEARCH requests. + DefaultStreamTimeout = 10 * time.Minute + // DefaultHealthcheckTimeout is a timeout for request to NeoFS storage node to + // decide if it is alive. + DefaultHealthcheckTimeout = 10 * time.Second +) + +// Constants related to retry mechanism. +const ( + // MaxRetries is the maximum number of retries for a single operation. + MaxRetries = 5 + // InitialBackoff is the initial backoff duration. + InitialBackoff = 500 * time.Millisecond + // BackoffFactor is the factor by which the backoff duration is multiplied. + BackoffFactor = 2 + // MaxBackoff is the maximum backoff duration. + MaxBackoff = 20 * time.Second +) diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/helpers/neofs/neofs.go similarity index 100% rename from pkg/services/oracle/neofs/neofs.go rename to pkg/services/helpers/neofs/neofs.go diff --git a/pkg/services/oracle/neofs/neofs_test.go b/pkg/services/helpers/neofs/neofs_test.go similarity index 100% rename from pkg/services/oracle/neofs/neofs_test.go rename to pkg/services/helpers/neofs/neofs_test.go diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index 25d9a9107..9a9f16fbb 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -13,7 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "go.uber.org/zap" )