S3 Replication to Clickhouse

This example showcases how to replicate data from S3 to ClickHouse in two CDC modes:

  1. By polling the bucket for new files
  2. By receiving new-file notifications via SQS

Architecture Diagram

![Architecture diagram](img_1.png)

Overview

  1. S3: To prepare the S3 bucket, use the Terraform configuration provided with this example.

  2. Transfer: An application that replicates changes from S3 to ClickHouse.

  3. ClickHouse: An open-source column-oriented database for fast analytical queries.

  4. Load Generator: To generate load, we upload files to S3 with the AWS CLI.

Getting Started

Prerequisites

  • Docker and Docker Compose installed on your machine.
  • AWS CLI installed and configured (here)
  • S3 bucket and SQS queue created via the Terraform in this example (here)
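
As a quick sanity check before starting, you can confirm the required tooling is available (these are standard version commands, nothing specific to this example):

    docker --version
    docker-compose --version
    aws --version
    terraform -version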

Setup Instructions

  1. Clone the Repository:

    git clone https://github.com/doublecloud/transfer
    cd transfer/examples/s3sqs2ch
  2. Prepare S3 Bucket:

    terraform init
    terraform apply \
       -var="bucket_name=MY_BUCKET_NAME" \
       -var="sqs_name=MY_SQS_NAME" \
       -var="profile=MY_PROFILE_NAME"
  3. Access ClickHouse: Connect to ClickHouse via the CLI:

    clickhouse-client --host localhost --port 9000 --user default --password 'ch_password'
  4. Seed the S3 bucket with files:

    #!/bin/bash
    export BUCKET=MY_BUCKET_NAME
    export PROFILE=MY_PROFILE_NAME
    for i in {1..10}
    do
      echo '{"id": '$i', "value": "data'$i'"}' > "file_$i.json"
      aws s3 cp "file_$i.json" s3://${BUCKET}/ --profile ${PROFILE}
      rm "file_$i.json"
    done
  5. Build and Run the Docker Compose:

    export BUCKET=MY_BUCKET_NAME
    export ACCESS_KEY=MY_ACCESS_KEY
    export SECRET_KEY=MY_SECRET_KEY
    export QUEUE=MY_SQS_NAME
    export ACCOUNT=MY_AWS_ACCOUNT_ID
    export REGION=MY_AWS_REGION
    
    docker-compose up --build
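
After the stack is up, a few optional checks can confirm that the seed files landed in the bucket and that the containers are running (a minimal sketch; the exact service names depend on this example's docker-compose file):

    # Confirm the seed files are in the bucket
    aws s3 ls s3://${BUCKET}/ --profile ${PROFILE}

    # Confirm the containers are up and follow their logs
    docker-compose ps
    docker-compose logs -f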

Configuration Files

  • transfer.yaml: Specifies the source (S3) and destination (ClickHouse) settings used inside docker-compose:
id: test
type: INCREMENT_ONLY
src:
  type: s3
  params:
    Bucket: ${BUCKET}
    ConnectionConfig:
      AccessKey: ${ACCESS_KEY} # YOUR ACCESS_KEY
      S3ForcePathStyle: true
      SecretKey: ${SECRET_KEY} # YOUR SECRET_KEY
      UseSSL: true
    ReadBatchSize: 1000999900
    InflightLimit: 100000000
    TableName: my_table
    InputFormat: JSON
    OutputSchema: # Schema definition; each item becomes a ClickHouse column
      - name: id
        type: int64
        key: true # Included in the ClickHouse primary key
      - name: value
        type: string
    AirbyteFormat: ''
    PathPattern: '*.json'
    Concurrency: 10
    Format:
      JSONSettings: {}
    EventSource:
      SQS:
        QueueName: ${QUEUE}
        OwnerAccountID: ${ACCOUNT}
        ConnectionConfig:
          AccessKey: ${ACCESS_KEY} # YOUR ACCESS_KEY
          SecretKey: ${SECRET_KEY} # YOUR SECRET_KEY
          UseSSL: true
          Region: ${REGION}
    UnparsedPolicy: continue
dst:
  type: ch
  params:
    ShardsList:
      - Hosts:
          - clickhouse
    HTTPPort: 8123
    NativePort: 9000
    Database: default
    User: default
    Password: ch_password
transformation: '{}'
type_system_version: 8
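
To see how OutputSchema maps onto the resulting ClickHouse table (id in the primary key, value as a regular column), you can inspect the generated DDL once the table exists; this is just an inspection query, and the exact engine and ordering depend on how Transfer creates the table:

    clickhouse-client --host localhost --port 9000 --user default --password 'ch_password' \
      --query "SHOW CREATE TABLE my_table"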

Exploring results

Once docker compose is up and running, you can explore the results via clickhouse-client.

Verify the data in ClickHouse: open the ClickHouse CLI and run:

SELECT count(*)
FROM my_table

It should return:

┌─count()─┐
│      10 │
└─────────┘
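
You can also run ad-hoc queries non-interactively from the shell, for example to peek at a few ingested rows:

    clickhouse-client --host localhost --port 9000 --user default --password 'ch_password' \
      --query "SELECT * FROM my_table ORDER BY id LIMIT 5"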

If we add more files:

    export BUCKET=MY_BUCKET_NAME
    export PROFILE=MY_PROFILE_NAME
    for i in {11..20}
    do
      echo '{"id": '$i', "value": "data'$i'"}' > "file_$i.json"
      aws s3 cp "file_$i.json" s3://${BUCKET}/ --profile ${PROFILE}
      rm "file_$i.json"
    done

The transfer will automatically ingest the 10 new files:

SELECT count(*)
FROM my_table

It should now return:

┌─count()─┐
│      20 │
└─────────┘
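
If you are running in the SQS notification mode, you can also check that events are flowing through the queue; this sketch assumes the queue name exported as QUEUE and the same AWS profile as before (a count near zero usually just means the transfer is consuming notifications promptly):

    QUEUE_URL=$(aws sqs get-queue-url --queue-name ${QUEUE} --profile ${PROFILE} --query QueueUrl --output text)
    aws sqs get-queue-attributes \
      --queue-url ${QUEUE_URL} \
      --attribute-names ApproximateNumberOfMessages \
      --profile ${PROFILE}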

Stopping the Application

To stop the Docker containers, run:

docker-compose down
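
If you also want to remove the AWS resources created earlier, you can destroy them with Terraform using the same variables (double-check first that nothing in the bucket or queue is still needed):

    terraform destroy \
       -var="bucket_name=MY_BUCKET_NAME" \
       -var="sqs_name=MY_SQS_NAME" \
       -var="profile=MY_PROFILE_NAME"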

Conclusion

This example demonstrated setting up a CDC pipeline from S3 to ClickHouse using both the polling and SQS notification modes. It can be extended to handle large-scale data ingestion and more complex transformations.