Skip to content

Commit

Permalink
stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
bagedevimo committed Sep 5, 2019
1 parent 0d801ef commit 162dc2e
Show file tree
Hide file tree
Showing 11 changed files with 1,234 additions and 79 deletions.
1,034 changes: 1,034 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 21 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,24 @@ edition = "2018"
[dependencies]
hex = "0.3.2"
flate2 = { version = "1.0" }
rust-crypto = "0.2.36"
rust-crypto = "0.2.36"

# futures = "0.1"
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }

tarpc = { version = "0.18", features = ["serde1"] }
tarpc-lib = "0.6"
tarpc-bincode-transport = "0.7"

async-bincode = "0.4"
bincode = "1.1.4"

tokio = "0.1.21"
tokio-io = "0.1.12"
tokio-tcp = "0.1.3"
tokio-executor = "0.1.7"
tokio-async-await = "0.1.7"

serde = "1.0.92"

clap = "2.0"
60 changes: 42 additions & 18 deletions src/bin/diffuse_recv_pack.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
extern crate hex;
#![feature(async_await)]

use std::fs::File;
use std::io;
use std::io::Read;
use std::io::Write;
use futures::{compat::Executor01CompatExt, prelude::*};

use std::io::Seek;
use std::io::{self, Read};
use std::net::SocketAddr;
use tarpc::{client, context};

