diff --git a/pkg/config/application_config_test.go b/pkg/config/application_config_test.go index 6b5af8d81..767e3942c 100644 --- a/pkg/config/application_config_test.go +++ b/pkg/config/application_config_test.go @@ -206,20 +206,6 @@ func TestNeoFSBlockFetcherValidation(t *testing.T) { shouldFail: true, errMsg: "BQueueSize (5) is lower than OIDBatchSize (10)", }, - { - cfg: NeoFSBlockFetcher{ - InternalService: InternalService{Enabled: true}, - Timeout: time.Second, - ContainerID: validContainerID, - Addresses: []string{"127.0.0.1"}, - OIDBatchSize: 10, - BQueueSize: 20, - SkipIndexFilesSearch: false, - IndexFileSize: 0, - }, - shouldFail: true, - errMsg: "IndexFileSize is not set", - }, } for _, c := range cases { diff --git a/pkg/config/blockfetcher_config.go b/pkg/config/blockfetcher_config.go index 22d4d46a0..65abc6295 100644 --- a/pkg/config/blockfetcher_config.go +++ b/pkg/config/blockfetcher_config.go @@ -38,14 +38,11 @@ func (cfg *NeoFSBlockFetcher) Validate() error { if err != nil { return fmt.Errorf("invalid container ID: %w", err) } - if cfg.BQueueSize < cfg.OIDBatchSize { + if cfg.BQueueSize > 0 && cfg.BQueueSize < cfg.OIDBatchSize { return fmt.Errorf("BQueueSize (%d) is lower than OIDBatchSize (%d)", cfg.BQueueSize, cfg.OIDBatchSize) } if len(cfg.Addresses) == 0 { return errors.New("addresses are not set") } - if !cfg.SkipIndexFilesSearch && cfg.IndexFileSize == 0 { - return errors.New("IndexFileSize is not set") - } return nil } diff --git a/pkg/network/server.go b/pkg/network/server.go index 6b043544a..1d4377462 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -224,6 +224,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 { + s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize + } s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) var err error s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 90a0e53fc..d6f73426d 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -32,11 +32,17 @@ const ( // oidSize is the size of the object ID in NeoFS. oidSize = sha256.Size // defaultTimeout is the default timeout for NeoFS requests. - defaultTimeout = 5 * time.Minute - // defaultOIDBatchSize is the default number of OIDs to search and fetch at once. - defaultOIDBatchSize = 8000 + defaultTimeout = 10 * time.Minute + // DefaultQueueCacheSize is the default size of the queue cache. + DefaultQueueCacheSize = 16000 // defaultDownloaderWorkersCount is the default number of workers downloading blocks. - defaultDownloaderWorkersCount = 100 + defaultDownloaderWorkersCount = 500 + // defaultIndexFileSize is the default size of the index file. + defaultIndexFileSize = 128000 + // DefaultBlockAttribute is the default attribute name for block objects. + defaultBlockAttribute = "Block" + // defaultIndexFileAttribute is the default attribute name for index file objects. + defaultIndexFileAttribute = "Index" ) // Constants related to NeoFS pool request timeouts. @@ -146,13 +152,19 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBloc cfg.Timeout = defaultTimeout } if cfg.OIDBatchSize <= 0 { - cfg.OIDBatchSize = defaultOIDBatchSize + cfg.OIDBatchSize = cfg.BQueueSize / 2 } if cfg.DownloaderWorkersCount <= 0 { cfg.DownloaderWorkersCount = defaultDownloaderWorkersCount } - if len(cfg.Addresses) == 0 { - return nil, errors.New("no addresses provided") + if cfg.IndexFileSize <= 0 { + cfg.IndexFileSize = defaultIndexFileSize + } + if cfg.BlockAttribute == "" { + cfg.BlockAttribute = defaultBlockAttribute + } + if cfg.IndexFileAttribute == "" { + cfg.IndexFileAttribute = defaultIndexFileAttribute } params := pool.DefaultOptions() diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go index eda5f2864..1ecd6744d 100644 --- a/pkg/services/blockfetcher/blockfetcher_test.go +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -65,7 +65,8 @@ func TestServiceConstructor(t *testing.T) { InternalService: config.InternalService{ Enabled: true, }, - Addresses: []string{"localhost:8080"}, + Addresses: []string{"localhost:8080"}, + BQueueSize: DefaultQueueCacheSize, } service, err := New(ledger, cfg, logger, mockPut.putBlock, shutdownCallback) require.NoError(t, err) @@ -73,7 +74,7 @@ func TestServiceConstructor(t *testing.T) { require.Equal(t, service.IsActive(), false) require.Equal(t, service.cfg.Timeout, defaultTimeout) - require.Equal(t, service.cfg.OIDBatchSize, defaultOIDBatchSize) + require.Equal(t, service.cfg.OIDBatchSize, DefaultQueueCacheSize/2) require.Equal(t, service.cfg.DownloaderWorkersCount, defaultDownloaderWorkersCount) require.Equal(t, service.IsActive(), false) })