Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Added more async capabitilies for storage
Browse files Browse the repository at this point in the history
  • Loading branch information
pwbh committed Oct 18, 2023
1 parent 47fdf1a commit 35cf760
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 36 deletions.
68 changes: 34 additions & 34 deletions nyx-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,24 @@
use std::{
collections::HashMap,
fs::File,
io::{self, Read, Seek},
path::Path,
io::{self},
sync::Arc,
};

use async_std::channel::{bounded, Sender};
use async_std::{
channel::{bounded, Sender},
fs::File,
io::{prelude::SeekExt, ReadExt, SeekFrom},
path::Path,
task::JoinHandle,
};
use storage_sender::StorageSender;
use write_queue::WriteQueue;

mod offsets;
mod storage_sender;
mod write_queue;

#[derive(Clone, Copy)]
struct Offsets {
start: usize,
end: usize,
}

impl Offsets {
pub fn new(start: usize, end: usize) -> Result<Self, String> {
if start >= end {
return Err(format!(
"Start ({}) can't be greater o equal to end ({}) of file byte",
start, end
));
}

Ok(Self { start, end })
}
}
use offsets::Offsets;

pub struct Storage {
indices: HashMap<usize, Offsets>,
Expand All @@ -39,33 +27,35 @@ pub struct Storage {
// `retrievable_buffer` of its own to read into instead.
retrivable_buffer: [u8; 8192],
write_sender: Sender<Vec<u8>>,
write_queue_handle: JoinHandle<Result<(), std::io::Error>>,
}

impl Storage {
pub fn new(path: &Path, max_queue: usize) -> Result<Self, String> {
pub async fn new(path: &'static Path, max_queue: usize) -> Result<Self, String> {
let (write_sender, write_receiver) = bounded(max_queue);

async_std::task::spawn(WriteQueue::run(write_receiver));
let write_queue_handle = async_std::task::spawn(WriteQueue::run(write_receiver, &path));

Ok(Self {
indices: HashMap::new(),
file: Arc::new(File::open(path).map_err(|e| e.to_string())?),
file: Arc::new(File::open(path).await.map_err(|e| e.to_string())?),
retrivable_buffer: [0; 8192],
write_sender,
write_queue_handle,
})
}

pub fn new_sender(&mut self) -> Result<StorageSender, String> {
Ok(StorageSender::new(self.write_sender.clone()))
pub fn get_storage_sender(&mut self) -> StorageSender {
StorageSender::new(self.write_sender.clone())
}

pub fn get(&mut self, index: usize) -> Result<&[u8], String> {
pub async fn get(&mut self, index: usize) -> Result<&[u8], String> {
let offsets = self
.indices
.get(&index)
.ok_or("record doesn't exist.".to_string())?;

let data_size = offsets.end - offsets.start;
let data_size = offsets.end() - offsets.start();

if data_size > self.retrivable_buffer.len() {
return Err(format!(
Expand All @@ -76,17 +66,27 @@ impl Storage {
}

return self
.seek_bytes_between(offsets.start, data_size)
.seek_bytes_between(offsets.start(), data_size)
.await
.map_err(|e| format!("Error in Storage (get): {}", e));
}

fn seek_bytes_between(&mut self, start: usize, data_size: usize) -> io::Result<&[u8]> {
async fn seek_bytes_between(&mut self, start: usize, data_size: usize) -> io::Result<&[u8]> {
let mut file = &*self.file;
file.seek(std::io::SeekFrom::Start(start as u64))?;
file.read_exact(&mut self.retrivable_buffer[..data_size])?;
file.rewind()?;
file.seek(SeekFrom::Start(start as u64)).await?;
let n = file.read(&mut self.retrivable_buffer[..data_size]).await?;
if n == 0 {
// Theoratically should never get here
panic!("Got 0 bytes in file read")
};
self.rewind().await?;
return Ok(&self.retrivable_buffer[..data_size]);
}

async fn rewind(&mut self) -> io::Result<u64> {
let mut file = &*self.file;
file.seek(SeekFrom::Start(0)).await
}
}

#[cfg(test)]
Expand Down
26 changes: 26 additions & 0 deletions nyx-storage/src/offsets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#[derive(Clone, Copy)]
pub struct Offsets {
start: usize,
end: usize,
}

impl Offsets {
pub fn new(start: usize, end: usize) -> Result<Self, String> {
if start >= end {
return Err(format!(
"Start ({}) can't be greater or equal to end ({})",
start, end
));
}

Ok(Self { start, end })
}

pub fn start(&self) -> usize {
return self.start;
}

pub fn end(&self) -> usize {
return self.end;
}
}
10 changes: 8 additions & 2 deletions nyx-storage/src/write_queue.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use async_std::channel::Receiver;
use std::io;

use async_std::{channel::Receiver, fs::OpenOptions, io::WriteExt, path::Path};

pub struct WriteQueue;

impl WriteQueue {
pub async fn run(queue: Receiver<Vec<u8>>) -> Result<(), String> {
pub async fn run(queue: Receiver<Vec<u8>>, path: &Path) -> io::Result<()> {
let mut data_file = OpenOptions::new().append(true).open(path).await?;

data_file.write_all(b"I am trying to write").await?;

while let Ok(data) = queue.recv().await {
println!("Do something with received data: {:?}", data);
}
Expand Down

0 comments on commit 35cf760

Please sign in to comment.