pub async fn run(server_addr: SocketAddr, name: String) -> io::Result<()> {
let transport = tarpc_bincode_transport::connect(&server_addr).await?;

// new_stub is generated by the service! macro. Like Server, it takes a config and any
// Transport as input, and returns a Client, also generated by the macro.
// by the service mcro.
let mut client = diffuse::proto::new_stub(client::Config::default(), transport).await?;

// The client has an RPC method for each RPC defined in service!. It takes the same args
// as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), name).await?;

eprintln!("remote: {}", hello);

fn main() {
let input = io::stdin();

print!("0000000000000000000000000000000000000000 capabilities^{{}}");
print!("\0");
print!("report-status delete-refs side-band-64k quiet atomic ofs-delta push-options agent=git/2.21.0");
print!(
"report-status delete-refs side-band-64k quiet atomic push-options agent=diffuse/2.21.0"
);
print!("\n");

let mut buffer = Vec::new();
std::io::stdin().lock().read_to_end(&mut buffer);
let mut cursor = std::io::Cursor::new(buffer);

let mut content = Vec::new();
cursor.read_to_end(&mut content);

let mut f = File::create("foo.txt").unwrap();
f.write_all(&content).unwrap();

cursor.seek(std::io::SeekFrom::Start(0));

let mut conn = diffuse::git::Connection::new(cursor);
let mut database = diffuse::git::Database::new(&mut client);
let mut conn = diffuse::git::Connection::new(cursor, &mut database);

loop {
let packet = match conn.receive_packet() {
Expand All @@ -45,11 +53,27 @@ fn main() {
}

eprintln!(
"REMOTE: Server has {} objects in database",
"remote:\nremote: Server has {} objects in database\nremote:",
conn.get_database().object_count()
);

eprintln!("REMOTE: Server object listing");
conn.get_database().dump();
eprintln!("REMOTE: End server object listing");

Ok(())
}

fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());

let server_addr = "127.0.0.1:11234".parse().unwrap();
let name = "diffuse_recv_pack";

tokio::run(
run(server_addr, name.into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
}
15 changes: 14 additions & 1 deletion src/bin/diffused.rs
Original file line number Diff line number Diff line change
@@ -1 +1,14 @@
fn main() {}
#![feature(arbitrary_self_types, await_macro, async_await, proc_macro_hygiene)]

use futures::{compat::Executor01CompatExt, prelude::*};

fn main() {
tarpc::init(tokio::executor::DefaultExecutor::current().compat());

tokio::run(
diffuse::proto::server::run(([0, 0, 0, 0], 11234).into())
.map_err(|e| eprintln!("Oh no: {}", e))
.boxed()
.compat(),
);
}
62 changes: 25 additions & 37 deletions src/git/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::util;
use std::io::Read;
use std::io::Seek;

use crate::git::database::get_object_id;
use futures::future::Future;

use crate::git::database::Database;
use crate::git::database::Record;

Expand All @@ -23,16 +24,16 @@ const RECORD_TYPE_REF_DELTA: u8 = 7;

pub const GIT_MAX_COPY: u64 = 0x10000;

pub struct Connection {
pub struct Connection<'a> {
stream: std::io::Cursor<Vec<u8>>,
database: Database,
database: &'a mut Database<'a>,
}

impl Connection {
pub fn new(str: std::io::Cursor<Vec<u8>>) -> Connection {
impl<'a> Connection<'a> {
pub fn new(str: std::io::Cursor<Vec<u8>>, database: &'a mut Database<'a>) -> Connection<'a> {
Connection {
stream: str,
database: Database::new(),
database: database,
}
}

Expand Down Expand Up @@ -70,7 +71,8 @@ impl Connection {
if size > 0 {
let mut read_buffer = self.stream.clone().take((size - 4) as u64);
self.stream
.seek(std::io::SeekFrom::Current(size as i64 - 4));
.seek(std::io::SeekFrom::Current(size as i64 - 4))
.unwrap();
match read_buffer.read_to_end(&mut buffer) {
Ok(_) => {}
Err(e) => panic!("Unexpected EOF while reading message: {}", e),
Expand Down Expand Up @@ -115,7 +117,10 @@ fn parse_pack<T: Read>(reader: &mut T, database: &mut Database) -> Vec<Record> {
for _pack_index in 0..pack_object_count {
let pack_object = parse_pack_object_record(reader, database);
pack_objects.push(pack_object.clone());
database.insert(pack_object);

eprintln!("Inserting..");
tokio::spawn(database.insert(pack_object));
eprintln!("Inserting done");
}

let mut trailer_signature: [u8; 20] = [0; 20];
Expand Down Expand Up @@ -232,7 +237,7 @@ fn parse_pack_object_record(

fn inflate_xdelta_record<T: Read>(mut reader: &mut T, database: &mut Database) -> Vec<u8> {
let mut oid_bytes = [0; 20];
reader.read(&mut oid_bytes);
reader.read(&mut oid_bytes).unwrap();

let source_id = crate::git::database::ObjectID::from_oid_bytes(oid_bytes);
let source_object = database.fetch(&source_id);
Expand All @@ -243,17 +248,17 @@ fn inflate_xdelta_record<T: Read>(mut reader: &mut T, database: &mut Database) -
// let (byte, value) = crate::git::connection::read_variable_length_int(reader);
// eprintln!("byte: {:?}, value: {:?}", byte, value as i8);

let (bytes, compressed_byte_count) = inflate_record_data(&mut reader);
let (bytes, _) = inflate_record_data(&mut reader);
let mut secondary_cursor = std::io::Cursor::new(bytes.clone());

let (_, v1) = crate::git::connection::read_variable_length_int(&mut secondary_cursor, 7);
let (_, v2) = crate::git::connection::read_variable_length_int(&mut secondary_cursor, 7);
let (_, _v1) = crate::git::connection::read_variable_length_int(&mut secondary_cursor, 7);
let (_, _v2) = crate::git::connection::read_variable_length_int(&mut secondary_cursor, 7);

let mut out_buffer = Vec::new();

while secondary_cursor.stream_position().unwrap() < secondary_cursor.stream_len().unwrap() {
let mut peek: [u8; 1] = [0; 1];
secondary_cursor.read(&mut peek);
secondary_cursor.read(&mut peek).unwrap();

// secondary_cursor.seek(std::io::SeekFrom::Current(-1));

Expand All @@ -262,21 +267,17 @@ fn inflate_xdelta_record<T: Read>(mut reader: &mut T, database: &mut Database) -

// let mut buffer = vec![0u8; size as usize];
// reader.read_exact(&mut buffer).unwrap();
if bytes.len() == 20 {
eprintln!("Insert: {}", peek[0]);
for _ in 0..peek[0] {
let mut new_byte: [u8; 1] = [0; 1];
secondary_cursor.read(&mut new_byte);
out_buffer.push(new_byte[0]);
}

out_buffer.push(peek[0]);
} else {
let value =
crate::git::record::read_packed_int_56le(&mut secondary_cursor, peek[0] as u64);
let offset = value & 0xffffffff;
let size = value >> 32;

if bytes.len() == 20 {
eprintln!("Copy: {} -> {}", offset, offset + size);
}

let actual_size = if size == 0 {
crate::git::connection::GIT_MAX_COPY
} else {
Expand All @@ -291,9 +292,10 @@ fn inflate_xdelta_record<T: Read>(mut reader: &mut T, database: &mut Database) -
Record::Blob { data, .. } => data,
};

let mut bytes_to_copy: Vec<u8> = vec![0; size as usize];
let mut bytes_to_copy: Vec<u8> = vec![0; actual_size as usize];

bytes_to_copy.copy_from_slice(&data[offset as usize..(offset + size) as usize]);
bytes_to_copy
.copy_from_slice(&data[offset as usize..(offset + actual_size) as usize]);
// let bytes_to_copy = data[offset as usize..size as usize];

// eprintln!("Copying\n{}\n\n", String::from_utf8_lossy(&bytes_to_copy));
Expand All @@ -309,11 +311,6 @@ fn inflate_xdelta_record<T: Read>(mut reader: &mut T, database: &mut Database) -
}
}

if out_buffer.len() > 300 && out_buffer.len() < 400 {
eprintln!("{:?}", out_buffer);
eprintln!("=============({})\n{:?}", bytes.len(), bytes);
}

out_buffer
}

Expand Down Expand Up @@ -343,14 +340,5 @@ fn inflate_record_data<T: Read>(reader: &mut T) -> (Vec<u8>, u64) {
}
}

if deflater.total_out() > 300 && deflater.total_out() < 400 {
eprintln!(
"Inflate finished, inflated to {} bytes:\n==============\n{:?}\n------------------\n{:?}\n\n",
deflater.total_in(),
input,
output,
);
}

(output, deflater.total_in())
}
37 changes: 24 additions & 13 deletions src/git/database.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
#![feature(async_await)]

use crypto::digest::Digest;
use crypto::sha1::Sha1;
use futures::future::join;
use futures::future::FutureExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Write;
use std::pin::Pin;
use tarpc::{client, context};

use futures::{compat::Executor01CompatExt, prelude::*};

pub struct Database {
pub struct Database<'a> {
pub entries: HashMap<ObjectID, Record>,
pub client: &'a mut crate::proto::Client,
}

impl Database {
pub fn new() -> Database {
impl<'a> Database<'a> {
pub fn new(client: &'a mut crate::proto::Client) -> Database<'a> {
Database {
entries: HashMap::new(),
client: client,
}
}

pub fn insert(&mut self, record: Record) -> Option<ObjectID> {
pub async fn insert(&mut self, record: Record) -> Option<ObjectID> {
let object_id_str = get_object_id(&record);
let object_id = ObjectID::from_oid_string(object_id_str);

self.entries.insert(object_id.clone(), record);
self.entries.insert(object_id.clone(), record.clone());

let result = match record {
Record::Blob { .. } => self.client.store_blob(context::current(), record).wait(),
_ => true,
};

Some(object_id)
}
Expand Down Expand Up @@ -63,10 +78,6 @@ impl ObjectID {
oid_string: string,
}
}

fn string(&self) -> String {
self.oid_string.clone()
}
}

impl std::cmp::PartialEq for ObjectID {
Expand All @@ -89,7 +100,7 @@ impl std::fmt::Display for ObjectID {
}
}

#[derive(Debug, Clone)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum Record {
Commit {
data: Vec<u8>,
Expand All @@ -104,7 +115,7 @@ pub enum Record {
},
}

#[derive(Debug, Clone)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct TreeEntry {
pub mode: Vec<u8>,
pub name: String,
Expand Down Expand Up @@ -139,7 +150,7 @@ pub fn get_object_id(record: &Record) -> String {
// f.write(&hash_data);

if size == 377 {
std::fs::write("dumped_object", &hash_data);
std::fs::write("dumped_object", &hash_data).unwrap();
eprintln!(
"{} is \n{:?}\n\n",
hasher.result_str(),
Expand Down
2 changes: 2 additions & 0 deletions src/git/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ mod record;
pub use connection::Connection;
pub use connection::ConnectionResult;
pub use connection::Packet;
pub use database::Database;
pub use database::Record;
Loading

0 comments on commit 162dc2e

Please sign in to comment.