From 6d8eb21ed852e28feb4b220f56538d62928c5926 Mon Sep 17 00:00:00 2001 From: P4vlushaaa <112944483+P4vlushaaa@users.noreply.github.com> Date: Thu, 23 Jan 2025 14:33:28 +0300 Subject: [PATCH] Add files via upload --- services/indexer/Dockerfile | 14 +++++ services/indexer/config.yaml | 4 ++ services/indexer/go.sum.go | 1 + .../internal/db/migrations/001_init.sql | 14 +++++ services/indexer/internal/db/repository.go | 43 ++++++++++++++++ .../indexer/internal/models/indexer_models.go | 16 ++++++ .../indexer/internal/subscriber/events.go | 51 +++++++++++++++++++ services/indexer/internal/utils/config.go | 35 +++++++++++++ services/indexer/internal/utils/logger.go | 23 +++++++++ services/indexer/main.go | 23 +++++++++ services/indexer/scripts/local_run.sh | 4 ++ 11 files changed, 228 insertions(+) create mode 100644 services/indexer/Dockerfile create mode 100644 services/indexer/config.yaml create mode 100644 services/indexer/go.sum.go create mode 100644 services/indexer/internal/db/migrations/001_init.sql create mode 100644 services/indexer/internal/db/repository.go create mode 100644 services/indexer/internal/models/indexer_models.go create mode 100644 services/indexer/internal/subscriber/events.go create mode 100644 services/indexer/internal/utils/config.go create mode 100644 services/indexer/internal/utils/logger.go create mode 100644 services/indexer/main.go create mode 100644 services/indexer/scripts/local_run.sh diff --git a/services/indexer/Dockerfile b/services/indexer/Dockerfile new file mode 100644 index 0000000..cebe06b --- /dev/null +++ b/services/indexer/Dockerfile @@ -0,0 +1,14 @@ +FROM golang:1.20 as builder + +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go build -o /app/bin/indexer main.go + +FROM alpine:3.17 +WORKDIR /app +COPY --from=builder /app/bin/indexer /app/indexer +COPY config.yaml /app/config.yaml +ENTRYPOINT ["/app/indexer"] \ No newline at end of file diff --git a/services/indexer/config.yaml b/services/indexer/config.yaml new file mode 100644 index 0000000..577eb2b --- /dev/null +++ b/services/indexer/config.yaml @@ -0,0 +1,4 @@ +neoRPC: "http://localhost:20332" +logLevel: "info" +dbDsn: "postgres://indexer:password@postgres:5432/indexerdb?sslmode=disable" +pollIntervalSeconds: 3 \ No newline at end of file diff --git a/services/indexer/go.sum.go b/services/indexer/go.sum.go new file mode 100644 index 0000000..c5d582c --- /dev/null +++ b/services/indexer/go.sum.go @@ -0,0 +1 @@ +package indexer diff --git a/services/indexer/internal/db/migrations/001_init.sql b/services/indexer/internal/db/migrations/001_init.sql new file mode 100644 index 0000000..9670dbf --- /dev/null +++ b/services/indexer/internal/db/migrations/001_init.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS blocks ( + height BIGINT PRIMARY KEY, + time TIMESTAMP NOT NULL +); + +CREATE TABLE IF NOT EXISTS nft_transfers ( + id SERIAL PRIMARY KEY, + block_height BIGINT NOT NULL, + tx_hash VARCHAR(66) NOT NULL, + token_id TEXT NOT NULL, + from_addr VARCHAR(42), + to_addr VARCHAR(42), + timestamp TIMESTAMP NOT NULL + ); \ No newline at end of file diff --git a/services/indexer/internal/db/repository.go b/services/indexer/internal/db/repository.go new file mode 100644 index 0000000..ab37180 --- /dev/null +++ b/services/indexer/internal/db/repository.go @@ -0,0 +1,43 @@ +package db + +import ( + "database/sql" + "fmt" + _ "github.com/lib/pq" +) + +type Repository struct { + db *sql.DB +} + +func NewRepository(dsn string) (*Repository, error) { + db, err := sql.Open("postgres", dsn) + if err != nil { + return nil, err + } + if err := db.Ping(); err != nil { + return nil, err + } + return &Repository{db: db}, nil +} + +func (r *Repository) GetLastIndexedBlock() (int64, error) { + var height int64 + err := r.db.QueryRow(`SELECT height FROM blocks ORDER BY height DESC LIMIT 1`).Scan(&height) + if err == sql.ErrNoRows { + return 0, nil + } + return height, err +} + +func (r *Repository) SaveBlock(height int64) error { + _, err := r.db.Exec(`INSERT INTO blocks(height,time) VALUES($1,NOW())`, height) + return err +} + +func (r *Repository) SaveNFTTransfer(txHash string, blockHeight int64, tokenId, from, to string) error { + _, err := r.db.Exec(`INSERT INTO nft_transfers(block_height, tx_hash, token_id, from_addr, to_addr, timestamp) + VALUES($1, $2, $3, $4, $5, NOW())`, + blockHeight, txHash, tokenId, from, to) + return err +} diff --git a/services/indexer/internal/models/indexer_models.go b/services/indexer/internal/models/indexer_models.go new file mode 100644 index 0000000..008d063 --- /dev/null +++ b/services/indexer/internal/models/indexer_models.go @@ -0,0 +1,16 @@ +package models + +type BlockRecord struct { + Height int64 `json:"height"` + Time string `json:"time"` +} + +type NftTransferRecord struct { + ID int64 `json:"id"` + BlockHeight int64 `json:"block_height"` + TxHash string `json:"tx_hash"` + TokenId string `json:"token_id"` + FromAddr string `json:"from_addr"` + ToAddr string `json:"to_addr"` + Timestamp string `json:"timestamp"` +} diff --git a/services/indexer/internal/subscriber/events.go b/services/indexer/internal/subscriber/events.go new file mode 100644 index 0000000..1b0720d --- /dev/null +++ b/services/indexer/internal/subscriber/events.go @@ -0,0 +1,51 @@ +package subscriber + +import ( + "context" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "time" + "web3-onlyfans/services/indexer/internal/utils" +) + +type BlockSubscriber struct { + cfg *utils.Config + logger utils.Logger + rpc *rpcclient.Client +} + +func NewBlockSubscriber(cfg *utils.Config, logger utils.Logger) (*BlockSubscriber, error) { + cli, err := rpcclient.New(context.Background(), cfg.NeoRPC, rpcclient.Options{}) + if err != nil { + return nil, err + } + return &BlockSubscriber{ + cfg: cfg, + logger: logger, + rpc: cli, + }, nil +} + +func (s *BlockSubscriber) Start() { + pollInterval := time.Duration(s.cfg.PollIntervalSeconds) * time.Second + for { + err := s.pollOnce() + if err != nil { + s.logger.Errorf("poll error: %v", err) + } + time.Sleep(pollInterval) + } +} + +func (s *BlockSubscriber) pollOnce() error { + // Здесь логика: узнаём текущий блок, сверяемся с локальным бд, + // проходимся по новым блокам, анализируем транзакции/нотификации. + // Для упрощения покажем только "пример" — в реале будет много кода. + + height, err := s.rpc.GetBlockCount() + if err != nil { + return err + } + s.logger.Debugf("current block count: %d", height) + // ... далее - обработка новых блоков (Tx, Notifications, Transfers). + return nil +} diff --git a/services/indexer/internal/utils/config.go b/services/indexer/internal/utils/config.go new file mode 100644 index 0000000..95dd714 --- /dev/null +++ b/services/indexer/internal/utils/config.go @@ -0,0 +1,35 @@ +package utils + +import ( + "gopkg.in/yaml.v2" + "io/ioutil" + "os" + "strconv" +) + +type Config struct { + NeoRPC string `yaml:"neoRPC"` + LogLevel string `yaml:"logLevel"` + DBDsn string `yaml:"dbDsn"` + PollIntervalSeconds int `yaml:"pollIntervalSeconds"` +} + +func LoadConfig(path string) (*Config, error) { + data, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + var c Config + if err := yaml.Unmarshal(data, &c); err != nil { + return nil, err + } + + // Переопределения через ENV (пример) + if e := os.Getenv("INDEXER_POLL_INTERVAL"); e != "" { + if val, err := strconv.Atoi(e); err == nil { + c.PollIntervalSeconds = val + } + } + + return &c, nil +} diff --git a/services/indexer/internal/utils/logger.go b/services/indexer/internal/utils/logger.go new file mode 100644 index 0000000..61595bd --- /dev/null +++ b/services/indexer/internal/utils/logger.go @@ -0,0 +1,23 @@ +package utils + +import ( + "log" + "strings" +) + +type Logger interface { + Debugf(format string, v ...interface{}) + Infof(format string, v ...interface{}) + Errorf(format string, v ...interface{}) + Fatalf(format string, v ...interface{}) +} + +type SimpleLogger struct { + level string +} + +func NewLogger(level string) Logger { + return &SimpleLogger{level: strings.ToLower(level)} +} + +// ... тот же код, что в API diff --git a/services/indexer/main.go b/services/indexer/main.go new file mode 100644 index 0000000..5005cb4 --- /dev/null +++ b/services/indexer/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "log" + "web3-onlyfans/services/indexer/internal/subscriber" + "web3-onlyfans/services/indexer/internal/utils" +) + +func main() { + cfg, err := utils.LoadConfig("./config.yaml") + if err != nil { + log.Fatalf("failed to load config: %v", err) + } + logger := utils.NewLogger(cfg.LogLevel) + + sub, err := subscriber.NewBlockSubscriber(cfg, logger) + if err != nil { + logger.Fatalf("failed to create subscriber: %v", err) + } + + logger.Infof("Indexer started, polling blocks from %s", cfg.NeoRPC) + sub.Start() +} diff --git a/services/indexer/scripts/local_run.sh b/services/indexer/scripts/local_run.sh new file mode 100644 index 0000000..4a7cd5e --- /dev/null +++ b/services/indexer/scripts/local_run.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +export INDEXER_CONFIG_PATH=./config.yaml +go run main.go \ No newline at end of file