From c79ffa967f3cb9aee870e7269f8047dca42ba24c Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 12 Dec 2024 18:37:44 +0300 Subject: [PATCH 1/2] services: add default values for NeoFSBlockFetcher configuration The minimum sufficient configuration is Addresses and ContainerID, example: ``` NeoFSBlockFetcher: Enabled: true Addresses: - st1.storage.fs.neo.org:8080 - st2.storage.fs.neo.org:8080 - st3.storage.fs.neo.org:8080 - st4.storage.fs.neo.org:8080 ContainerID: "87JRc7vyWcjW8uS32LMoLTAj4ckCzFZWfKbacjU3sAob" ``` Close #3718 Signed-off-by: Ekaterina Pavlova --- pkg/config/application_config_test.go | 14 ---------- pkg/config/blockfetcher_config.go | 5 +--- pkg/network/server.go | 3 +++ pkg/services/blockfetcher/blockfetcher.go | 26 ++++++++++++++----- .../blockfetcher/blockfetcher_test.go | 5 ++-- 5 files changed, 26 insertions(+), 27 deletions(-) diff --git a/pkg/config/application_config_test.go b/pkg/config/application_config_test.go index 6b5af8d81..767e3942c 100644 --- a/pkg/config/application_config_test.go +++ b/pkg/config/application_config_test.go @@ -206,20 +206,6 @@ func TestNeoFSBlockFetcherValidation(t *testing.T) { shouldFail: true, errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)", }, - { - cfg: NeoFSBlockFetcher{ - InternalService: InternalService{Enabled: true}, - Timeout: time.Second, - ContainerID: validContainerID, - Addresses: []string{"127.0.0.1"}, - OIDBatchSize: 10, - BQueueSize: 20, - SkipIndexFilesSearch: false, - IndexFileSize: 0, - }, - shouldFail: true, - errMsg: "IndexFileSize is not set", - }, } for _, c := range cases { diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go index 22d4d46a0..65abc6295 100644 --- a/pkg/config/blockfetcher_config.go +++ b/pkg/config/blockfetcher_config.go @@ -38,14 +38,11 @@ func (cfg *NeoFSBlockFetcher) Validate() error { if err != nil { return fmt.Errorf("invalid container ID: %w", err) } - if cfg.BQueueSize < cfg.OIDBatchSize { + if cfg.BQueueSize > 0 && cfg.BQueueSize < cfg.OIDBatchSize { return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize) } if len(cfg.Addresses) == 0 { return errors.New("addresses are not set") } - if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 { - return errors.New("IndexFileSize is not set") - } return nil } diff --git a/pkg/network/server.go b/pkg/network/server.go index 6b043544a..1d4377462 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -224,6 +224,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 { + s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize + } s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) var err error s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 90a0e53fc..d6f73426d 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -32,11 +32,17 @@ const ( // oidSize is the size of the object ID in NeoFS. oidSize = sha256.Size // defaultTimeout is the default timeout for NeoFS requests. 
- defaultTimeout = 5 * time.Minute - // defaultOIDBatchSize is the default number of OIDs to search and fetch at once. - defaultOIDBatchSize = 8000 + defaultTimeout = 10 * time.Minute + // DefaultQueueCacheSize is the default size of the queue cache. + DefaultQueueCacheSize = 16000 // defaultDownloaderWorkersCount is the default number of workers downloading blocks. - defaultDownloaderWorkersCount = 100 + defaultDownloaderWorkersCount = 500 + // defaultIndexFileSize is the default size of the index file. + defaultIndexFileSize = 128000 + // DefaultBlockAttribute is the default attribute name for block objects. + defaultBlockAttribute = "Block" + // defaultIndexFileAttribute is the default attribute name for index file objects. + defaultIndexFileAttribute = "Index" ) // Constants related to NeoFS pool request timeouts. @@ -146,13 +152,19 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc cfg.Timeout = defaultTimeout } if cfg.OIDBatchSize <= 0 { - cfg.OIDBatchSize = defaultOIDBatchSize + cfg.OIDBatchSize = cfg.BQueueSize / 2 } if cfg.DownloaderWorkersCount <= 0 { cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount } - if len(cfg.Addresses) == 0 { - return nil, errors.New("no addresses provided") + if cfg.IndexFileSize <= 0 { + cfg.IndexFileSize = defaultIndexFileSize + } + if cfg.BlockAttribute == "" { + cfg.BlockAttribute = defaultBlockAttribute + } + if cfg.IndexFileAttribute == "" { + cfg.IndexFileAttribute = defaultIndexFileAttribute } params := pool.DefaultOptions() diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index eda5f2864..1ecd6744d 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -65,7 +65,8 @@ func TestServiceConstructor(t *testing.T) { InternalService: config.InternalService{ Enabled: true, }, - Addresses: []string{"localhost:8080"}, + Addresses: []string{"localhost:8080"}, + BQueueSize: DefaultQueueCacheSize, } service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) require.NoError(t, err) @@ -73,7 +74,7 @@ func TestServiceConstructor(t *testing.T) { require.Equal(t, service.IsActive(), false) require.Equal(t, service.cfg.Timeout, defaultTimeout) - require.Equal(t, service.cfg.OIDBatchSize, defaultOIDBatchSize) + require.Equal(t, service.cfg.OIDBatchSize, DefaultQueueCacheSize/2) require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount) require.Equal(t, service.IsActive(), false) }) From 65bdc82da81a544046659e4214819772f90c875f Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Thu, 12 Dec 2024 18:46:42 +0300 Subject: [PATCH 2/2] *: move constant and NeoFS related code into separate package Signed-off-by: Ekaterina Pavlova --- cli/util/convert.go | 5 +- cli/util/upload_bin.go | 70 +++++------------ pkg/services/blockfetcher/blockfetcher.go | 75 +++++-------------- .../blockfetcher/blockfetcher_test.go | 5 +- pkg/services/helpers/neofs/blockstorage.go | 55 ++++++++++++++ .../{oracle => helpers}/neofs/neofs.go | 0 .../{oracle => helpers}/neofs/neofs_test.go | 0 pkg/services/oracle/request.go | 2 +- 8 files changed, 100 insertions(+), 112 deletions(-) create mode 100644 pkg/services/helpers/neofs/blockstorage.go rename pkg/services/{oracle => helpers}/neofs/neofs.go (100%) rename pkg/services/{oracle => helpers}/neofs/neofs_test.go (100%) diff --git a/cli/util/convert.go b/cli/util/convert.go index 886c56d6e..0117dd853 100644 --- a/cli/util/convert.go +++ 
b/cli/util/convert.go @@ -11,6 +11,7 @@ import ( "github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/cli/txctx" vmcli "github.com/nspcc-dev/neo-go/cli/vm" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/urfave/cli/v2" ) @@ -74,7 +75,7 @@ func NewCommands() []*cli.Command { &cli.UintFlag{ Name: "index-file-size", Usage: "Size of index file", - Value: 128000, + Value: neofs.DefaultIndexFileSize, }, &cli.UintFlag{ Name: "workers", @@ -89,7 +90,7 @@ func NewCommands() []*cli.Command { &cli.UintFlag{ Name: "retries", Usage: "Maximum number of Neo/NeoFS node request retries", - Value: 5, + Value: neofs.MaxRetries, Action: func(context *cli.Context, u uint) error { if u < 1 { return cli.Exit("retries should be greater than 0", 1) diff --git a/cli/util/upload_bin.go b/cli/util/upload_bin.go index 8e23226eb..990ce52de 100644 --- a/cli/util/upload_bin.go +++ b/cli/util/upload_bin.go @@ -2,7 +2,6 @@ package util import ( "context" - "crypto/sha256" "fmt" "slices" "strconv" @@ -14,7 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/rpcclient" - "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-sdk-go/checksum" @@ -30,35 +29,6 @@ import ( "github.com/urfave/cli/v2" ) -const ( - // Number of objects to search in a batch. We need to search with EQ filter to - // avoid partially-completed SEARCH responses. If EQ search haven't found object - // the object will be uploaded one more time which may lead to duplicating objects. - // We will have a risk of duplicates until #3645 is resolved (NeoFS guarantees - // search results). - searchBatchSize = 1 - // Size of object ID. - oidSize = sha256.Size -) - -// Constants related to retry mechanism. -const ( - // Initial backoff duration. - initialBackoff = 500 * time.Millisecond - // Backoff multiplier. - backoffFactor = 2 - // Maximum backoff duration. - maxBackoff = 20 * time.Second -) - -// Constants related to NeoFS pool request timeouts. -// Such big values are used to avoid NeoFS pool timeouts during block search and upload. -const ( - defaultDialTimeout = 10 * time.Minute - defaultStreamTimeout = 10 * time.Minute - defaultHealthcheckTimeout = 10 * time.Second -) - // poolWrapper wraps a NeoFS pool to adapt its Close method to return an error. type poolWrapper struct { *pool.Pool @@ -103,9 +73,9 @@ func uploadBin(ctx *cli.Context) error { signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey) params := pool.DefaultOptions() - params.SetHealthcheckTimeout(defaultHealthcheckTimeout) - params.SetNodeDialTimeout(defaultDialTimeout) - params.SetNodeStreamTimeout(defaultStreamTimeout) + params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout) + params.SetNodeDialTimeout(neofs.DefaultDialTimeout) + params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout) p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params) if err != nil { return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1) @@ -166,15 +136,15 @@ func uploadBin(ctx *cli.Context) error { // retry function with exponential backoff. 
func retry(action func() error, maxRetries uint) error { var err error - backoff := initialBackoff + backoff := neofs.InitialBackoff for range maxRetries { if err = action(); err == nil { return nil // Success, no retry needed. } time.Sleep(backoff) // Backoff before retrying. - backoff *= time.Duration(backoffFactor) - if backoff > maxBackoff { - backoff = maxBackoff + backoff *= time.Duration(neofs.BackoffFactor) + if backoff > neofs.MaxBackoff { + backoff = neofs.MaxBackoff } } return err // Return the last error after exhausting retries. @@ -193,7 +163,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C errCh = make(chan error) doneCh = make(chan struct{}) wg sync.WaitGroup - emptyOID = make([]byte, oidSize) + emptyOID = make([]byte, neofs.OIDSize) ) fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1) wg.Add(int(numWorkers)) @@ -201,7 +171,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C go func(i uint) { defer wg.Done() for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers { - if slices.Compare(buf[blockIndex%indexFileSize*oidSize:blockIndex%indexFileSize*oidSize+oidSize], emptyOID) != 0 { + if slices.Compare(buf[blockIndex%indexFileSize*neofs.OIDSize:blockIndex%indexFileSize*neofs.OIDSize+neofs.OIDSize], emptyOID) != 0 { if debug { fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex) } @@ -263,7 +233,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C } return } - resOid.Encode(buf[blockIndex%indexFileSize*oidSize:]) + resOid.Encode(buf[blockIndex%indexFileSize*neofs.OIDSize:]) } }(i) } @@ -281,9 +251,9 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1) // Additional check for empty OIDs in the buffer. - for k := uint(0); k < (indexFileEnd-indexFileStart)*oidSize; k += oidSize { - if slices.Compare(buf[k:k+oidSize], emptyOID) == 0 { - return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oidSize, indexFileStart/indexFileSize*indexFileSize+k/oidSize) + for k := uint(0); k < (indexFileEnd-indexFileStart)*neofs.OIDSize; k += neofs.OIDSize { + if slices.Compare(buf[k:k+neofs.OIDSize], emptyOID) == 0 { + return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/neofs.OIDSize, indexFileStart/indexFileSize*indexFileSize+k/neofs.OIDSize) } } if indexFileEnd-indexFileStart == indexFileSize { @@ -310,7 +280,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint) (uint, []byte, error) { var ( // buf is used to store OIDs of the uploaded blocks. 
- buf = make([]byte, indexFileSize*oidSize) + buf = make([]byte, indexFileSize*neofs.OIDSize) doneCh = make(chan struct{}) errCh = make(chan error) @@ -377,7 +347,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun } pos := uint(blockIndex) % indexFileSize if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok { - id.Encode(buf[pos*oidSize:]) + id.Encode(buf[pos*neofs.OIDSize:]) } } }() @@ -404,15 +374,15 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun // endIndex. It returns a buffered channel of resulting object IDs and closes it once // OID search is finished. Errors are sent to errCh in a non-blocking way. func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { - var res = make(chan oid.ID, 2*searchBatchSize) + var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize) go func() { var wg sync.WaitGroup defer close(res) - for i := startIndex; i < endIndex; i += searchBatchSize * maxParallelSearches { + for i := startIndex; i < endIndex; i += neofs.DefaultSearchBatchSize * maxParallelSearches { for j := range maxParallelSearches { - start := i + j*searchBatchSize - end := start + searchBatchSize + start := i + j*neofs.DefaultSearchBatchSize + end := start + neofs.DefaultSearchBatchSize if start >= endIndex { break diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index d6f73426d..41aa7b698 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -2,7 +2,6 @@ package blockfetcher import ( "context" - "crypto/sha256" "errors" "fmt" "io" @@ -16,7 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" gio "github.com/nspcc-dev/neo-go/pkg/io" - "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -29,46 +28,8 @@ import ( ) const ( - // oidSize is the size of the object ID in NeoFS. - oidSize = sha256.Size - // defaultTimeout is the default timeout for NeoFS requests. - defaultTimeout = 10 * time.Minute // DefaultQueueCacheSize is the default size of the queue cache. DefaultQueueCacheSize = 16000 - // defaultDownloaderWorkersCount is the default number of workers downloading blocks. - defaultDownloaderWorkersCount = 500 - // defaultIndexFileSize is the default size of the index file. - defaultIndexFileSize = 128000 - // DefaultBlockAttribute is the default attribute name for block objects. - defaultBlockAttribute = "Block" - // defaultIndexFileAttribute is the default attribute name for index file objects. - defaultIndexFileAttribute = "Index" -) - -// Constants related to NeoFS pool request timeouts. -const ( - // defaultDialTimeout is a default timeout used to establish connection with - // NeoFS storage nodes. - defaultDialTimeout = 30 * time.Second - // defaultStreamTimeout is a default timeout used for NeoFS streams processing. - // It has significantly large value to reliably avoid timeout problems with heavy - // SEARCH requests. 
- defaultStreamTimeout = 10 * time.Minute - // defaultHealthcheckTimeout is a timeout for request to NeoFS storage node to - // decide if it is alive. - defaultHealthcheckTimeout = 10 * time.Second -) - -// Constants related to retry mechanism. -const ( - // maxRetries is the maximum number of retries for a single operation. - maxRetries = 5 - // initialBackoff is the initial backoff duration. - initialBackoff = 500 * time.Millisecond - // backoffFactor is the factor by which the backoff duration is multiplied. - backoffFactor = 2 - // maxBackoff is the maximum backoff duration. - maxBackoff = 20 * time.Second ) // Ledger is an interface to Blockchain sufficient for Service. @@ -149,28 +110,28 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc } } if cfg.Timeout <= 0 { - cfg.Timeout = defaultTimeout + cfg.Timeout = neofs.DefaultTimeout } if cfg.OIDBatchSize <= 0 { cfg.OIDBatchSize = cfg.BQueueSize / 2 } if cfg.DownloaderWorkersCount <= 0 { - cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount + cfg.DownloaderWorkersCount = neofs.DefaultDownloaderWorkersCount } if cfg.IndexFileSize <= 0 { - cfg.IndexFileSize = defaultIndexFileSize + cfg.IndexFileSize = neofs.DefaultIndexFileSize } if cfg.BlockAttribute == "" { - cfg.BlockAttribute = defaultBlockAttribute + cfg.BlockAttribute = neofs.DefaultBlockAttribute } if cfg.IndexFileAttribute == "" { - cfg.IndexFileAttribute = defaultIndexFileAttribute + cfg.IndexFileAttribute = neofs.DefaultIndexFileAttribute } params := pool.DefaultOptions() - params.SetHealthcheckTimeout(defaultHealthcheckTimeout) - params.SetNodeDialTimeout(defaultDialTimeout) - params.SetNodeStreamTimeout(defaultStreamTimeout) + params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout) + params.SetNodeDialTimeout(neofs.DefaultDialTimeout) + params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout) p, err := pool.New(pool.NewFlatNodeParams(cfg.Addresses), user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey), params) if err != nil { return nil, err @@ -369,7 +330,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { // streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { defer rc.Close() - oidBytes := make([]byte, oidSize) + oidBytes := make([]byte, neofs.OIDSize) oidsProcessed := 0 for { @@ -409,7 +370,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { func (bfs *Service) fetchOIDsBySearch() error { startIndex := bfs.chain.BlockHeight() //We need to search with EQ filter to avoid partially-completed SEARCH responses. 
- batchSize := uint32(1) + batchSize := uint32(neofs.DefaultSearchBatchSize) for { select { @@ -525,7 +486,7 @@ func (bfs *Service) IsActive() bool { func (bfs *Service) retry(action func() error) error { var ( err error - backoff = initialBackoff + backoff = neofs.InitialBackoff timer = time.NewTimer(0) ) defer func() { @@ -537,11 +498,11 @@ func (bfs *Service) retry(action func() error) error { } }() - for i := range maxRetries { + for i := range neofs.MaxRetries { if err = action(); err == nil { return nil } - if i == maxRetries-1 { + if i == neofs.MaxRetries-1 { break } timer.Reset(backoff) @@ -551,16 +512,16 @@ func (bfs *Service) retry(action func() error) error { case <-bfs.ctx.Done(): return bfs.ctx.Err() } - backoff *= time.Duration(backoffFactor) - if backoff > maxBackoff { - backoff = maxBackoff + backoff *= time.Duration(neofs.BackoffFactor) + if backoff > neofs.MaxBackoff { + backoff = neofs.MaxBackoff } } return err } func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { - u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.cfg.ContainerID, oid)) + u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid)) if err != nil { return nil, err } diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index 1ecd6744d..3c5c5a687 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -73,9 +74,9 @@ func TestServiceConstructor(t *testing.T) { require.NotNil(t, service) require.Equal(t, service.IsActive(), false) - require.Equal(t, service.cfg.Timeout, defaultTimeout) + require.Equal(t, service.cfg.Timeout, neofs.DefaultTimeout) require.Equal(t, service.cfg.OIDBatchSize, DefaultQueueCacheSize/2) - require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount) + require.Equal(t, service.cfg.DownloaderWorkersCount, neofs.DefaultDownloaderWorkersCount) require.Equal(t, service.IsActive(), false) }) diff --git a/pkg/services/helpers/neofs/blockstorage.go b/pkg/services/helpers/neofs/blockstorage.go new file mode 100644 index 000000000..be3214c5b --- /dev/null +++ b/pkg/services/helpers/neofs/blockstorage.go @@ -0,0 +1,55 @@ +package neofs + +import ( + "crypto/sha256" + "time" +) + +// Constants related to NeoFS block storage. +const ( + // OIDSize is the size of the object ID in NeoFS. + OIDSize = sha256.Size + // DefaultTimeout is the default timeout for NeoFS requests. + DefaultTimeout = 10 * time.Minute + // DefaultDownloaderWorkersCount is the default number of workers downloading blocks. + DefaultDownloaderWorkersCount = 500 + // DefaultIndexFileSize is the default size of the index file. + DefaultIndexFileSize = 128000 + // DefaultBlockAttribute is the default attribute name for block objects. + DefaultBlockAttribute = "Block" + // DefaultIndexFileAttribute is the default attribute name for index file objects. + DefaultIndexFileAttribute = "Index" + + // DefaultSearchBatchSize is a number of objects to search in a batch. We need to + // search with EQ filter to avoid partially-completed SEARCH responses. If EQ search + // hasn't found object the object will be uploaded one more time which may lead to + // duplicating objects. 
We will have a risk of duplicates until #3645 is resolved + // (NeoFS guarantees search results). + DefaultSearchBatchSize = 1 +) + +// Constants related to NeoFS pool request timeouts. +const ( + // DefaultDialTimeout is a default timeout used to establish connection with + // NeoFS storage nodes. + DefaultDialTimeout = 30 * time.Second + // DefaultStreamTimeout is a default timeout used for NeoFS streams processing. + // It has significantly large value to reliably avoid timeout problems with heavy + // SEARCH requests. + DefaultStreamTimeout = 10 * time.Minute + // DefaultHealthcheckTimeout is a timeout for request to NeoFS storage node to + // decide if it is alive. + DefaultHealthcheckTimeout = 10 * time.Second +) + +// Constants related to retry mechanism. +const ( + // MaxRetries is the maximum number of retries for a single operation. + MaxRetries = 5 + // InitialBackoff is the initial backoff duration. + InitialBackoff = 500 * time.Millisecond + // BackoffFactor is the factor by which the backoff duration is multiplied. + BackoffFactor = 2 + // MaxBackoff is the maximum backoff duration. + MaxBackoff = 20 * time.Second +) diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/helpers/neofs/neofs.go similarity index 100% rename from pkg/services/oracle/neofs/neofs.go rename to pkg/services/helpers/neofs/neofs.go diff --git a/pkg/services/oracle/neofs/neofs_test.go b/pkg/services/helpers/neofs/neofs_test.go similarity index 100% rename from pkg/services/oracle/neofs/neofs_test.go rename to pkg/services/helpers/neofs/neofs_test.go diff --git a/pkg/services/oracle/request.go b/pkg/services/oracle/request.go index 25d9a9107..9a9f16fbb 100644 --- a/pkg/services/oracle/request.go +++ b/pkg/services/oracle/request.go @@ -13,7 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" "go.uber.org/zap" )
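---

For context only (not part of either patch): a minimal sketch of how code outside this series could reuse the retry/backoff constants that PATCH 2/2 exports from the new `pkg/services/helpers/neofs` package. It assumes the patches above are applied; `retryWithBackoff` and the `main` usage are illustrative names, not part of this change.

```go
package main

import (
	"fmt"
	"time"

	"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
)

// retryWithBackoff mirrors the block fetcher's retry loop: it runs action up
// to neofs.MaxRetries times, sleeping between attempts and growing the delay
// by neofs.BackoffFactor until it is capped at neofs.MaxBackoff.
func retryWithBackoff(action func() error) error {
	var err error
	backoff := neofs.InitialBackoff
	for i := 0; i < neofs.MaxRetries; i++ {
		if err = action(); err == nil {
			return nil // Success, no further retries needed.
		}
		time.Sleep(backoff)
		backoff *= time.Duration(neofs.BackoffFactor)
		if backoff > neofs.MaxBackoff {
			backoff = neofs.MaxBackoff
		}
	}
	return err // Last error after exhausting all retries.
}

func main() {
	attempts := 0
	err := retryWithBackoff(func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("transient failure %d", attempts)
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```

Unlike this sketch, the service's own `retry` method also honors context cancellation via a timer, as shown in the `blockfetcher.go` hunk above.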