diff --git a/pkg/morph/subscriber/subscriber.go b/pkg/morph/subscriber/subscriber.go index 7b5576b117..7edaa53546 100644 --- a/pkg/morph/subscriber/subscriber.go +++ b/pkg/morph/subscriber/subscriber.go @@ -40,9 +40,11 @@ type ( // Params is a group of Subscriber constructor parameters. Params struct { - Log *zap.Logger - Endpoint string - DialTimeout time.Duration + Log *zap.Logger + Endpoint string + DialTimeout time.Duration + RPCInitTimeout time.Duration + StartFromBlock uint32 } ) @@ -192,6 +194,15 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { return nil, fmt.Errorf("could not init ws client: %w", err) } + p.Log.Debug("event subscriber awaits RPC node", + zap.String("endpoint", p.Endpoint), + zap.Uint32("min_block_height", p.StartFromBlock)) + + err = awaitHeight(wsClient, p.StartFromBlock, p.RPCInitTimeout) + if err != nil { + return nil, err + } + sub := &subscriber{ RWMutex: new(sync.RWMutex), log: p.Log, @@ -209,3 +220,29 @@ func New(ctx context.Context, p *Params) (Subscriber, error) { return sub, nil } + +// awaitHeight checks if remote client has least expected block height and +// returns error if it is not reached that height after timeout duration. +// This function is required to avoid connections to unsynced RPC nodes, because +// they can produce events from the past that should not be processed by +// NeoFS nodes. +func awaitHeight(wsClient *client.WSClient, startFrom uint32, timeout time.Duration) error { + if startFrom == 0 { + return nil + } + + for ch := time.After(timeout); ; { + select { + case <-ch: + return fmt.Errorf("could not init ws client: didn't reach expected height %d", startFrom) + default: + } + height, err := wsClient.GetBlockCount() + if err != nil { + return fmt.Errorf("could not get block height: %w", err) + } else if height >= startFrom { + return nil + } + time.Sleep(100 * time.Millisecond) + } +}