diff --git a/Dockerfile b/Dockerfile index b1f63da..69e07c1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,42 +1,37 @@ -ARG GO_VERSION=1.22.5 +FROM golang:1.22.5-bookworm as builder -# Stage 1: Dependency management and build -FROM golang:${GO_VERSION}-bookworm as builder +# Install CA certificates and build essentials +RUN apt-get update && apt-get install -y ca-certificates build-essential && rm -rf /var/lib/apt/lists/* +# Set the working directory inside the container to /app WORKDIR /app -# Copy go.mod and go.sum files +# Copy the Go module files COPY go.mod go.sum ./ -# Download dependencies and verify modules -RUN go mod download && go mod verify +# Download dependencies +RUN go mod download # Copy the rest of the application source code COPY . . -# Run go mod tidy to ensure the go.mod file is up to date -RUN go mod tidy +# Build the application; output the binary to a known location +RUN go build -v -o /run-app . -# Build the application and capture the output -RUN go build -v -o /run-app . - -# Stage 2: Final stage +# Final stage based on Debian Bookworm-slim FROM debian:bookworm-slim -# Install CA certificates in the final image +# Install CA certificates in the final image to ensure they are present. RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* # Copy the built executable from the builder stage COPY --from=builder /run-app /usr/local/bin/run-app -# Create necessary directory +# Create a directory for the data RUN mkdir -p /app/data -# Copy the CSV file to /app/data -COPY /data/ip_metadata.csv /app/data/ip_metadata.csv - -# Set the working directory -WORKDIR /app +# Copy the CSV file +COPY data/ip_metadata.csv /app/data/ip_metadata.csv # Set the command to run the application CMD ["/usr/local/bin/run-app"] \ No newline at end of file diff --git a/clickhouse/client.go b/clickhouse/client.go index 7423409..4e30c4c 100644 --- a/clickhouse/client.go +++ b/clickhouse/client.go @@ -47,7 +47,7 @@ func (c *ClickhouseClient) LoadIPMetadataFromCSV() error { reader := csv.NewReader(file) reader.FieldsPerRecord = -1 // Allow variable number of fields - batch, err := c.chConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata") + batch, err := c.ChConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata") if err != nil { return fmt.Errorf("failed to prepare batch: %w", err) } @@ -176,7 +176,7 @@ func parseFloat(s string) (float64, error) { func (c *ClickhouseClient) isTableEmpty(tableName string) (bool, error) { query := fmt.Sprintf("SELECT count(*) FROM %s", tableName) var count uint64 - err := c.chConn.QueryRow(context.Background(), query).Scan(&count) + err := c.ChConn.QueryRow(context.Background(), query).Scan(&count) if err != nil { return false, err } @@ -217,7 +217,7 @@ func IPMetadataDDL(db string) string { asn String, asn_organization String, asn_type String - ) ENGINE = MergeTree() + ) ENGINE = ReplacingMergeTree() ORDER BY ip; `, db) } @@ -267,74 +267,78 @@ type ClickhouseConfig struct { } type ClickhouseClient struct { - cfg *ClickhouseConfig - log zerolog.Logger - - chConn driver.Conn - - ValidatorEventChan chan *types.ValidatorEvent - IPMetadataEventChan chan *types.IPMetadataEvent - PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent - MetadataReceivedEventChan chan *types.MetadataReceivedEvent + cfg *ClickhouseConfig + log zerolog.Logger + ChConn clickhouse.Conn + + ValidatorEventChan chan *types.ValidatorEvent + IPMetadataEventChan chan *types.IPMetadataEvent + PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent + MetadataReceivedEventChan chan *types.MetadataReceivedEvent } func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) { - log := log.NewLogger("clickhouse") - - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{cfg.Endpoint}, - DialTimeout: time.Second * 60, - Auth: clickhouse.Auth{ - Database: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - }, - Debugf: func(format string, v ...interface{}) { - log.Debug().Str("module", "clickhouse").Msgf(format, v) - }, - Protocol: clickhouse.Native, - TLS: &tls.Config{}, - }) + log := log.NewLogger("clickhouse") + + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{cfg.Endpoint}, + DialTimeout: time.Second * 60, + Auth: clickhouse.Auth{ + Database: cfg.DB, + Username: cfg.Username, + Password: cfg.Password, + }, + Debugf: func(format string, v ...interface{}) { + log.Debug().Str("module", "clickhouse").Msgf(format, v) + }, + Protocol: clickhouse.Native, + TLS: &tls.Config{}, + }) - if err != nil { - return nil, err - } + if err != nil { + return nil, err + } + + return &ClickhouseClient{ + cfg: cfg, + log: log, + ChConn: conn, + + ValidatorEventChan: make(chan *types.ValidatorEvent, 16384), + IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384), + PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384), + MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384), + }, nil +} - return &ClickhouseClient{ - cfg: cfg, - log: log, - chConn: conn, - ValidatorEventChan: make(chan *types.ValidatorEvent, 16384), - IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384), - PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384), - MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384), - }, nil +func (c *ClickhouseClient) Close() error { + return c.ChConn.Close() } func (c *ClickhouseClient) initializeTables() error { // Create validator_metadata table - if err := c.chConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating validator_metadata table") return err } // Create ip_metadata table - if err := c.chConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating ip_metadata table") return err } // Create peer_discovered_events table - if err := c.chConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating peer_discovered_events table") return err } // Create metadata_received_events table - if err := c.chConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating metadata_received_events table") return err } @@ -368,7 +372,7 @@ func (c *ClickhouseClient) Start() error { // BatchProcessor processes events in batches for a specified table in ClickHouse. func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan <-chan T, maxSize uint64) { // Prepare the initial batch. - batch, err := client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) + batch, err := client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) if err != nil { client.log.Error().Err(err).Msg("Failed to prepare batch") return @@ -396,7 +400,7 @@ func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan } // Prepare a new batch after sending the current batch. - batch, err = client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) + batch, err = client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) if err != nil { client.log.Error().Err(err).Msg("Failed to prepare new batch after sending") return diff --git a/consumer/consumer.go b/consumer/consumer.go index 6bf295e..e59cc27 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -6,17 +6,21 @@ import ( "encoding/hex" "encoding/json" "fmt" + "net" "net/http" "os" "os/signal" + "strconv" + "strings" + "sync" "syscall" "time" - //"sync" - ma "github.com/multiformats/go-multiaddr" ch "github.com/chainbound/valtrack/clickhouse" "github.com/chainbound/valtrack/log" "github.com/chainbound/valtrack/types" + "github.com/ipinfo/go/v2/ipinfo" + ma "github.com/multiformats/go-multiaddr" _ "github.com/mattn/go-sqlite3" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" @@ -45,110 +49,130 @@ type Consumer struct { ipMetadataWriter *writer.ParquetWriter js jetstream.JetStream - peerDiscoveredChan chan *types.PeerDiscoveredEvent - metadataReceivedChan chan *types.MetadataReceivedEvent - validatorMetadataChan chan *types.MetadataReceivedEvent - ipMetadataChan chan *types.IPMetadataEvent + peerDiscoveredChan chan *types.PeerDiscoveredEvent + metadataReceivedChan chan *types.MetadataReceivedEvent + validatorMetadataChan chan *types.MetadataReceivedEvent + ipMetadataChan chan *types.IPMetadataEvent chClient *ch.ClickhouseClient db *sql.DB dune *Dune -} -func RunConsumer(cfg *ConsumerConfig) { - log := log.NewLogger("consumer") + ipCache map[string]*types.IPMetadataEvent + ipCacheMu sync.RWMutex + ipCacheTTL time.Duration + ipInfoToken string +} - dbPath := fmt.Sprintf("%s/validator_tracker.sqlite", basePath) - db, err := sql.Open("sqlite3", dbPath) +func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream, chClient *ch.ClickhouseClient, db *sql.DB, dune *Dune) (*Consumer, error) { + discoveryFilePath := fmt.Sprintf("%s/discovery_events.parquet", basePath) + w_discovery, err := local.NewLocalFileWriter(discoveryFilePath) if err != nil { - log.Error().Err(err).Msg("Error opening database") + return nil, fmt.Errorf("error creating discovery events parquet file: %w", err) } - defer db.Close() - err = setupDatabase(db) + metadataFilePath := fmt.Sprintf("%s/metadata_events.parquet", basePath) + w_metadata, err := local.NewLocalFileWriter(metadataFilePath) if err != nil { - log.Error().Err(err).Msg("Error setting up database") + return nil, fmt.Errorf("error creating metadata events parquet file: %w", err) } - // err = loadIPMetadataFromCSV(db, "ip_metadata.csv") -// if err != nil { -// log.Error().Err(err).Msg("Error setting up database") -// } - - log.Info().Msg("Sqlite DB setup complete") - - nc, err := nats.Connect(cfg.NatsURL) + validatorFilePath := fmt.Sprintf("%s/validator_metadata_events.parquet", basePath) + w_validator, err := local.NewLocalFileWriter(validatorFilePath) if err != nil { - log.Error().Err(err).Msg("Error connecting to NATS") + return nil, fmt.Errorf("error creating validator parquet file: %w", err) } - defer nc.Close() - js, err := jetstream.New(nc) + ipMetadataFilePath := fmt.Sprintf("%s/ip_metadata_events.parquet", basePath) + w_ipMetadata, err := local.NewLocalFileWriter(ipMetadataFilePath) if err != nil { - log.Error().Err(err).Msg("Error creating JetStream context") + return nil, fmt.Errorf("error creating IP metadata events parquet file: %w", err) } - discoveryFilePath := fmt.Sprintf("%s/discovery_events.parquet", basePath) - w_discovery, err := local.NewLocalFileWriter(discoveryFilePath) + discoveryWriter, err := writer.NewParquetWriter(w_discovery, new(types.PeerDiscoveredEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating discovery events parquet file") + return nil, fmt.Errorf("error creating Peer discovered Parquet writer: %w", err) } - defer w_discovery.Close() - metadataFilePath := fmt.Sprintf("%s/metadata_events.parquet", basePath) - w_metadata, err := local.NewLocalFileWriter(metadataFilePath) + metadataWriter, err := writer.NewParquetWriter(w_metadata, new(types.MetadataReceivedEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating metadata events parquet file") + return nil, fmt.Errorf("error creating Metadata Parquet writer: %w", err) } - defer w_metadata.Close() - validatorFilePath := fmt.Sprintf("%s/validator_metadata_events.parquet", basePath) - w_validator, err := local.NewLocalFileWriter(validatorFilePath) + validatorWriter, err := writer.NewParquetWriter(w_validator, new(types.ValidatorEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating validator parquet file") + return nil, fmt.Errorf("error creating Validator Parquet writer: %w", err) } - defer w_validator.Close() - ipMetadataFilePath := fmt.Sprintf("%s/ip_metadata_events.parquet", basePath) - w_ipMetadata, err := local.NewLocalFileWriter(ipMetadataFilePath) + ipMetadataWriter, err := writer.NewParquetWriter(w_ipMetadata, new(types.IPMetadataEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating IP metadata events parquet file") + return nil, fmt.Errorf("error creating IP Metadata Parquet writer: %w", err) } - defer w_ipMetadata.Close() - discoveryWriter, err := writer.NewParquetWriter(w_discovery, new(types.PeerDiscoveredEvent), 4) + return &Consumer{ + log: log, + discoveryWriter: discoveryWriter, + metadataWriter: metadataWriter, + validatorWriter: validatorWriter, + ipMetadataWriter: ipMetadataWriter, + js: js, + + peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384), + metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384), + validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384), + ipMetadataChan: make(chan *types.IPMetadataEvent, 16384), + + chClient: chClient, + db: db, + dune: dune, + + ipCache: make(map[string]*types.IPMetadataEvent), + ipCacheTTL: 1 * time.Hour, + ipInfoToken: os.Getenv("IPINFO_TOKEN"), + }, nil +} + +func RunConsumer(cfg *ConsumerConfig) { + log := log.NewLogger("consumer") + + dbPath := fmt.Sprintf("%s/validator_tracker.sqlite", basePath) + db, err := sql.Open("sqlite3", dbPath) if err != nil { - log.Error().Err(err).Msg("Error creating Peer discovered Parquet writer") + log.Error().Err(err).Msg("Error opening database") + return } - defer discoveryWriter.WriteStop() + defer db.Close() - metadataWriter, err := writer.NewParquetWriter(w_metadata, new(types.MetadataReceivedEvent), 4) + err = setupDatabase(db) if err != nil { - log.Error().Err(err).Msg("Error creating Metadata Parquet writer") + log.Error().Err(err).Msg("Error setting up database") + return } - defer metadataWriter.WriteStop() - validatorWriter, err := writer.NewParquetWriter(w_validator, new(types.ValidatorEvent), 4) + log.Info().Msg("Sqlite DB setup complete") + + nc, err := nats.Connect(cfg.NatsURL) if err != nil { - log.Error().Err(err).Msg("Error creating Validator Parquet writer") + log.Error().Err(err).Msg("Error connecting to NATS") + return } - defer validatorWriter.WriteStop() + defer nc.Close() - ipMetadataWriter, err := writer.NewParquetWriter(w_ipMetadata, new(types.IPMetadataEvent), 4) + js, err := jetstream.New(nc) if err != nil { - log.Error().Err(err).Msg("Error creating IP Metadata Parquet writer") + log.Error().Err(err).Msg("Error creating JetStream context") + return } - defer ipMetadataWriter.WriteStop() chCfg := ch.ClickhouseConfig{ - Endpoint: cfg.ChCfg.Endpoint, - DB: cfg.ChCfg.DB, - Username: cfg.ChCfg.Username, - Password: cfg.ChCfg.Password, - MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize, - MaxIPMetadataBatchSize: cfg.ChCfg.MaxIPMetadataBatchSize, - MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize, - MaxMetadataReceivedEventsBatchSize: cfg.ChCfg.MaxMetadataReceivedEventsBatchSize, + Endpoint: cfg.ChCfg.Endpoint, + DB: cfg.ChCfg.DB, + Username: cfg.ChCfg.Username, + Password: cfg.ChCfg.Password, + MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize, + MaxIPMetadataBatchSize: cfg.ChCfg.MaxIPMetadataBatchSize, + MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize, + MaxMetadataReceivedEventsBatchSize: cfg.ChCfg.MaxMetadataReceivedEventsBatchSize, } var chClient *ch.ClickhouseClient @@ -156,11 +180,14 @@ func RunConsumer(cfg *ConsumerConfig) { chClient, err = ch.NewClickhouseClient(&chCfg) if err != nil { log.Error().Err(err).Msg("Error creating Clickhouse client") + return } + defer chClient.Close() err = chClient.Start() if err != nil { log.Error().Err(err).Msg("Error starting Clickhouse client") + return } } @@ -169,22 +196,10 @@ func RunConsumer(cfg *ConsumerConfig) { dune = NewDune(cfg.DuneNamespace, cfg.DuneApiKey) } - consumer := Consumer{ - log: log, - discoveryWriter: discoveryWriter, - metadataWriter: metadataWriter, - validatorWriter: validatorWriter, - ipMetadataWriter: ipMetadataWriter, - js: js, - - peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384), - metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384), - validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384), - ipMetadataChan: make(chan *types.IPMetadataEvent, 16384), - - chClient: chClient, - db: db, - dune: dune, + consumer, err := NewConsumer(cfg, log, js, chClient, db, dune) + if err != nil { + log.Error().Err(err).Msg("Error creating consumer") + return } go func() { @@ -194,15 +209,17 @@ func RunConsumer(cfg *ConsumerConfig) { }() ipInfoToken := os.Getenv("IPINFO_TOKEN") - if ipInfoToken == "" { - log.Error().Msg("IPINFO_TOKEN environment variable is required") - } - - go consumer.runValidatorMetadataEventHandler(ipInfoToken) - - go consumer.processIPMetadataEvents() + if ipInfoToken == "" { + log.Error().Msg("IPINFO_TOKEN environment variable is required") + return + } - + go func() { + if err := consumer.runValidatorMetadataEventHandler(ipInfoToken); err != nil { + log.Error().Err(err).Msg("Error in validator metadata handler") + } + }() + go consumer.processIPMetadataEvents() server := &http.Server{Addr: ":8080", Handler: nil} http.HandleFunc("/validators", createGetValidatorsHandler(db)) @@ -212,7 +229,6 @@ func RunConsumer(cfg *ConsumerConfig) { log.Error().Err(err).Msg("Error starting HTTP server") } }() - defer server.Shutdown(context.Background()) if dune != nil { log.Info().Msg("Starting to publish to Dune") @@ -235,20 +251,32 @@ func RunConsumer(cfg *ConsumerConfig) { quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) - - <-quit + + // Shutdown process + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + select { + case <-quit: + log.Info().Msg("Shutdown signal received") + case <-ctx.Done(): + log.Info().Msg("Shutdown timeout") + } + close(consumer.ipMetadataChan) + log.Info().Msg("Shutting down consumer") - //wg := &sync.WaitGroup{} - //wg.Add(1) - // go func() { - // defer wg.Done() - // consumer.processIPMetadataEvents() - // }() + if err := server.Shutdown(ctx); err != nil { + log.Error().Err(err).Msg("Error shutting down HTTP server") + } - // <-quit - // close(consumer.ipMetadataChan) // Signal goroutine to finish - // wg.Wait() // Wait for the goroutine to clean up + if chClient != nil { + if err := chClient.Close(); err != nil { + log.Error().Err(err).Msg("Error closing Clickhouse client") + } + } + + log.Info().Msg("Consumer shutdown complete") } func (c *Consumer) Start(name string) error { @@ -274,9 +302,10 @@ func (c *Consumer) Start(name string) error { return err } - // Load IP metadata from CSV - if err := c.chClient.LoadIPMetadataFromCSV(); err != nil { - c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV") + if c.chClient != nil { + if err := c.chClient.LoadIPMetadataFromCSV(); err != nil { + c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV") + } } go func() { @@ -291,10 +320,8 @@ func (c *Consumer) Start(name string) error { return } - //c.log.Debug().Int("batch_size", len(batch.Messages())).Msg("Processing batch of messages") - for msg := range batch.Messages() { - handleMessage(c, msg) + c.handleMessage(msg) c.log.Debug().Str("msg_subject", msg.Subject()).Msg("Message handled") } } @@ -303,7 +330,7 @@ func (c *Consumer) Start(name string) error { return nil } -func handleMessage(c *Consumer, msg jetstream.Msg) { +func (c *Consumer) handleMessage(msg jetstream.Msg) { md, _ := msg.Metadata() progress := float64(md.Sequence.Stream) / (float64(md.NumPending) + float64(md.Sequence.Stream)) * 100 @@ -318,7 +345,6 @@ func handleMessage(c *Consumer, msg jetstream.Msg) { c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata received") c.ipMetadataChan <- &ipEvent - // Send to ClickHouse if client is initialized if c.chClient != nil { c.chClient.IPMetadataEventChan <- &ipEvent } @@ -332,9 +358,18 @@ func handleMessage(c *Consumer, msg jetstream.Msg) { } c.log.Info().Time("timestamp", md.Timestamp).Uint64("pending", md.NumPending).Str("progress", fmt.Sprintf("%.2f%%", progress)).Msg("peer_discovered") + + ipMetadata, err := c.getIPMetadata(event.IP) + if err != nil { + c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to fetch IP metadata") + } else { + if err := c.ensureIPMetadataInClickHouse(ipMetadata); err != nil { + c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to ensure IP metadata in ClickHouse") + } + } + c.storeDiscoveryEvent(event) - // Send to ClickHouse if client is initialized if c.chClient != nil { c.chClient.PeerDiscoveredEventChan <- &event } @@ -351,7 +386,6 @@ func handleMessage(c *Consumer, msg jetstream.Msg) { c.handleMetadataEvent(event) c.storeMetadataEvent(event) - // Send to ClickHouse if client is initialized if c.chClient != nil { c.chClient.MetadataReceivedEventChan <- &event } @@ -365,6 +399,167 @@ func handleMessage(c *Consumer, msg jetstream.Msg) { } } +func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) { + // Check cache first + c.ipCacheMu.RLock() + if metadata, found := c.ipCache[ip]; found { + c.ipCacheMu.RUnlock() + return metadata, nil + } + c.ipCacheMu.RUnlock() + + // Check ClickHouse + metadata, err := c.getIPMetadataFromClickHouse(ip) + if err != nil { + c.log.Warn().Err(err).Str("ip", ip).Msg("Error querying ClickHouse for IP metadata, falling back to API") + } else if metadata != nil { + // Found in ClickHouse, cache and return + c.cacheIPMetadata(ip, metadata) + return metadata, nil + } + + // Not found in ClickHouse or error occurred, fetch from IPInfo API + ipInfo, err := c.fetchIPInfoFromAPI(ip) + if err != nil { + return nil, fmt.Errorf("failed to fetch IP info: %w", err) + } + + metadata = convertIPInfoToMetadata(ipInfo) + + // Only insert into ClickHouse if fetched from API + if c.chClient != nil { + if err := c.ensureIPMetadataInClickHouse(metadata); err != nil { + c.log.Warn().Err(err).Str("ip", ip).Msg("Failed to insert IP metadata into ClickHouse") + } + } + + c.cacheIPMetadata(ip, metadata) + + return metadata, nil +} + + +func (c *Consumer) getIPMetadataFromClickHouse(ip string) (*types.IPMetadataEvent, error) { + if c.chClient == nil { + return nil, fmt.Errorf("ClickHouse client is not initialized") + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var metadata types.IPMetadataEvent + query := ` + SELECT ip, hostname, city, region, country, latitude, longitude, postal_code, asn, asn_organization, asn_type + FROM ip_metadata + WHERE ip = ? + LIMIT 1 + ` + + c.log.Debug().Str("ip", ip).Msg("Querying ClickHouse for IP metadata") + + rows, err := c.chClient.ChConn.Query(ctx, query, ip) + if err != nil { + c.log.Error().Err(err).Str("ip", ip).Msg("Error executing query in ClickHouse") + return nil, fmt.Errorf("error querying ClickHouse: %w", err) + } + defer rows.Close() + + if rows.Next() { + if err := rows.Scan( + &metadata.IP, + &metadata.Hostname, + &metadata.City, + &metadata.Region, + &metadata.Country, + &metadata.Latitude, + &metadata.Longitude, + &metadata.PostalCode, + &metadata.ASN, + &metadata.ASNOrganization, + &metadata.ASNType, + ); err != nil { + c.log.Error().Err(err).Str("ip", ip).Msg("Error scanning row") + return nil, fmt.Errorf("error scanning row: %w", err) + } + + c.log.Debug().Str("ip", ip).Msg("Successfully retrieved IP metadata from ClickHouse") + return &metadata, nil + } else { + c.log.Debug().Str("ip", ip).Msg("No metadata found in ClickHouse for IP") + return nil, nil + } +} + + +func (c *Consumer) fetchIPInfoFromAPI(ip string) (*ipinfo.Core, error) { + client := ipinfo.NewClient(nil, nil, c.ipInfoToken) + ipParsed := net.ParseIP(ip) + if ipParsed == nil { + return nil, fmt.Errorf("invalid IP address: %s", ip) + } + info, err := client.GetIPInfo(ipParsed) + if err != nil { + return nil, fmt.Errorf("IPInfo API error: %w", err) + } + return info, nil +} + +func convertIPInfoToMetadata(info *ipinfo.Core) *types.IPMetadataEvent { + var lat, long float64 + if info.Location != "" { + parts := strings.Split(info.Location, ",") + if len(parts) == 2 { + lat, _ = strconv.ParseFloat(parts[0], 64) + long, _ = strconv.ParseFloat(parts[1], 64) + } + } + + var asn, asnOrganization, asnType string + if info.ASN != nil { + asn = info.ASN.ASN + asnOrganization = info.ASN.Name + asnType = info.ASN.Type + } + + return &types.IPMetadataEvent{ + IP: info.IP.String(), + Hostname: info.Hostname, + City: info.City, + Region: info.Region, + Country: info.Country, + Latitude: lat, + Longitude: long, + PostalCode: info.Postal, + ASN: asn, + ASNOrganization: asnOrganization, + ASNType: asnType, + } +} + + +func (c *Consumer) cacheIPMetadata(ip string, metadata *types.IPMetadataEvent) { + c.ipCacheMu.Lock() + defer c.ipCacheMu.Unlock() + c.ipCache[ip] = metadata + go func() { + time.Sleep(c.ipCacheTTL) + c.ipCacheMu.Lock() + delete(c.ipCache, ip) + c.ipCacheMu.Unlock() + }() +} + +func (c *Consumer) ensureIPMetadataInClickHouse(metadata *types.IPMetadataEvent) error { + select { + case c.chClient.IPMetadataEventChan <- metadata: + return nil + default: + return fmt.Errorf("ClickHouse channel is full or unavailable") + } +} + + + func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) { longLived := indexesFromBitfield(event.MetaData.Attnets) @@ -415,45 +610,36 @@ func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) { } c.peerDiscoveredChan <- &types.PeerDiscoveredEvent{ - ENR: event.ENR, - ID: event.ID, - IP: ip, - Port: 0, // Adjust accordingly - CrawlerID: event.CrawlerID, + ENR: event.ENR, + ID: event.ID, + IP: ip, + Port: 0, // Adjust accordingly + CrawlerID: event.CrawlerID, CrawlerLoc: event.CrawlerLoc, - Timestamp: event.Timestamp, + Timestamp: event.Timestamp, } } func (c *Consumer) storeDiscoveryEvent(event types.PeerDiscoveredEvent) { - // Check if the event is properly populated if event.ID == "" || event.ENR == "" { c.log.Warn().Interface("event", event).Msg("Received incomplete discovery event") return } - // Attempt to write to the Parquet file if err := c.discoveryWriter.Write(event); err != nil { c.log.Err(err).Msg("Failed to write discovery event to Parquet file") } else { c.log.Trace().Msg("Wrote discovery event to Parquet file") } - // Send event to ClickHouse if the client is initialized - if c.chClient != nil { - // Ensure the channel is not closed - if c.chClient.PeerDiscoveredEventChan != nil { - c.chClient.PeerDiscoveredEventChan <- &event - c.log.Info().Str("ID", event.ID).Msg("Inserted peer discovered event into ClickHouse channel") - } else { - c.log.Error().Msg("Attempted to send event to a nil or closed channel") - } + if c.chClient != nil && c.chClient.PeerDiscoveredEventChan != nil { + c.chClient.PeerDiscoveredEventChan <- &event + c.log.Info().Str("ID", event.ID).Msg("Inserted peer discovered event into ClickHouse channel") } else { - c.log.Warn().Msg("ClickHouse client is nil; cannot send event") + c.log.Warn().Msg("ClickHouse client is nil or channel is closed; cannot send event") } } - func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { if err := c.metadataWriter.Write(event); err != nil { c.log.Err(err).Msg("Failed to write metadata event to Parquet file") @@ -461,7 +647,6 @@ func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { c.log.Trace().Msg("Wrote metadata event to Parquet file") } - // Send event to ClickHouse if c.chClient != nil { c.chClient.MetadataReceivedEventChan <- &event c.log.Info().Str("ID", event.ID).Msg("Inserted metadata received event into ClickHouse channel") @@ -471,22 +656,11 @@ func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { func (c *Consumer) processIPMetadataEvents() { for ipEvent := range c.ipMetadataChan { c.log.Info().Msgf("Received IP metadata event for processing: %s", ipEvent.IP) - // Write to Parquet file before sending to ClickHouse if err := c.ipMetadataWriter.Write(ipEvent); err != nil { c.log.Err(err).Msg("Failed to write IP metadata event to Parquet file") - continue // Proceed to the next event in case of failure + continue } c.log.Trace().Msg("Wrote IP metadata event to Parquet file") - - // Send event to ClickHouse - if c.chClient != nil { - if err := c.sendIPMetadataToClickHouse(ipEvent); err != nil { - c.log.Error().Err(err).Str("IP", ipEvent.IP).Msg("Failed to send IP metadata event to ClickHouse") - // Handle or retry as necessary - continue // You might choose to retry or simply log and move on - } - c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata event sent to ClickHouse successfully") - } } } @@ -497,4 +671,4 @@ func (c *Consumer) sendIPMetadataToClickHouse(ipEvent *types.IPMetadataEvent) er default: return fmt.Errorf("ClickHouse channel is full or unavailable") } -} +} \ No newline at end of file diff --git a/go.mod b/go.mod index 2a568b6..26584af 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect - github.com/herumi/bls-eth-go-binary v1.28.1 // indirect + github.com/herumi/bls-eth-go-binary v1.36.1 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect @@ -183,7 +183,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect - golang.org/x/sync v0.7.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index a3765fe..5355033 100644 --- a/go.sum +++ b/go.sum @@ -631,8 +631,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= -github.com/herumi/bls-eth-go-binary v1.28.1 h1:fcIZ48y5EE9973k05XjE8+P3YiQgjZz4JI/YabAm8KA= -github.com/herumi/bls-eth-go-binary v1.28.1/go.mod h1:luAnRm3OsMQeokhGzpYmc0ZKwawY7o87PUEP11Z7r7U= +github.com/herumi/bls-eth-go-binary v1.36.1 h1:SfLjxbO1fWkKtKS7J3Ezd1/5QXrcaTZgWynxdSe10hQ= +github.com/herumi/bls-eth-go-binary v1.36.1/go.mod h1:luAnRm3OsMQeokhGzpYmc0ZKwawY7o87PUEP11Z7r7U= github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= @@ -1482,8 +1482,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/ethereum/peerstore.go b/pkg/ethereum/peerstore.go index 8ee89ed..b0f7036 100644 --- a/pkg/ethereum/peerstore.go +++ b/pkg/ethereum/peerstore.go @@ -24,7 +24,7 @@ const ( Connecting ) -const EPOCH_DURATION = 12 * 32 * time.Second +const EPOCH_DURATION = 5 * 16 * time.Second //Gnosis // PeerInfo contains information about a peer type PeerInfo struct { @@ -57,7 +57,7 @@ func (p *PeerInfo) IntoMetadataEvent() *types.MetadataReceivedEvent { ClientVersion: p.clientVersion, MetaData: simpleMetadata, // `epoch = slot // SLOTS_PER_EPOCH` - Epoch: int(p.status.HeadSlot) / 32, + Epoch: int(p.status.HeadSlot) / 16, // These should be set later CrawlerID: "", CrawlerLoc: "",