From e63e49c56701243a70983a07cfcfcc018de70f0e Mon Sep 17 00:00:00 2001 From: hdser Date: Thu, 26 Sep 2024 10:57:44 +0000 Subject: [PATCH 1/4] fix --- clickhouse/client.go | 93 ++++++----- consumer/consumer.go | 366 +++++++++++++++++++++++++++++-------------- 2 files changed, 303 insertions(+), 156 deletions(-) diff --git a/clickhouse/client.go b/clickhouse/client.go index 7423409..8344444 100644 --- a/clickhouse/client.go +++ b/clickhouse/client.go @@ -3,6 +3,7 @@ package clickhouse import ( "context" "crypto/tls" + "database/sql" "fmt" "time" @@ -267,50 +268,68 @@ type ClickhouseConfig struct { } type ClickhouseClient struct { - cfg *ClickhouseConfig - log zerolog.Logger - - chConn driver.Conn - - ValidatorEventChan chan *types.ValidatorEvent - IPMetadataEventChan chan *types.IPMetadataEvent - PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent - MetadataReceivedEventChan chan *types.MetadataReceivedEvent + cfg *ClickhouseConfig + log zerolog.Logger + chConn clickhouse.Conn + sqlDB *sql.DB + + ValidatorEventChan chan *types.ValidatorEvent + IPMetadataEventChan chan *types.IPMetadataEvent + PeerDiscoveredEventChan chan *types.PeerDiscoveredEvent + MetadataReceivedEventChan chan *types.MetadataReceivedEvent } func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) { - log := log.NewLogger("clickhouse") - - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{cfg.Endpoint}, - DialTimeout: time.Second * 60, - Auth: clickhouse.Auth{ - Database: cfg.DB, - Username: cfg.Username, - Password: cfg.Password, - }, - Debugf: func(format string, v ...interface{}) { - log.Debug().Str("module", "clickhouse").Msgf(format, v) - }, - Protocol: clickhouse.Native, - TLS: &tls.Config{}, - }) + log := log.NewLogger("clickhouse") + + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{cfg.Endpoint}, + DialTimeout: time.Second * 60, + Auth: clickhouse.Auth{ + Database: cfg.DB, + Username: cfg.Username, + Password: cfg.Password, + }, + Debugf: func(format string, v ...interface{}) { + log.Debug().Str("module", "clickhouse").Msgf(format, v) + }, + Protocol: clickhouse.Native, + TLS: &tls.Config{}, + }) - if err != nil { - return nil, err - } + if err != nil { + return nil, err + } + + // Create a standard sql.DB connection + sqlDB, err := sql.Open("clickhouse", cfg.Endpoint) + if err != nil { + return nil, err + } - return &ClickhouseClient{ - cfg: cfg, - log: log, - chConn: conn, + return &ClickhouseClient{ + cfg: cfg, + log: log, + chConn: conn, + sqlDB: sqlDB, + + ValidatorEventChan: make(chan *types.ValidatorEvent, 16384), + IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384), + PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384), + MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384), + }, nil +} - ValidatorEventChan: make(chan *types.ValidatorEvent, 16384), - IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384), - PeerDiscoveredEventChan: make(chan *types.PeerDiscoveredEvent, 16384), - MetadataReceivedEventChan: make(chan *types.MetadataReceivedEvent, 16384), - }, nil +func (c *ClickhouseClient) QueryRow(ctx context.Context, query string, args ...interface{}) *sql.Row { + return c.sqlDB.QueryRowContext(ctx, query, args...) +} + +func (c *ClickhouseClient) Close() error { + if err := c.chConn.Close(); err != nil { + return err + } + return c.sqlDB.Close() } func (c *ClickhouseClient) initializeTables() error { diff --git a/consumer/consumer.go b/consumer/consumer.go index 6bf295e..0b90804 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -11,9 +11,11 @@ import ( "os/signal" "syscall" "time" - //"sync" - ma "github.com/multiformats/go-multiaddr" + "sync" + "net" + "strconv" + ma "github.com/multiformats/go-multiaddr" ch "github.com/chainbound/valtrack/clickhouse" "github.com/chainbound/valtrack/log" "github.com/chainbound/valtrack/types" @@ -23,6 +25,7 @@ import ( "github.com/rs/zerolog" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/writer" + "github.com/ipinfo/go/v2/ipinfo" ) const basePath = "/data" @@ -48,97 +51,117 @@ type Consumer struct { peerDiscoveredChan chan *types.PeerDiscoveredEvent metadataReceivedChan chan *types.MetadataReceivedEvent validatorMetadataChan chan *types.MetadataReceivedEvent - ipMetadataChan chan *types.IPMetadataEvent + ipMetadataChan chan *types.IPMetadataEvent chClient *ch.ClickhouseClient db *sql.DB dune *Dune -} - -func RunConsumer(cfg *ConsumerConfig) { - log := log.NewLogger("consumer") - - dbPath := fmt.Sprintf("%s/validator_tracker.sqlite", basePath) - db, err := sql.Open("sqlite3", dbPath) - if err != nil { - log.Error().Err(err).Msg("Error opening database") - } - defer db.Close() - - err = setupDatabase(db) - if err != nil { - log.Error().Err(err).Msg("Error setting up database") - } - - // err = loadIPMetadataFromCSV(db, "ip_metadata.csv") -// if err != nil { -// log.Error().Err(err).Msg("Error setting up database") -// } - - log.Info().Msg("Sqlite DB setup complete") - - nc, err := nats.Connect(cfg.NatsURL) - if err != nil { - log.Error().Err(err).Msg("Error connecting to NATS") - } - defer nc.Close() - js, err := jetstream.New(nc) - if err != nil { - log.Error().Err(err).Msg("Error creating JetStream context") - } + ipCache map[string]*types.IPMetadataEvent + ipCacheMu sync.RWMutex + ipCacheTTL time.Duration + ipInfoToken string +} +func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream, chClient *ch.ClickhouseClient, db *sql.DB, dune *Dune) (*Consumer, error) { discoveryFilePath := fmt.Sprintf("%s/discovery_events.parquet", basePath) w_discovery, err := local.NewLocalFileWriter(discoveryFilePath) if err != nil { - log.Error().Err(err).Msg("Error creating discovery events parquet file") + return nil, fmt.Errorf("error creating discovery events parquet file: %w", err) } - defer w_discovery.Close() metadataFilePath := fmt.Sprintf("%s/metadata_events.parquet", basePath) w_metadata, err := local.NewLocalFileWriter(metadataFilePath) if err != nil { - log.Error().Err(err).Msg("Error creating metadata events parquet file") + return nil, fmt.Errorf("error creating metadata events parquet file: %w", err) } - defer w_metadata.Close() validatorFilePath := fmt.Sprintf("%s/validator_metadata_events.parquet", basePath) w_validator, err := local.NewLocalFileWriter(validatorFilePath) if err != nil { - log.Error().Err(err).Msg("Error creating validator parquet file") + return nil, fmt.Errorf("error creating validator parquet file: %w", err) } - defer w_validator.Close() ipMetadataFilePath := fmt.Sprintf("%s/ip_metadata_events.parquet", basePath) w_ipMetadata, err := local.NewLocalFileWriter(ipMetadataFilePath) if err != nil { - log.Error().Err(err).Msg("Error creating IP metadata events parquet file") + return nil, fmt.Errorf("error creating IP metadata events parquet file: %w", err) } - defer w_ipMetadata.Close() discoveryWriter, err := writer.NewParquetWriter(w_discovery, new(types.PeerDiscoveredEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating Peer discovered Parquet writer") + return nil, fmt.Errorf("error creating Peer discovered Parquet writer: %w", err) } - defer discoveryWriter.WriteStop() metadataWriter, err := writer.NewParquetWriter(w_metadata, new(types.MetadataReceivedEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating Metadata Parquet writer") + return nil, fmt.Errorf("error creating Metadata Parquet writer: %w", err) } - defer metadataWriter.WriteStop() validatorWriter, err := writer.NewParquetWriter(w_validator, new(types.ValidatorEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating Validator Parquet writer") + return nil, fmt.Errorf("error creating Validator Parquet writer: %w", err) } - defer validatorWriter.WriteStop() ipMetadataWriter, err := writer.NewParquetWriter(w_ipMetadata, new(types.IPMetadataEvent), 4) if err != nil { - log.Error().Err(err).Msg("Error creating IP Metadata Parquet writer") + return nil, fmt.Errorf("error creating IP Metadata Parquet writer: %w", err) + } + + return &Consumer{ + log: log, + discoveryWriter: discoveryWriter, + metadataWriter: metadataWriter, + validatorWriter: validatorWriter, + ipMetadataWriter: ipMetadataWriter, + js: js, + + peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384), + metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384), + validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384), + ipMetadataChan: make(chan *types.IPMetadataEvent, 16384), + + chClient: chClient, + db: db, + dune: dune, + + ipCache: make(map[string]*types.IPMetadataEvent), + ipCacheTTL: 1 * time.Hour, + ipInfoToken: os.Getenv("IPINFO_TOKEN"), + }, nil +} + +func RunConsumer(cfg *ConsumerConfig) { + log := log.NewLogger("consumer") + + dbPath := fmt.Sprintf("%s/validator_tracker.sqlite", basePath) + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + log.Error().Err(err).Msg("Error opening database") + return + } + defer db.Close() + + err = setupDatabase(db) + if err != nil { + log.Error().Err(err).Msg("Error setting up database") + return + } + + log.Info().Msg("Sqlite DB setup complete") + + nc, err := nats.Connect(cfg.NatsURL) + if err != nil { + log.Error().Err(err).Msg("Error connecting to NATS") + return + } + defer nc.Close() + + js, err := jetstream.New(nc) + if err != nil { + log.Error().Err(err).Msg("Error creating JetStream context") + return } - defer ipMetadataWriter.WriteStop() chCfg := ch.ClickhouseConfig{ Endpoint: cfg.ChCfg.Endpoint, @@ -147,8 +170,8 @@ func RunConsumer(cfg *ConsumerConfig) { Password: cfg.ChCfg.Password, MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize, MaxIPMetadataBatchSize: cfg.ChCfg.MaxIPMetadataBatchSize, - MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize, - MaxMetadataReceivedEventsBatchSize: cfg.ChCfg.MaxMetadataReceivedEventsBatchSize, + MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize, + MaxMetadataReceivedEventsBatchSize: cfg.ChCfg.MaxMetadataReceivedEventsBatchSize, } var chClient *ch.ClickhouseClient @@ -156,11 +179,14 @@ func RunConsumer(cfg *ConsumerConfig) { chClient, err = ch.NewClickhouseClient(&chCfg) if err != nil { log.Error().Err(err).Msg("Error creating Clickhouse client") + return } + defer chClient.Close() // Add this line to ensure the client is closed err = chClient.Start() if err != nil { log.Error().Err(err).Msg("Error starting Clickhouse client") + return } } @@ -169,22 +195,10 @@ func RunConsumer(cfg *ConsumerConfig) { dune = NewDune(cfg.DuneNamespace, cfg.DuneApiKey) } - consumer := Consumer{ - log: log, - discoveryWriter: discoveryWriter, - metadataWriter: metadataWriter, - validatorWriter: validatorWriter, - ipMetadataWriter: ipMetadataWriter, - js: js, - - peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384), - metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384), - validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384), - ipMetadataChan: make(chan *types.IPMetadataEvent, 16384), - - chClient: chClient, - db: db, - dune: dune, + consumer, err := NewConsumer(cfg, log, js, chClient, db, dune) + if err != nil { + log.Error().Err(err).Msg("Error creating consumer") + return } go func() { @@ -194,15 +208,17 @@ func RunConsumer(cfg *ConsumerConfig) { }() ipInfoToken := os.Getenv("IPINFO_TOKEN") - if ipInfoToken == "" { - log.Error().Msg("IPINFO_TOKEN environment variable is required") - } - - go consumer.runValidatorMetadataEventHandler(ipInfoToken) - - go consumer.processIPMetadataEvents() + if ipInfoToken == "" { + log.Error().Msg("IPINFO_TOKEN environment variable is required") + return + } - + go func() { + if err := consumer.runValidatorMetadataEventHandler(ipInfoToken); err != nil { + log.Error().Err(err).Msg("Error in validator metadata handler") + } + }() + go consumer.processIPMetadataEvents() server := &http.Server{Addr: ":8080", Handler: nil} http.HandleFunc("/validators", createGetValidatorsHandler(db)) @@ -212,7 +228,6 @@ func RunConsumer(cfg *ConsumerConfig) { log.Error().Err(err).Msg("Error starting HTTP server") } }() - defer server.Shutdown(context.Background()) if dune != nil { log.Info().Msg("Starting to publish to Dune") @@ -235,20 +250,35 @@ func RunConsumer(cfg *ConsumerConfig) { quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) - - <-quit + + // Shutdown process + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + select { + case <-quit: + log.Info().Msg("Shutdown signal received") + case <-ctx.Done(): + log.Info().Msg("Shutdown timeout") + } + close(consumer.ipMetadataChan) + log.Info().Msg("Shutting down consumer") + + if err := server.Shutdown(ctx); err != nil { + log.Error().Err(err).Msg("Error shutting down HTTP server") + } - //wg := &sync.WaitGroup{} - //wg.Add(1) - // go func() { - // defer wg.Done() - // consumer.processIPMetadataEvents() - // }() + // If you have a Clickhouse client, close it here + if chClient != nil { + if err := chClient.Close(); err != nil { + log.Error().Err(err).Msg("Error closing Clickhouse client") + } + } - // <-quit - // close(consumer.ipMetadataChan) // Signal goroutine to finish - // wg.Wait() // Wait for the goroutine to clean up + // Any other cleanup can go here + + log.Info().Msg("Consumer shutdown complete") } func (c *Consumer) Start(name string) error { @@ -291,10 +321,8 @@ func (c *Consumer) Start(name string) error { return } - //c.log.Debug().Int("batch_size", len(batch.Messages())).Msg("Processing batch of messages") - for msg := range batch.Messages() { - handleMessage(c, msg) + c.handleMessage(msg) c.log.Debug().Str("msg_subject", msg.Subject()).Msg("Message handled") } } @@ -303,7 +331,7 @@ func (c *Consumer) Start(name string) error { return nil } -func handleMessage(c *Consumer, msg jetstream.Msg) { +func (c *Consumer) handleMessage(msg jetstream.Msg) { md, _ := msg.Metadata() progress := float64(md.Sequence.Stream) / (float64(md.NumPending) + float64(md.Sequence.Stream)) * 100 @@ -332,6 +360,18 @@ func handleMessage(c *Consumer, msg jetstream.Msg) { } c.log.Info().Time("timestamp", md.Timestamp).Uint64("pending", md.NumPending).Str("progress", fmt.Sprintf("%.2f%%", progress)).Msg("peer_discovered") + + // Fetch IP metadata synchronously + ipMetadata, err := c.getIPMetadata(event.IP) + if err != nil { + c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to fetch IP metadata") + } else { + // Insert IP metadata into ClickHouse + if err := c.ensureIPMetadataInClickHouse(ipMetadata); err != nil { + c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to ensure IP metadata in ClickHouse") + } + } + c.storeDiscoveryEvent(event) // Send to ClickHouse if client is initialized @@ -365,6 +405,107 @@ func handleMessage(c *Consumer, msg jetstream.Msg) { } } +func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) { + // Check cache first + c.ipCacheMu.RLock() + if metadata, found := c.ipCache[ip]; found { + c.ipCacheMu.RUnlock() + return metadata, nil + } + c.ipCacheMu.RUnlock() + + // Check ClickHouse + metadata, err := c.getIPMetadataFromClickHouse(ip) + if err == nil { + // Found in ClickHouse, cache and return + c.cacheIPMetadata(ip, metadata) + return metadata, nil + } + + // Not found in ClickHouse, fetch from IPInfo API + ipInfo, err := c.fetchIPInfoFromAPI(ip) + if err != nil { + return nil, fmt.Errorf("failed to fetch IP info: %w", err) + } + + metadata = convertIPInfoToMetadata(ipInfo) + c.cacheIPMetadata(ip, metadata) + + return metadata, nil +} + +func (c *Consumer) getIPMetadataFromClickHouse(ip string) (*types.IPMetadataEvent, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var metadata types.IPMetadataEvent + query := fmt.Sprintf("SELECT ip, hostname, city, region, country, latitude, longitude, postal_code, asn, asn_organization, asn_type FROM ip_metadata WHERE ip = '%s'", ip) + + err := c.chClient.QueryRow(ctx, query).Scan( + &metadata.IP, &metadata.Hostname, &metadata.City, &metadata.Region, + &metadata.Country, &metadata.Latitude, &metadata.Longitude, + &metadata.PostalCode, &metadata.ASN, &metadata.ASNOrganization, &metadata.ASNType, + ) + + if err != nil { + if err == sql.ErrNoRows { + return nil, nil // No metadata found for this IP + } + return nil, fmt.Errorf("error querying ClickHouse: %w", err) + } + + return &metadata, nil +} + +func (c *Consumer) fetchIPInfoFromAPI(ip string) (*ipinfo.IPInfo, error) { + client := ipinfo.NewClient(nil, ipinfo.NewCache(nil), c.ipInfoToken) + info, err := client.GetIPInfo(net.ParseIP(ip)) + if err != nil { + return nil, fmt.Errorf("IPInfo API error: %w", err) + } + return info, nil +} + +func convertIPInfoToMetadata(info *ipinfo.IPInfo) *types.IPMetadataEvent { + lat, _ := strconv.ParseFloat(info.Latitude, 64) + lon, _ := strconv.ParseFloat(info.Longitude, 64) + + return &types.IPMetadataEvent{ + IP: info.IP.String(), + Hostname: info.Hostname, + City: info.City, + Region: info.Region, + Country: info.CountryName, + Latitude: lat, + Longitude: lon, + PostalCode: info.Postal, + ASN: info.ASN, + ASNOrganization: info.Org, + ASNType: "", // IPInfo might not provide ASN type directly + } +} + +func (c *Consumer) cacheIPMetadata(ip string, metadata *types.IPMetadataEvent) { + c.ipCacheMu.Lock() + defer c.ipCacheMu.Unlock() + c.ipCache[ip] = metadata + go func() { + time.Sleep(c.ipCacheTTL) + c.ipCacheMu.Lock() + delete(c.ipCache, ip) + c.ipCacheMu.Unlock() + }() +} + +func (c *Consumer) ensureIPMetadataInClickHouse(metadata *types.IPMetadataEvent) error { + select { + case c.chClient.IPMetadataEventChan <- metadata: + return nil + default: + return fmt.Errorf("ClickHouse channel is full or unavailable") + } +} + func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) { longLived := indexesFromBitfield(event.MetaData.Attnets) @@ -415,45 +556,36 @@ func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) { } c.peerDiscoveredChan <- &types.PeerDiscoveredEvent{ - ENR: event.ENR, - ID: event.ID, - IP: ip, - Port: 0, // Adjust accordingly - CrawlerID: event.CrawlerID, + ENR: event.ENR, + ID: event.ID, + IP: ip, + Port: 0, // Adjust accordingly + CrawlerID: event.CrawlerID, CrawlerLoc: event.CrawlerLoc, - Timestamp: event.Timestamp, + Timestamp: event.Timestamp, } } func (c *Consumer) storeDiscoveryEvent(event types.PeerDiscoveredEvent) { - // Check if the event is properly populated if event.ID == "" || event.ENR == "" { c.log.Warn().Interface("event", event).Msg("Received incomplete discovery event") return } - // Attempt to write to the Parquet file if err := c.discoveryWriter.Write(event); err != nil { c.log.Err(err).Msg("Failed to write discovery event to Parquet file") } else { c.log.Trace().Msg("Wrote discovery event to Parquet file") } - // Send event to ClickHouse if the client is initialized - if c.chClient != nil { - // Ensure the channel is not closed - if c.chClient.PeerDiscoveredEventChan != nil { - c.chClient.PeerDiscoveredEventChan <- &event - c.log.Info().Str("ID", event.ID).Msg("Inserted peer discovered event into ClickHouse channel") - } else { - c.log.Error().Msg("Attempted to send event to a nil or closed channel") - } + if c.chClient != nil && c.chClient.PeerDiscoveredEventChan != nil { + c.chClient.PeerDiscoveredEventChan <- &event + c.log.Info().Str("ID", event.ID).Msg("Inserted peer discovered event into ClickHouse channel") } else { - c.log.Warn().Msg("ClickHouse client is nil; cannot send event") + c.log.Warn().Msg("ClickHouse client is nil or channel is closed; cannot send event") } } - func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { if err := c.metadataWriter.Write(event); err != nil { c.log.Err(err).Msg("Failed to write metadata event to Parquet file") @@ -461,7 +593,6 @@ func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { c.log.Trace().Msg("Wrote metadata event to Parquet file") } - // Send event to ClickHouse if c.chClient != nil { c.chClient.MetadataReceivedEventChan <- &event c.log.Info().Str("ID", event.ID).Msg("Inserted metadata received event into ClickHouse channel") @@ -471,19 +602,16 @@ func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { func (c *Consumer) processIPMetadataEvents() { for ipEvent := range c.ipMetadataChan { c.log.Info().Msgf("Received IP metadata event for processing: %s", ipEvent.IP) - // Write to Parquet file before sending to ClickHouse if err := c.ipMetadataWriter.Write(ipEvent); err != nil { c.log.Err(err).Msg("Failed to write IP metadata event to Parquet file") - continue // Proceed to the next event in case of failure + continue } c.log.Trace().Msg("Wrote IP metadata event to Parquet file") - // Send event to ClickHouse if c.chClient != nil { if err := c.sendIPMetadataToClickHouse(ipEvent); err != nil { c.log.Error().Err(err).Str("IP", ipEvent.IP).Msg("Failed to send IP metadata event to ClickHouse") - // Handle or retry as necessary - continue // You might choose to retry or simply log and move on + continue } c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata event sent to ClickHouse successfully") } From 656336bb33a6fe7664403cf389ad3f6785659826 Mon Sep 17 00:00:00 2001 From: hdser Date: Mon, 30 Sep 2024 14:45:15 +0200 Subject: [PATCH 2/4] fix ipinfo --- Dockerfile | 33 ++++------- consumer/consumer.go | 127 +++++++++++++++++++++++++------------------ go.mod | 4 +- go.sum | 8 +-- 4 files changed, 92 insertions(+), 80 deletions(-) diff --git a/Dockerfile b/Dockerfile index b1f63da..eb37595 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,42 +1,31 @@ -ARG GO_VERSION=1.22.5 +FROM golang:1.22.5-bookworm as builder -# Stage 1: Dependency management and build -FROM golang:${GO_VERSION}-bookworm as builder +# Install CA certificates and build essentials +RUN apt-get update && apt-get install -y ca-certificates build-essential && rm -rf /var/lib/apt/lists/* +# Set the working directory inside the container to /app WORKDIR /app -# Copy go.mod and go.sum files +# Copy the Go module files COPY go.mod go.sum ./ -# Download dependencies and verify modules -RUN go mod download && go mod verify +# Download dependencies +RUN go mod download # Copy the rest of the application source code COPY . . -# Run go mod tidy to ensure the go.mod file is up to date -RUN go mod tidy +# Build the application; output the binary to a known location +RUN go build -v -o /run-app . -# Build the application and capture the output -RUN go build -v -o /run-app . - -# Stage 2: Final stage +# Final stage based on Debian Bookworm-slim FROM debian:bookworm-slim -# Install CA certificates in the final image +# Install CA certificates in the final image to ensure they are present. RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* # Copy the built executable from the builder stage COPY --from=builder /run-app /usr/local/bin/run-app -# Create necessary directory -RUN mkdir -p /app/data - -# Copy the CSV file to /app/data -COPY /data/ip_metadata.csv /app/data/ip_metadata.csv - -# Set the working directory -WORKDIR /app - # Set the command to run the application CMD ["/usr/local/bin/run-app"] \ No newline at end of file diff --git a/consumer/consumer.go b/consumer/consumer.go index 0b90804..6265e62 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -6,26 +6,27 @@ import ( "encoding/hex" "encoding/json" "fmt" + "net" "net/http" "os" "os/signal" + "strconv" + "strings" + "sync" "syscall" "time" - "sync" - "net" - "strconv" - ma "github.com/multiformats/go-multiaddr" ch "github.com/chainbound/valtrack/clickhouse" "github.com/chainbound/valtrack/log" "github.com/chainbound/valtrack/types" + "github.com/ipinfo/go/v2/ipinfo" + ma "github.com/multiformats/go-multiaddr" _ "github.com/mattn/go-sqlite3" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/rs/zerolog" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/writer" - "github.com/ipinfo/go/v2/ipinfo" ) const basePath = "/data" @@ -48,10 +49,10 @@ type Consumer struct { ipMetadataWriter *writer.ParquetWriter js jetstream.JetStream - peerDiscoveredChan chan *types.PeerDiscoveredEvent - metadataReceivedChan chan *types.MetadataReceivedEvent - validatorMetadataChan chan *types.MetadataReceivedEvent - ipMetadataChan chan *types.IPMetadataEvent + peerDiscoveredChan chan *types.PeerDiscoveredEvent + metadataReceivedChan chan *types.MetadataReceivedEvent + validatorMetadataChan chan *types.MetadataReceivedEvent + ipMetadataChan chan *types.IPMetadataEvent chClient *ch.ClickhouseClient db *sql.DB @@ -71,7 +72,7 @@ func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream } metadataFilePath := fmt.Sprintf("%s/metadata_events.parquet", basePath) - w_metadata, err := local.NewLocalFileWriter(metadataFilePath) + w_metadata, err := local.NewLocalFileWriter(metadataFilePath) if err != nil { return nil, fmt.Errorf("error creating metadata events parquet file: %w", err) } @@ -116,17 +117,17 @@ func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream ipMetadataWriter: ipMetadataWriter, js: js, - peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384), - metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384), - validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384), - ipMetadataChan: make(chan *types.IPMetadataEvent, 16384), + peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384), + metadataReceivedChan: make(chan *types.MetadataReceivedEvent, 16384), + validatorMetadataChan: make(chan *types.MetadataReceivedEvent, 16384), + ipMetadataChan: make(chan *types.IPMetadataEvent, 16384), chClient: chClient, db: db, dune: dune, - ipCache: make(map[string]*types.IPMetadataEvent), - ipCacheTTL: 1 * time.Hour, + ipCache: make(map[string]*types.IPMetadataEvent), + ipCacheTTL: 1 * time.Hour, ipInfoToken: os.Getenv("IPINFO_TOKEN"), }, nil } @@ -164,13 +165,13 @@ func RunConsumer(cfg *ConsumerConfig) { } chCfg := ch.ClickhouseConfig{ - Endpoint: cfg.ChCfg.Endpoint, - DB: cfg.ChCfg.DB, - Username: cfg.ChCfg.Username, - Password: cfg.ChCfg.Password, - MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize, - MaxIPMetadataBatchSize: cfg.ChCfg.MaxIPMetadataBatchSize, - MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize, + Endpoint: cfg.ChCfg.Endpoint, + DB: cfg.ChCfg.DB, + Username: cfg.ChCfg.Username, + Password: cfg.ChCfg.Password, + MaxValidatorBatchSize: cfg.ChCfg.MaxValidatorBatchSize, + MaxIPMetadataBatchSize: cfg.ChCfg.MaxIPMetadataBatchSize, + MaxPeerDiscoveredEventsBatchSize: cfg.ChCfg.MaxPeerDiscoveredEventsBatchSize, MaxMetadataReceivedEventsBatchSize: cfg.ChCfg.MaxMetadataReceivedEventsBatchSize, } @@ -181,7 +182,7 @@ func RunConsumer(cfg *ConsumerConfig) { log.Error().Err(err).Msg("Error creating Clickhouse client") return } - defer chClient.Close() // Add this line to ensure the client is closed + defer chClient.Close() err = chClient.Start() if err != nil { @@ -269,15 +270,12 @@ func RunConsumer(cfg *ConsumerConfig) { log.Error().Err(err).Msg("Error shutting down HTTP server") } - // If you have a Clickhouse client, close it here if chClient != nil { if err := chClient.Close(); err != nil { log.Error().Err(err).Msg("Error closing Clickhouse client") } } - // Any other cleanup can go here - log.Info().Msg("Consumer shutdown complete") } @@ -304,9 +302,10 @@ func (c *Consumer) Start(name string) error { return err } - // Load IP metadata from CSV - if err := c.chClient.LoadIPMetadataFromCSV(); err != nil { - c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV") + if c.chClient != nil { + if err := c.chClient.LoadIPMetadataFromCSV(); err != nil { + c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV") + } } go func() { @@ -346,7 +345,6 @@ func (c *Consumer) handleMessage(msg jetstream.Msg) { c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata received") c.ipMetadataChan <- &ipEvent - // Send to ClickHouse if client is initialized if c.chClient != nil { c.chClient.IPMetadataEventChan <- &ipEvent } @@ -360,13 +358,11 @@ func (c *Consumer) handleMessage(msg jetstream.Msg) { } c.log.Info().Time("timestamp", md.Timestamp).Uint64("pending", md.NumPending).Str("progress", fmt.Sprintf("%.2f%%", progress)).Msg("peer_discovered") - - // Fetch IP metadata synchronously + ipMetadata, err := c.getIPMetadata(event.IP) if err != nil { c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to fetch IP metadata") } else { - // Insert IP metadata into ClickHouse if err := c.ensureIPMetadataInClickHouse(ipMetadata); err != nil { c.log.Error().Err(err).Str("ip", event.IP).Msg("Failed to ensure IP metadata in ClickHouse") } @@ -374,7 +370,6 @@ func (c *Consumer) handleMessage(msg jetstream.Msg) { c.storeDiscoveryEvent(event) - // Send to ClickHouse if client is initialized if c.chClient != nil { c.chClient.PeerDiscoveredEventChan <- &event } @@ -391,7 +386,6 @@ func (c *Consumer) handleMessage(msg jetstream.Msg) { c.handleMetadataEvent(event) c.storeMetadataEvent(event) - // Send to ClickHouse if client is initialized if c.chClient != nil { c.chClient.MetadataReceivedEventChan <- &event } @@ -416,7 +410,10 @@ func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) { // Check ClickHouse metadata, err := c.getIPMetadataFromClickHouse(ip) - if err == nil { + if err != nil { + return nil, fmt.Errorf("error querying ClickHouse: %w", err) + } + if metadata != nil { // Found in ClickHouse, cache and return c.cacheIPMetadata(ip, metadata) return metadata, nil @@ -429,6 +426,12 @@ func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) { } metadata = convertIPInfoToMetadata(ipInfo) + + // Store metadata into ClickHouse + if err := c.ensureIPMetadataInClickHouse(metadata); err != nil { + return nil, fmt.Errorf("failed to ensure IP metadata in ClickHouse: %w", err) + } + c.cacheIPMetadata(ip, metadata) return metadata, nil @@ -440,13 +443,13 @@ func (c *Consumer) getIPMetadataFromClickHouse(ip string) (*types.IPMetadataEven var metadata types.IPMetadataEvent query := fmt.Sprintf("SELECT ip, hostname, city, region, country, latitude, longitude, postal_code, asn, asn_organization, asn_type FROM ip_metadata WHERE ip = '%s'", ip) - + err := c.chClient.QueryRow(ctx, query).Scan( &metadata.IP, &metadata.Hostname, &metadata.City, &metadata.Region, &metadata.Country, &metadata.Latitude, &metadata.Longitude, &metadata.PostalCode, &metadata.ASN, &metadata.ASNOrganization, &metadata.ASNType, ) - + if err != nil { if err == sql.ErrNoRows { return nil, nil // No metadata found for this IP @@ -457,34 +460,52 @@ func (c *Consumer) getIPMetadataFromClickHouse(ip string) (*types.IPMetadataEven return &metadata, nil } -func (c *Consumer) fetchIPInfoFromAPI(ip string) (*ipinfo.IPInfo, error) { - client := ipinfo.NewClient(nil, ipinfo.NewCache(nil), c.ipInfoToken) - info, err := client.GetIPInfo(net.ParseIP(ip)) +func (c *Consumer) fetchIPInfoFromAPI(ip string) (*ipinfo.Core, error) { + client := ipinfo.NewClient(nil, nil, c.ipInfoToken) + ipParsed := net.ParseIP(ip) + if ipParsed == nil { + return nil, fmt.Errorf("invalid IP address: %s", ip) + } + info, err := client.GetIPInfo(ipParsed) if err != nil { return nil, fmt.Errorf("IPInfo API error: %w", err) } return info, nil } -func convertIPInfoToMetadata(info *ipinfo.IPInfo) *types.IPMetadataEvent { - lat, _ := strconv.ParseFloat(info.Latitude, 64) - lon, _ := strconv.ParseFloat(info.Longitude, 64) - +func convertIPInfoToMetadata(info *ipinfo.Core) *types.IPMetadataEvent { + var lat, long float64 + if info.Location != "" { + parts := strings.Split(info.Location, ",") + if len(parts) == 2 { + lat, _ = strconv.ParseFloat(parts[0], 64) + long, _ = strconv.ParseFloat(parts[1], 64) + } + } + + var asn, asnOrganization, asnType string + if info.ASN != nil { + asn = info.ASN.ASN + asnOrganization = info.ASN.Name + asnType = info.ASN.Type + } + return &types.IPMetadataEvent{ IP: info.IP.String(), Hostname: info.Hostname, City: info.City, Region: info.Region, - Country: info.CountryName, + Country: info.Country, Latitude: lat, - Longitude: lon, + Longitude: long, PostalCode: info.Postal, - ASN: info.ASN, - ASNOrganization: info.Org, - ASNType: "", // IPInfo might not provide ASN type directly + ASN: asn, + ASNOrganization: asnOrganization, + ASNType: asnType, } } + func (c *Consumer) cacheIPMetadata(ip string, metadata *types.IPMetadataEvent) { c.ipCacheMu.Lock() defer c.ipCacheMu.Unlock() @@ -506,6 +527,8 @@ func (c *Consumer) ensureIPMetadataInClickHouse(metadata *types.IPMetadataEvent) } } + + func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) { longLived := indexesFromBitfield(event.MetaData.Attnets) @@ -625,4 +648,4 @@ func (c *Consumer) sendIPMetadataToClickHouse(ipEvent *types.IPMetadataEvent) er default: return fmt.Errorf("ClickHouse channel is full or unavailable") } -} +} \ No newline at end of file diff --git a/go.mod b/go.mod index 2a568b6..26584af 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect - github.com/herumi/bls-eth-go-binary v1.28.1 // indirect + github.com/herumi/bls-eth-go-binary v1.36.1 // indirect github.com/holiman/uint256 v1.2.4 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect @@ -183,7 +183,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect - golang.org/x/sync v0.7.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index a3765fe..5355033 100644 --- a/go.sum +++ b/go.sum @@ -631,8 +631,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= -github.com/herumi/bls-eth-go-binary v1.28.1 h1:fcIZ48y5EE9973k05XjE8+P3YiQgjZz4JI/YabAm8KA= -github.com/herumi/bls-eth-go-binary v1.28.1/go.mod h1:luAnRm3OsMQeokhGzpYmc0ZKwawY7o87PUEP11Z7r7U= +github.com/herumi/bls-eth-go-binary v1.36.1 h1:SfLjxbO1fWkKtKS7J3Ezd1/5QXrcaTZgWynxdSe10hQ= +github.com/herumi/bls-eth-go-binary v1.36.1/go.mod h1:luAnRm3OsMQeokhGzpYmc0ZKwawY7o87PUEP11Z7r7U= github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= @@ -1482,8 +1482,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From d584ae0323ab744a9264a6dfe538cfefcec2219d Mon Sep 17 00:00:00 2001 From: hdser Date: Mon, 30 Sep 2024 15:00:36 +0200 Subject: [PATCH 3/4] fix Gnosis Chain parameters --- pkg/ethereum/peerstore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ethereum/peerstore.go b/pkg/ethereum/peerstore.go index 8ee89ed..b0f7036 100644 --- a/pkg/ethereum/peerstore.go +++ b/pkg/ethereum/peerstore.go @@ -24,7 +24,7 @@ const ( Connecting ) -const EPOCH_DURATION = 12 * 32 * time.Second +const EPOCH_DURATION = 5 * 16 * time.Second //Gnosis // PeerInfo contains information about a peer type PeerInfo struct { @@ -57,7 +57,7 @@ func (p *PeerInfo) IntoMetadataEvent() *types.MetadataReceivedEvent { ClientVersion: p.clientVersion, MetaData: simpleMetadata, // `epoch = slot // SLOTS_PER_EPOCH` - Epoch: int(p.status.HeadSlot) / 32, + Epoch: int(p.status.HeadSlot) / 16, // These should be set later CrawlerID: "", CrawlerLoc: "", From 00d9e73cad587ce1d9970879b77afa9dd19d62d7 Mon Sep 17 00:00:00 2001 From: hdser Date: Tue, 1 Oct 2024 10:24:49 +0000 Subject: [PATCH 4/4] fix ip data --- Dockerfile | 6 ++++ clickhouse/client.go | 39 ++++++++--------------- consumer/consumer.go | 73 +++++++++++++++++++++++++++++--------------- 3 files changed, 66 insertions(+), 52 deletions(-) diff --git a/Dockerfile b/Dockerfile index eb37595..69e07c1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,5 +27,11 @@ RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/ # Copy the built executable from the builder stage COPY --from=builder /run-app /usr/local/bin/run-app +# Create a directory for the data +RUN mkdir -p /app/data + +# Copy the CSV file +COPY data/ip_metadata.csv /app/data/ip_metadata.csv + # Set the command to run the application CMD ["/usr/local/bin/run-app"] \ No newline at end of file diff --git a/clickhouse/client.go b/clickhouse/client.go index 8344444..4e30c4c 100644 --- a/clickhouse/client.go +++ b/clickhouse/client.go @@ -3,7 +3,6 @@ package clickhouse import ( "context" "crypto/tls" - "database/sql" "fmt" "time" @@ -48,7 +47,7 @@ func (c *ClickhouseClient) LoadIPMetadataFromCSV() error { reader := csv.NewReader(file) reader.FieldsPerRecord = -1 // Allow variable number of fields - batch, err := c.chConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata") + batch, err := c.ChConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata") if err != nil { return fmt.Errorf("failed to prepare batch: %w", err) } @@ -177,7 +176,7 @@ func parseFloat(s string) (float64, error) { func (c *ClickhouseClient) isTableEmpty(tableName string) (bool, error) { query := fmt.Sprintf("SELECT count(*) FROM %s", tableName) var count uint64 - err := c.chConn.QueryRow(context.Background(), query).Scan(&count) + err := c.ChConn.QueryRow(context.Background(), query).Scan(&count) if err != nil { return false, err } @@ -218,7 +217,7 @@ func IPMetadataDDL(db string) string { asn String, asn_organization String, asn_type String - ) ENGINE = MergeTree() + ) ENGINE = ReplacingMergeTree() ORDER BY ip; `, db) } @@ -270,8 +269,7 @@ type ClickhouseConfig struct { type ClickhouseClient struct { cfg *ClickhouseConfig log zerolog.Logger - chConn clickhouse.Conn - sqlDB *sql.DB + ChConn clickhouse.Conn ValidatorEventChan chan *types.ValidatorEvent IPMetadataEventChan chan *types.IPMetadataEvent @@ -302,17 +300,10 @@ func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) { return nil, err } - // Create a standard sql.DB connection - sqlDB, err := sql.Open("clickhouse", cfg.Endpoint) - if err != nil { - return nil, err - } - return &ClickhouseClient{ cfg: cfg, log: log, - chConn: conn, - sqlDB: sqlDB, + ChConn: conn, ValidatorEventChan: make(chan *types.ValidatorEvent, 16384), IPMetadataEventChan: make(chan *types.IPMetadataEvent, 16384), @@ -321,39 +312,33 @@ func NewClickhouseClient(cfg *ClickhouseConfig) (*ClickhouseClient, error) { }, nil } -func (c *ClickhouseClient) QueryRow(ctx context.Context, query string, args ...interface{}) *sql.Row { - return c.sqlDB.QueryRowContext(ctx, query, args...) -} func (c *ClickhouseClient) Close() error { - if err := c.chConn.Close(); err != nil { - return err - } - return c.sqlDB.Close() + return c.ChConn.Close() } func (c *ClickhouseClient) initializeTables() error { // Create validator_metadata table - if err := c.chConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), ValidatorMetadataDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating validator_metadata table") return err } // Create ip_metadata table - if err := c.chConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), IPMetadataDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating ip_metadata table") return err } // Create peer_discovered_events table - if err := c.chConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), PeerDiscoveredEventsDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating peer_discovered_events table") return err } // Create metadata_received_events table - if err := c.chConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil { + if err := c.ChConn.Exec(context.Background(), MetadataReceivedEventsDDL(c.cfg.DB)); err != nil { c.log.Error().Err(err).Msg("creating metadata_received_events table") return err } @@ -387,7 +372,7 @@ func (c *ClickhouseClient) Start() error { // BatchProcessor processes events in batches for a specified table in ClickHouse. func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan <-chan T, maxSize uint64) { // Prepare the initial batch. - batch, err := client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) + batch, err := client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) if err != nil { client.log.Error().Err(err).Msg("Failed to prepare batch") return @@ -415,7 +400,7 @@ func batchProcessor[T any](client *ClickhouseClient, tableName string, eventChan } // Prepare a new batch after sending the current batch. - batch, err = client.chConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) + batch, err = client.ChConn.PrepareBatch(context.Background(), fmt.Sprintf("INSERT INTO %s", tableName)) if err != nil { client.log.Error().Err(err).Msg("Failed to prepare new batch after sending") return diff --git a/consumer/consumer.go b/consumer/consumer.go index 6265e62..e59cc27 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -411,15 +411,14 @@ func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) { // Check ClickHouse metadata, err := c.getIPMetadataFromClickHouse(ip) if err != nil { - return nil, fmt.Errorf("error querying ClickHouse: %w", err) - } - if metadata != nil { + c.log.Warn().Err(err).Str("ip", ip).Msg("Error querying ClickHouse for IP metadata, falling back to API") + } else if metadata != nil { // Found in ClickHouse, cache and return c.cacheIPMetadata(ip, metadata) return metadata, nil } - // Not found in ClickHouse, fetch from IPInfo API + // Not found in ClickHouse or error occurred, fetch from IPInfo API ipInfo, err := c.fetchIPInfoFromAPI(ip) if err != nil { return nil, fmt.Errorf("failed to fetch IP info: %w", err) @@ -427,9 +426,11 @@ func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) { metadata = convertIPInfoToMetadata(ipInfo) - // Store metadata into ClickHouse - if err := c.ensureIPMetadataInClickHouse(metadata); err != nil { - return nil, fmt.Errorf("failed to ensure IP metadata in ClickHouse: %w", err) + // Only insert into ClickHouse if fetched from API + if c.chClient != nil { + if err := c.ensureIPMetadataInClickHouse(metadata); err != nil { + c.log.Warn().Err(err).Str("ip", ip).Msg("Failed to insert IP metadata into ClickHouse") + } } c.cacheIPMetadata(ip, metadata) @@ -437,29 +438,59 @@ func (c *Consumer) getIPMetadata(ip string) (*types.IPMetadataEvent, error) { return metadata, nil } + func (c *Consumer) getIPMetadataFromClickHouse(ip string) (*types.IPMetadataEvent, error) { + if c.chClient == nil { + return nil, fmt.Errorf("ClickHouse client is not initialized") + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() var metadata types.IPMetadataEvent - query := fmt.Sprintf("SELECT ip, hostname, city, region, country, latitude, longitude, postal_code, asn, asn_organization, asn_type FROM ip_metadata WHERE ip = '%s'", ip) + query := ` + SELECT ip, hostname, city, region, country, latitude, longitude, postal_code, asn, asn_organization, asn_type + FROM ip_metadata + WHERE ip = ? + LIMIT 1 + ` - err := c.chClient.QueryRow(ctx, query).Scan( - &metadata.IP, &metadata.Hostname, &metadata.City, &metadata.Region, - &metadata.Country, &metadata.Latitude, &metadata.Longitude, - &metadata.PostalCode, &metadata.ASN, &metadata.ASNOrganization, &metadata.ASNType, - ) + c.log.Debug().Str("ip", ip).Msg("Querying ClickHouse for IP metadata") + rows, err := c.chClient.ChConn.Query(ctx, query, ip) if err != nil { - if err == sql.ErrNoRows { - return nil, nil // No metadata found for this IP - } + c.log.Error().Err(err).Str("ip", ip).Msg("Error executing query in ClickHouse") return nil, fmt.Errorf("error querying ClickHouse: %w", err) } + defer rows.Close() + + if rows.Next() { + if err := rows.Scan( + &metadata.IP, + &metadata.Hostname, + &metadata.City, + &metadata.Region, + &metadata.Country, + &metadata.Latitude, + &metadata.Longitude, + &metadata.PostalCode, + &metadata.ASN, + &metadata.ASNOrganization, + &metadata.ASNType, + ); err != nil { + c.log.Error().Err(err).Str("ip", ip).Msg("Error scanning row") + return nil, fmt.Errorf("error scanning row: %w", err) + } - return &metadata, nil + c.log.Debug().Str("ip", ip).Msg("Successfully retrieved IP metadata from ClickHouse") + return &metadata, nil + } else { + c.log.Debug().Str("ip", ip).Msg("No metadata found in ClickHouse for IP") + return nil, nil + } } + func (c *Consumer) fetchIPInfoFromAPI(ip string) (*ipinfo.Core, error) { client := ipinfo.NewClient(nil, nil, c.ipInfoToken) ipParsed := net.ParseIP(ip) @@ -630,14 +661,6 @@ func (c *Consumer) processIPMetadataEvents() { continue } c.log.Trace().Msg("Wrote IP metadata event to Parquet file") - - if c.chClient != nil { - if err := c.sendIPMetadataToClickHouse(ipEvent); err != nil { - c.log.Error().Err(err).Str("IP", ipEvent.IP).Msg("Failed to send IP metadata event to ClickHouse") - continue - } - c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata event sent to ClickHouse successfully") - } } }