Skip to content

Commit

Permalink
Introduce two different approaches
Browse files Browse the repository at this point in the history
  • Loading branch information
sgasse committed Sep 14, 2024
1 parent 6087717 commit 8a38407
Show file tree
Hide file tree
Showing 25 changed files with 1,031 additions and 667 deletions.
28 changes: 26 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cam_sender/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ log = "0.4.17"
rscam = "0.5.5"
simple-error = "0.2.3"
tokio = { version = "1.21.2", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["codec"] }
tokio-util = { version = "0.7.4", features = ["codec", "net"] }
18 changes: 13 additions & 5 deletions cam_sender/src/bin/socket_sender.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
use std::time::Duration;

use anyhow::{bail, Result};
use cam_sender::sensors::{get_max_res_mjpg_capture_fn, CameraWrapper};
use clap::Parser;
use clap::ValueEnum;
use common::protocol::{FrameMsg, ProtoMsg};
use env_logger::TimestampPrecision;
use futures::sink::SinkExt;
use rscam::Camera;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};

#[derive(Parser, Debug)]
#[clap(author, version)]
struct Args {
struct Cli {
/// Address of the infer server to connect to
#[clap(long, default_value = "127.0.0.1:3001")]
address: String,

/// Channel name that this sender publishes to
#[clap(long, default_value = "simon")]
channel: String,

#[clap(long, default_value = "tcp")]
protocol: Protocol,
}

#[derive(Clone, Debug, ValueEnum)]
pub enum Protocol {
Tcp,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let args = Cli::parse();

env_logger::builder()
.format_timestamp(Some(TimestampPrecision::Millis))
Expand All @@ -44,7 +52,7 @@ async fn main() -> Result<()> {
}
}

async fn tcp_sender(cam: &CameraWrapper<Camera>, args: &Args) -> Result<()> {
async fn tcp_sender(cam: &CameraWrapper<Camera>, args: &Cli) -> Result<()> {
match TcpStream::connect(&args.address).await {
Ok(stream) => {
log::info!("Client connected to {}", &args.channel);
Expand Down
2 changes: 1 addition & 1 deletion cam_sender/src/sensors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn get_max_res_mjpg_capture_fn() -> Result<CameraWrapper<Camera>> {

let interval = match cam.intervals(format, resolution)? {
IntervalInfo::Discretes(intervals) => {
intervals.iter().max_by(|a, b| a.0.cmp(&b.0)).cloned()
intervals.iter().max_by(|a, b| a.1.cmp(&b.1)).cloned()
}
IntervalInfo::Stepwise {
min: _,
Expand Down
9 changes: 6 additions & 3 deletions infer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ name = "infer_server"
path = "src/lib.rs"

[dependencies]
anyhow = "1.0.75"
async-stream = "0.3.3"
axum = { version = "0.6.4", features = ["multipart", "query"] }
bytes = "1.4.0"
Expand All @@ -28,12 +29,14 @@ rusttype = "0.9.3"
serde = { version = "1.0.152", features = ["derive"] }
simple-error = "0.2.3"
smallvec = "1.10.0"
thingbuf = { version = "0.1.4", default-features = false, features = [
"static",
] }
tokio = { version = "1.25.0", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["codec"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-util = { version = "0.7.4", features = ["net", "codec"] }
tract-onnx = "0.19.2"
turbojpeg = { version = "0.5.2", features = ["image"] }

[profile.release]
debug = true


73 changes: 73 additions & 0 deletions infer_server/src/bin/hour_glass.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! Infer server binary.
//!
use std::{net::SocketAddr, sync::Arc};

use anyhow::Result;
use axum::{routing::get, Extension, Router};
use clap::Parser;
use env_logger::TimestampPrecision;
use infer_server::{
hour_glass::{
data_socket::spawn_data_socket,
endpoints::{faces_stream, healthcheck, named_stream},
inferer::Inferer,
router::FrameRouter,
INCOMING_FRAMES_CHANNEL, INFER_IMAGES_CHANNEL,
},
meter::spawn_meter_logger,
};

#[derive(Parser, Debug)]
#[clap(author, version)]
struct Args {
/// Address of the infer server to connect to
#[clap(long, default_value = "127.0.0.1:3000")]
server_address: String,

/// Address of the infer server to connect to
#[clap(long, default_value = "127.0.0.1:3001")]
socket_address: String,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();

// Setup logger
env_logger::builder()
.format_timestamp(Some(TimestampPrecision::Millis))
.init();

let (incoming_tx, incoming_rx) = INCOMING_FRAMES_CHANNEL.split();
let (infer_tx, infer_rx) = INFER_IMAGES_CHANNEL.split();
let frame_router = Arc::new(FrameRouter::new(infer_tx));

{
let frame_router = frame_router.clone();
tokio::spawn(async move { frame_router.run(incoming_rx).await });
}

{
tokio::spawn(async move { Inferer::new(infer_rx).await.run().await });
}

// Create socket to receive image streams via network
spawn_data_socket(incoming_tx, &args.socket_address).await?;

spawn_meter_logger();

// Build HTTP server with endpoints
let app = Router::new()
.route("/healthcheck", get(healthcheck))
.route("/stream", get(named_stream))
.route("/face_stream", get(faces_stream))
.layer(Extension(frame_router));

// Serve HTTP server
let addr: SocketAddr = args.server_address.parse()?;
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
//!
use std::{net::SocketAddr, sync::Arc};

use anyhow::Result;
use axum::{routing::get, Extension, Router};
use clap::Parser;
use env_logger::TimestampPrecision;
use infer_server::{
data_socket::spawn_data_socket,
endpoints::{face_stream, healthcheck, named_stream},
inferer::InferBroker,
pubsub::NamedPubSub,
Error,
meter::spawn_meter_logger,
msg_passing::{
data_socket::spawn_data_socket,
endpoints::{healthcheck, named_stream},
router::Registry,
},
};

#[derive(Parser, Debug)]
Expand All @@ -26,38 +28,29 @@ struct Args {
}

#[tokio::main]
async fn main() -> Result<(), Error> {
async fn main() -> Result<()> {
let args = Args::parse();

// Setup logger
env_logger::builder()
.format_timestamp(Some(TimestampPrecision::Millis))
.init();

// Build Pub/Sub-Engine to communicate between data input, inference and serving via HTTP
let pubsub = Arc::new(NamedPubSub::new());
let mut registry = Registry::new();
let comm = registry.get_comm();

// Build inferer to determine faces with confidences on image streams
let inferer = Arc::new(InferBroker::new(Arc::clone(&pubsub)).await);

// Spawn separate task to run the inference on
let inferer_ = Arc::clone(&inferer);
tokio::spawn(async move {
loop {
inferer_.run().await;
}
});
tokio::spawn(async move { registry.run().await });

// Create socket to receive image streams via network
spawn_data_socket(pubsub.clone(), &args.socket_address).await?;
spawn_data_socket(comm.tcp_tasks_comm_tx.clone(), &args.socket_address).await?;

spawn_meter_logger();

// Build HTTP server with endpoints
let app = Router::new()
.route("/healthcheck", get(healthcheck))
.route("/stream", get(named_stream))
.route("/face_stream", get(face_stream))
.layer(Extension(pubsub))
.layer(Extension(inferer));
.layer(Extension(Arc::new(comm)));

// Serve HTTP server
let addr: SocketAddr = args.server_address.parse()?;
Expand Down
91 changes: 0 additions & 91 deletions infer_server/src/data_socket.rs

This file was deleted.

Loading

0 comments on commit 8a38407

Please sign in to comment.