Skip to content

Latest commit

 

History

History
438 lines (337 loc) · 12.6 KB

build-with-rust.md

File metadata and controls

438 lines (337 loc) · 12.6 KB

Build an IoT App with Rust

Architecture

This section will walk through and explain the code for the different commands. As explained in the Getting Started page, the project is structured as follow:

  • migrate (/bin/migrate/main.rs) - creates the carepet keyspace and tables
  • collar (/bin/sensor/main.rs) - generates a pet health data and pushes it into the storage
  • web app (/main.rs) - REST API service for tracking pets health state

Migrate

Start by creating a local ScyllaDB cluster consisting of 3 nodes:

docker-compose up -d

Docker-compose will spin up a ScyllaDB cluster consisting of 3 nodes (carepet-scylla1, carepet-scylla2 and carepet-scylla3) along with the app (for example go-app) container. Wait for about two minutes and check the status of the cluster: To check the status of the cluster:

docker exec -it carepet-scylla1 nodetool status

Once all the nodes are in UN - Up Normal status, run the below commands:

The below command allows you to get node IP address:

docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' carepet-scylla1

The run the following commands to execute the migrate main function.

NODE1=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' carepet-scylla1)
cargo run --bin migrate -- --hosts $NODE1

The command executes the main function in the bin/migrate/main.rs. The function creates the keyspace and tables that you need to run the collar and server services.

The below code in the bin/migrate/main.rs creates a new session then calls the create_keyspace , migrate functions.

// migrate/main.rs

async fn main() -> Result<()> {
    care_pet::log::init();

    let app = App::from_args();
    debug!("Configuration = {:?}", app);

    info!("Bootstrapping database...");

    let sess = db::new_session(&app.db_config).await?;

    db::create_keyspace(&sess).await?;
    db::migrate(&sess).await?;

    Ok(())
}

The new_session function takes the config as a parameter and uses SessionBuilder class to crete a new session.

// db/mod.rs

pub async fn new_session(config: &Config) -> Result<Session> {
    info!("Connecting to {}", config.hosts.join(", "));

    SessionBuilder::new()
        .known_nodes(&config.hosts)
        .connection_timeout(config.timeout.into())
        .user(
            config.username.clone().unwrap_or_default(),
            config.password.clone().unwrap_or_default(),
        )
        .build()
        .await
        .map_err(From::from)
}

For more information about creating a new session with the Rust Driver, please have a look at the docs.

create_keyspace function takes a session as an argument and creates a keyspace as defined in db/keyspace.cql:

CREATE KEYSPACE IF NOT EXISTS carepet WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': '3' };

The CQL query above creates a new keyspace named carepet, with NetworkTopologyStrategy as replication strategy and a replication factor of 3. More information about keyspace and replication on Scylla University.

Finally, migrate will execute the queries listed in db/migrate.cql to create the tables you need for the project.

CREATE TABLE IF NOT EXISTS carepet.owner
(
    owner_id UUID,
    address TEXT,
    name    TEXT,
    PRIMARY KEY (owner_id)
);

...

You can check the database structure with:

docker exec -it carepet-scylla1 cqlsh
cqlsh> USE carepet;
cqlsh:carepet> DESCRIBE TABLES
cqlsh:carepet> DESCRIBE TABLE pet

You should expect the following result:

CREATE TABLE carepet.pet (
    owner_id uuid,
    pet_id uuid,
    chip_id text,
    species text,
    breed   text,
    color   text,
    gender  text,
    address text,
    age int,
    name text,
    weight float,
    PRIMARY KEY (owner_id, pet_id)
) WITH CLUSTERING ORDER BY (pet_id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

Sensor

The sensor service simulates the collar's activity and periodically saves data to the database. Use the below commands to run the sensor service:

NODE1=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' carepet-scylla1)
cargo run --bin sensor -- --hosts $NODE1 --measure 5s --buffer-interval 1m

The above command executes bin/sensor/main.rs and and takes the following as arguments.

  • hosts : the IP address of the ScyllaDB node.
  • measure: the interval between to sensor measures.
  • buffer-interval: the interval between two database queries.
// sensor/main.rs

#[tokio::main]
async fn main() -> Result<()> {
    care_pet::log::init();

    let app = App::from_args();
    debug!("Configuration = {:?}", &app);

    info!("Welcome to the Pet collar simulator");

    let sess = db::new_session_with_keyspace(&app.db_config).await?;

    let (owner, pet, sensors) = random_data();
    save_data(&sess, &owner, &pet, &sensors).await?;
    run_sensor_data(&app, &sess, sensors).await?;

    Ok(())
}

The app object contains the command's arguments listed above. We then create a new session sess using new_session_with_keyspace function defined in db/mod.rs:

// db/mod.rs

pub async fn new_session_with_keyspace(config: &Config) -> Result<Session> {
    let session = new_session(config).await?;
    session.use_keyspace(KEYSPACE, true).await?;
    Ok(session)
}

The save_data method connects to the datbase and saves random owner, pet and the sensors to the database using insert_query macro defined in src/mod.rs.

// sensor/main.rs

async fn save_data(sess: &Session, owner: &Owner, pet: &Pet, sensors: &[Sensor]) -> Result<()> {
    sess.query(insert_query!(Owner), owner).await?;
    info!("New owner # {}", owner.owner_id);

    sess.query(insert_query!(Pet), pet).await?;
    info!("New pet # {}", pet.pet_id);

    for sensor in sensors {
        sess.query(insert_query!(Sensor), sensor).await?;
    }

    Ok(())
}

The run_sensor_data generates random data and inserts it to the database every buffer_interval.

async fn run_sensor_data(cfg: &App, sess: &Session, sensors: Vec<Sensor>) -> Result<()> {
    let measure: time::Duration = cfg.measure.into();
    let buffer_interval: time::Duration = cfg.buffer_interval.into();

    let mut last = Instant::now();
    loop {
        let mut measures = vec![];
        while last.elapsed() < buffer_interval {
            sleep(measure).await;

            for sensor in &sensors {
                let measure = read_sensor_data(sensor);
                info!(
                    "sensor # {} type {} new measure {} ts {}",
                    sensor.sensor_id,
                    sensor.r#type.as_str(),
                    &measure.value,
                    measure.ts.format_rfc3339(),
                );

                measures.push(measure);
            }
        }

        last = last
            + time::Duration::from_nanos(
                (measure.as_nanos() * (last.elapsed().as_nanos() / measure.as_nanos())) as u64,
            );

        info!("Pushing data");

        let batch = measures.iter().fold(Batch::default(), |mut batch, _| {
            batch.append_statement(insert_query!(Measure));
            batch
        });

        sess.batch(&batch, measures)
            .await
            .map_err(|err| error!("execute batch query {:?}", err))
            .ok();
    }
}

Server

The server service is a REST API for tracking the pets’ health state. The service was built using Rocket and allows users to query the database via HTTP.

Run the following commands to start the server:

NODE1=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' carepet-scylla1)
cargo run -- --hosts $NODE1

The src/main.rs main function mounts the api on /api and defines the routes.

// src/main.rs

#[rocket::main]
async fn main() -> Result<()> {
    care_pet::log::init();

    let app = App::from_args();
    if app.verbose {
        info!("Configuration = {:?}", app);
    }

    let sess = db::new_session_with_keyspace(&app.db_config).await?;

    rocket::build()
        .mount(
            "/api",
            routes![
                handler::measures::find_sensor_data_by_sensor_id_and_time_range,
                handler::owner::find_owner_by_id,
                handler::pets::find_pets_by_owner_id,
                handler::sensors::find_sensors_by_pet_id,
                handler::avg::find_sensor_avg_by_sensor_id_and_day
            ],
        )
        .manage(sess)
        .launch()
        .await
        .map_err(From::from)
}

The handlers can be found in the src/handler folder for each route.

Let's have a look at handler/mesure.rs file:

#[get("/sensor/<id>/values?<from>&<to>")]
pub async fn find_sensor_data_by_sensor_id_and_time_range(
    session: &State<Session>,
    id: UuidParam,
    from: DateTimeParam,
    to: DateTimeParam,
) -> Result<Json<Vec<f32>>, JsonError> {
    let rows = session
        .query(
            format!(
                "SELECT {} FROM {} WHERE {} = ? and {} >= ? and {} <= ?",
                Measure::FIELD_NAMES.value,
                Measure::table(),
                Measure::FIELD_NAMES.sensor_id,
                Measure::FIELD_NAMES.ts,
                Measure::FIELD_NAMES.ts,
            ),
            (id.0, from.0, to.0),
        )
        .await
        .map_err(|err| json_err(Status::InternalServerError, err))?
        .rows
        .unwrap_or_default()
        .into_typed::<(f32,)>();

    let values = rows
        .map(|v| v.map(|v| v.0))
        .collect::<Result<Vec<_>, _>>()
        .map_err(|err| json_err(Status::InternalServerError, err))?;

    Ok(Json(values))
}

The GET request on URL /sensor/<id>/values?<from>&<to> triggers find_sensor_data_by_sensor_id_and_time_range function.

find_sensor_data_by_sensor_id_and_time_range takes session, id, from and to as params. The function runs a SELECT query then returns rows.

Retrieving informations from API

To test out the API in your terminal, use the following command to retrieve informations of a specific pet owner:

curl http://127.0.0.1:8000/owner/{id}

If you don't have an owner_id, run the sensor command and it will generate users and pets on your terminal.

and you should receive a response similar to this:

{
  "owner_id": "5b5a7b4d-a2c0-48b0-91e1-de6a5b37c923",
  "address": "home",
  "name": "sedtdkaa"
}

If you want to list owner's pets you can use the following command:

curl http://127.0.0.1:8000/owner/{id}/pets

and you should receive a response similar to this:

[
  {
    "owner_id": "5b5a7b4d-a2c0-48b0-91e1-de6a5b37c923",
    "pet_id": "9e9facb9-3bd8-4451-b179-8c951cdf0999",
    "chip_id": null,
    "species": "dog",
    "breed": "golden-retriever",
    "color": "black",
    "gender": "M",
    "age": 4,
    "weight": 9.523097,
    "address": "awesome-address",
    "name": "doggo"
  }
]

If you want to list the active pet sensors you can use the following command:

curl http://127.0.0.1:8000/pet/{pet_id}/sensors

and you should receive a response similar to this:

[
  {
    "pet_id": "9e9facb9-3bd8-4451-b179-8c951cdf0999",
    "sensor_id": "7a8b3831-0512-4501-90f2-700c7133aeed",
    "type": "T"
  },
  {
    "pet_id": "9e9facb9-3bd8-4451-b179-8c951cdf0999",
    "sensor_id": "81250bab-cf1c-4c7a-84f1-b291a0f325ef",
    "type": "P"
  },
  {
    "pet_id": "9e9facb9-3bd8-4451-b179-8c951cdf0999",
    "sensor_id": "a22a2fdb-4aad-4abe-b0d9-381aa07a26af",
    "type": "L"
  }
]

Resources