Skip to content

Commit

Permalink
to(onebot): support listening
Browse files Browse the repository at this point in the history
  • Loading branch information
fu050409 committed Sep 26, 2024
1 parent 3bcc320 commit 6a4a1f5
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 26 deletions.
19 changes: 14 additions & 5 deletions crates/aionbot-adapter-onebot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@ pub mod ws;

use std::sync::Arc;

use aionbot_core::runtime::{Runtime, StateManager};
use aionbot_core::{
event::Event,
runtime::{Runtime, StateManager},
};
use anyhow::Result;
use tokio::sync::broadcast::Receiver;
use ws::Onebot;

pub trait Adapter {
fn reply(&self, message: &str) -> impl std::future::Future<Output = Result<()>> + Send;
Expand All @@ -20,14 +25,14 @@ impl Adapter for aionbot_core::event::Event {
}

pub struct OnebotRuntime {
// connect: Option<Arc<ReverseWsConnect>>,
onebot: Option<Arc<Onebot>>,
state: Arc<StateManager>,
}

impl Default for OnebotRuntime {
fn default() -> Self {
Self {
// connect: None,
onebot: None,
state: Arc::new(StateManager::default()),
}
}
Expand All @@ -45,7 +50,7 @@ impl Runtime for OnebotRuntime {

async fn prepare(&mut self) -> Result<()> {
println!("Preparing Onebot runtime");
let onebot = ws::Onebot::new().listen(Default::default()).await?;
self.onebot = Some(ws::Onebot::new().listen(Default::default()).await?);
println!("Onebot runtime prepared");
Ok(())
}
Expand All @@ -55,7 +60,11 @@ impl Runtime for OnebotRuntime {
}

async fn finalize(&mut self) -> Result<()> {
// self.connect.as_mut().unwrap().subscribe().await;
let mut rx = self.onebot.as_ref().cloned().unwrap().subscribe().await;
loop {
let event = rx.recv().await?;
println!("Received event: {:?}", event);
}
Ok(())
}

Expand Down
72 changes: 53 additions & 19 deletions crates/aionbot-adapter-onebot/src/ws.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use aionbot_core::event::Event;
use anyhow::Result;
use futures_util::StreamExt;
use tokio::{net::TcpListener, sync::Mutex};
Expand All @@ -18,20 +19,31 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 6700,
path: "/onebot".to_string(),
host: "0.0.0.0".to_string(),
port: 8080,
path: "/onebot/v11".to_string(),
access_token: None,
}
}
}

#[derive(Default)]
pub struct Onebot {
sender: tokio::sync::broadcast::Sender<Event>,
listen_handle: Mutex<Option<tokio::task::JoinHandle<Result<()>>>>,
bot_handles: Mutex<Vec<tokio::task::JoinHandle<Result<()>>>>,
}

impl Default for Onebot {
fn default() -> Self {
let (tx, _) = tokio::sync::broadcast::channel::<Event>(1024);
Self {
sender: tx,
listen_handle: Mutex::new(None),
bot_handles: Mutex::new(vec![]),
}
}
}

impl Onebot {
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
Expand All @@ -40,33 +52,55 @@ impl Onebot {
pub async fn listen(self: Arc<Self>, config: Config) -> Result<Arc<Self>> {
let onebot = self.clone();

let tcp_listener = TcpListener::bind(format!("{}:{}", config.host, config.port)).await?;
let bind_addr = format!("{}:{}", config.host, config.port);
let tcp_listener = TcpListener::bind(&bind_addr).await?;
println!("Listening on {}", bind_addr);

self.listen_handle
.lock()
.await
.replace(tokio::spawn(async move {
println!("Starting bot listening loop");
while let Ok((stream, _)) = tcp_listener.accept().await {
onebot.bot_handles.lock().await.push(tokio::spawn(async {
let ws_stream =
accept_hdr_async(stream, |req: &Request, response: Response| {
let headers = req.headers();
let bot_id = headers
.get("X-Self-ID")
.map(|id| id.to_str().unwrap().to_string())
.unwrap();
Ok(response)
})
.await?;
ws_stream.for_each(|message| async {}).await;
Ok(())
}));
println!("New connection found.");
let sender = onebot.sender.clone();
onebot
.bot_handles
.lock()
.await
.push(tokio::spawn(async move {
let ws_stream =
accept_hdr_async(stream, |req: &Request, response: Response| {
let headers = req.headers();
let bot_id = headers
.get("X-Self-ID")
.map(|id| id.to_str().unwrap().to_string())
.unwrap_or_default();
sender.send(Default::default()).unwrap();
Ok(response)
})
.await?;
ws_stream
.for_each(|message| {
let value = sender.clone();
async move {
println!("Received message: {:?}", &message);
value.send(Default::default());
}
})
.await;
Ok(())
}));
}
Ok(())
}));
Ok(self)
}

pub async fn subscribe(self: Arc<Self>) -> tokio::sync::broadcast::Receiver<Event> {
self.sender.subscribe()
}

// pub async fn close(&mut self) {
// if let Some(handle) = self.listen_handle.take() {
// handle.abort();
Expand Down
5 changes: 3 additions & 2 deletions crates/aionbot-core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use core::fmt;

use serde_json::Value;

#[derive(Debug, Clone)]
pub struct MessageSegment {
pub text: String,
pub r#type: String,
Expand All @@ -25,7 +26,7 @@ impl From<String> for MessageSegment {
}
}

#[derive(Default)]
#[derive(Default, Debug, Clone)]
pub struct Message {
pub entity: Option<String>,
pub segments: Vec<MessageSegment>,
Expand All @@ -40,7 +41,7 @@ impl fmt::Display for Message {
}
}

#[derive(Default)]
#[derive(Default, Debug, Clone)]
pub struct Event {
pub plain_data: Message,
pub user_id: String,
Expand Down

0 comments on commit 6a4a1f5

Please sign in to comment.