From f61c71900361783b697c3002ca8306614b493b11 Mon Sep 17 00:00:00 2001 From: Chris Douglas Date: Tue, 26 Sep 2023 13:16:40 -0700 Subject: [PATCH] No idea what I was doing, here --- basen/lattice.md | 147 ++++++++++++++++++++++++++++++++++++++++++ basen/src/latless.rs | 126 ++++++++++++++++++++---------------- basen/src/protocol.rs | 9 +++ 3 files changed, 227 insertions(+), 55 deletions(-) create mode 100644 basen/lattice.md diff --git a/basen/lattice.md b/basen/lattice.md new file mode 100644 index 000000000..3a4c9580f --- /dev/null +++ b/basen/lattice.md @@ -0,0 +1,147 @@ +```mermaid +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em; +subgraph sg_1v1 ["sg_1v1 stratum 0"] + 1v1[\"(1v1) source_stream_serde(cl_inbound)"/]:::pullClass + 2v1[\"(2v1) map(Result::unwrap)"/]:::pullClass + 4v1[/"
(4v1)
demux(|(cl_req, addr), var_args!(info, errs)| match cl_req {
CKRequest::Info { id, key, .. } => info.give((key, (id, addr))),
_ => errs.give((cl_req, addr)),
})
"\]:::pushClass + 5v1[/"
(5v1)
for_each(|(msg, addr)| {
println!("KN: Unexpected CL message type: {:?} from {:?}", msg, addr)
})
"\]:::pushClass + 1v1--->2v1 + 2v1--->4v1 + 4v1--errs--->5v1 + subgraph sg_1v1_var_client_demux ["var client_demux"] + 1v1 + 2v1 + 4v1 + end +end +subgraph sg_2v1 ["sg_2v1 stratum 0"] + 24v1[\"
(24v1)
map(|(id, svc_addr, _, _, last_contact)| (
id,
DomPair::<
Max<DateTime<Utc>>,
Point<SocketAddr, ()>,
>::new_from(last_contact, Point::new_from(svc_addr)),
))
"/]:::pullClass + 27v1[\"
(27v1)
flat_map(|
(
id,
_,
blocks,
_,
_,
): (SegmentNodeID, SocketAddr, Vec<Block>, _, Max<DateTime<Utc>>)|
{
blocks
.into_iter()
.map(move |block| (block, SetUnionHashSet::new_from([id.clone()])))
})
"/]:::pullClass + 31v1[\"(31v1) source_iter(init_keys)"/]:::pullClass + 28v1[\"(28v1) join::<'tick, 'static>()"/]:::pullClass + 29v1[\"
(29v1)
flat_map(|(key, (cli, blocks))| {
blocks
.into_iter()
.map(move |block| (
block,
MapUnionHashMap::new_from([
(key.clone(), SetUnionHashSet::new_from([cli.clone()])),
]),
))
})
"/]:::pullClass + 30v1[\"(30v1) inspect(|x| println!("{}: KN: LOOKUP_KEY_MAP: {x:?}", Utc::now()))"/]:::pullClass + 25v1[\"
(25v1)
lattice_join::<
'tick,
'static,
MapUnionHashMap<String, SetUnionHashSet<(ClientID, SocketAddr)>>,
SetUnionHashSet<SegmentNodeID>,
>()
"/]:::pullClass + 26v1[\"
(26v1)
flat_map(|(block, (clikeyset, sn_set))| {
sn_set
.into_reveal()
.into_iter()
.map(move |sn| (
sn,
MapUnionHashMap::new_from([(block.clone(), clikeyset.clone())]),
))
})
"/]:::pullClass + 12v1[\"
(12v1)
lattice_join::<
'tick,
'static,
MapUnionHashMap<
Block,
MapUnionHashMap<String, SetUnionHashSet<(ClientID, SocketAddr)>>,
>,
DomPair<Max<DateTime<Utc>>, Point<SocketAddr, ()>>,
>()
"/]:::pullClass + 13v1[\"
(13v1)
inspect(|
x: &(
SegmentNodeID,
(
MapUnionHashMap<
Block,
MapUnionHashMap<String, SetUnionHashSet<(ClientID, SocketAddr)>>,
>,
DomPair<Max<DateTime<Utc>>, Point<SocketAddr, ()>>,
),
)|
println!("{}: <- LCM: {x:?}", Utc::now()))
"/]:::pullClass + 14v1[\"
(14v1)
filter(|(_, (_, hbts))| {
Utc::now() - *hbts.as_reveal_ref().0.as_reveal_ref()
< chrono::Duration::seconds(10)
})
"/]:::pullClass + 15v1[\"
(15v1)
flat_map(|(_, (block_clikey, hbts))| {
block_clikey
.into_reveal()
.into_iter()
.map(move |(block, clikeys)| MapUnionHashMap::new_from([
(
block,
Pair::<
MapUnionHashMap<String, SetUnionHashSet<(ClientID, SocketAddr)>>,
SetUnionHashSet<SocketAddr>,
>::new_from(
clikeys,
SetUnionHashSet::new_from([hbts.into_reveal().1.val]),
),
),
]))
})
"/]:::pullClass + 24v1--1--->12v1 + 27v1--1--->25v1 + 31v1--1--->28v1 + 28v1--->29v1 + 29v1--->30v1 + 30v1--0--->25v1 + 25v1--->26v1 + 26v1--0--->12v1 + 12v1--->13v1 + 13v1--->14v1 + 14v1--->15v1 + subgraph sg_2v1_var_block_map ["var block_map"] + 25v1 + 26v1 + end + subgraph sg_2v1_var_key_map ["var key_map"] + 28v1 + 29v1 + 30v1 + end + subgraph sg_2v1_var_last_contact_map ["var last_contact_map"] + 12v1 + 13v1 + 14v1 + 15v1 + end +end +subgraph sg_3v1 ["sg_3v1 stratum 1"] + 16v1[\"
(16v1)
lattice_fold::<
'tick,
MapUnionHashMap<
Block,
Pair<
MapUnionHashMap<String, SetUnionHashSet<(ClientID, SocketAddr)>>,
SetUnionHashSet<SocketAddr>,
>,
>,
>()
"/]:::pullClass + subgraph sg_3v1_var_last_contact_map ["var last_contact_map"] + 16v1 + end +end +subgraph sg_4v1 ["sg_4v1 stratum 1"] + 17v1[\"
(17v1)
flat_map(|block_keycli_addr| {
block_keycli_addr
.into_reveal()
.into_iter()
.map(move |(block, req_sn_addrs)| (block, req_sn_addrs.into_reveal()))
})
"/]:::pullClass + subgraph sg_4v1_var_last_contact_map ["var last_contact_map"] + 17v1 + end +end +subgraph sg_5v1 ["sg_5v1 stratum 1"] + 18v1[\"
(18v1)
flat_map(move |
(
block,
(reqs, sn_addrs),
): (
Block,
(
MapUnionHashMap<String, SetUnionHashSet<(ClientID, SocketAddr)>>,
SetUnionHashSet<SocketAddr>,
),
)|
{
reqs
.into_reveal()
.into_iter()
.map(move |(key, cliset)| (
key,
LocatedBlock {
block: block.clone(),
locations: sn_addrs
.clone()
.into_reveal()
.into_iter()
.collect::<Vec<_>>(),
},
cliset,
))
})
"/]:::pullClass + subgraph sg_5v1_var_last_contact_map ["var last_contact_map"] + 18v1 + end +end +subgraph sg_6v1 ["sg_6v1 stratum 1"] + 19v1[\"
(19v1)
flat_map(move |(key, lblock, cliset)| {
cliset
.into_reveal()
.into_iter()
.map(move |(id, addr)| ((id, addr, key.clone()), lblock.clone()))
})
"/]:::pullClass + subgraph sg_6v1_var_last_contact_map ["var last_contact_map"] + 19v1 + end +end +subgraph sg_7v1 ["sg_7v1 stratum 2"] + 20v1[\"
(20v1)
fold_keyed::<
'tick,
>(
Vec::new,
|lblocks: &mut Vec<LocatedBlock>, lblock| {
lblocks.push(lblock);
},
)
"/]:::pullClass + 21v1[\"
(21v1)
map(|((_, addr, key), lblocks)| (
CKResponse::Info {
key: key,
blocks: lblocks,
},
addr,
))
"/]:::pullClass + 22v1[\"(22v1) inspect(|x| println!("{}: -> LCM: {x:?}", Utc::now()))"/]:::pullClass + 23v1[/"(23v1) dest_sink_serde(cl_outbound)"\]:::pushClass + 20v1--->21v1 + 21v1--->22v1 + 22v1--->23v1 + subgraph sg_7v1_var_last_contact_map ["var last_contact_map"] + 20v1 + 21v1 + 22v1 + 23v1 + end +end +subgraph sg_8v1 ["sg_8v1 stratum 0"] + 6v1[\"(6v1) source_stream_serde(sn_inbound)"/]:::pullClass + 7v1[\"(7v1) map(|m| m.unwrap())"/]:::pullClass + 8v1[/"
(8v1)
demux(|(sn_req, addr), var_args!(heartbeat, errs)| match sn_req {
SKRequest::Heartbeat { id, svc_addr, blocks } => {
heartbeat.give((id, svc_addr, blocks, addr, Max::new(Utc::now())))
}
_ => errs.give((sn_req, addr)),
})
"\]:::pushClass + 9v1[/"(9v1) tee()"\]:::pushClass + 10v1[/"(10v1) map(|(_, _, _, addr, _)| (SKResponse::Heartbeat {}, addr))"\]:::pushClass + 11v1[/"(11v1) dest_sink_serde(sn_outbound)"\]:::pushClass + 32v1[/"
(32v1)
for_each(|(msg, addr)| {
println!("KN: Unexpected SN message type: {:?} from {:?}", msg, addr)
})
"\]:::pushClass + 6v1--->7v1 + 7v1--->8v1 + 8v1--heartbeat--->9v1 + 8v1--errs--->32v1 + 9v1--->10v1 + 10v1--->11v1 + subgraph sg_8v1_var_heartbeats ["var heartbeats"] + 9v1 + end + subgraph sg_8v1_var_segnode_demux ["var segnode_demux"] + 6v1 + 7v1 + 8v1 + end +end +4v1--info--->33v1 +9v1--->39v1 +9v1--->40v1 +15v1--->38v1 +16v1--->37v1 +17v1--->36v1 +18v1--->35v1 +19v1--->34v1 +33v1["(33v1) handoff"]:::otherClass +33v1--0--->28v1 +34v1["(34v1) handoff"]:::otherClass +34v1===o20v1 +35v1["(35v1) handoff"]:::otherClass +35v1--->19v1 +36v1["(36v1) handoff"]:::otherClass +36v1--->18v1 +37v1["(37v1) handoff"]:::otherClass +37v1--->17v1 +38v1["(38v1) handoff"]:::otherClass +38v1===o16v1 +39v1["(39v1) handoff"]:::otherClass +39v1--->24v1 +40v1["(40v1) handoff"]:::otherClass +40v1--->27v1 +``` diff --git a/basen/src/latless.rs b/basen/src/latless.rs index 0505f7043..16c80a172 100644 --- a/basen/src/latless.rs +++ b/basen/src/latless.rs @@ -1,9 +1,6 @@ use std::net::SocketAddr; use chrono::{DateTime, Utc}; -use hydroflow::lattices::map_union::MapUnionHashMap; -use hydroflow::lattices::set_union::SetUnionHashSet; -use hydroflow::lattices::{DomPair, Max, Pair, Point}; use hydroflow::util::{bind_tcp_bytes, connect_tcp_bytes, ipv4_resolve}; use hydroflow::{hydroflow_syntax, tokio}; use tokio::time; @@ -74,7 +71,7 @@ pub async fn key_node(opts: &Opts, keynode_sn_addr: &'static str, keynode_client -> demux(|(sn_req, addr), var_args!(heartbeat, errs)| match sn_req { //SKRequest::Register {id, ..} => register.give((key, addr)), - SKRequest::Heartbeat { id, svc_addr, blocks, } => heartbeat.give((id, svc_addr, blocks, addr, Max::new(Utc::now()))), + SKRequest::Heartbeat { id, svc_addr, blocks, } => heartbeat.give((id, svc_addr, blocks, addr, Utc::now())), //_ => errs.give((sn_req, addr)), _ => errs.give((sn_req, addr)), } @@ -88,72 +85,91 @@ pub async fn key_node(opts: &Opts, keynode_sn_addr: &'static str, keynode_client -> map(|(_, _, _, addr, _)| (SKResponse::Heartbeat { }, addr)) -> dest_sink_serde(sn_outbound); - last_contact_map = lattice_join::<'tick, 'static, - MapUnionHashMap>>, - DomPair>, Point>>() - -> inspect(|x: &(SegmentNodeID, - (MapUnionHashMap>>, - DomPair>, Point>))| - println!("{}: <- LCM: {x:?}", Utc::now())) - // XXX want this filter to yield... a MIN/BOT value s.t. if no valid replicas, will merge into an error value? - // remove segment nodes that haven't sent a heartbeat in the last 10 seconds - -> filter(|(_, (_, hbts))| Utc::now() - *hbts.as_reveal_ref().0.as_reveal_ref() < chrono::Duration::seconds(10)) - // extract the service address from those nodes - -> flat_map(|(_, (block_clikey, hbts))| block_clikey.into_reveal().into_iter().map(move - |(block, clikeys)| MapUnionHashMap::new_from([(block, - Pair::>, SetUnionHashSet> - ::new_from(clikeys, SetUnionHashSet::new_from([hbts.into_reveal().1.val])))]))) - -> lattice_fold::<'tick, - MapUnionHashMap>, - SetUnionHashSet>>>() - // unpack map into tuples - -> flat_map(|block_keycli_addr| block_keycli_addr.into_reveal().into_iter().map(move - |(block, req_sn_addrs)| (block, req_sn_addrs.into_reveal()))) - -> flat_map(move |(block, (reqs, sn_addrs)): (Block, (MapUnionHashMap>, - SetUnionHashSet))| - reqs.into_reveal().into_iter().map(move |(key, cliset)| - (key, - LocatedBlock { - block: block.clone(), - locations: sn_addrs.clone().into_reveal().into_iter().collect::>(), // XXX clone() avoidable? - }, - cliset))) - -> flat_map(move |(key, lblock, cliset)| cliset.into_reveal().into_iter().map(move |(id, addr)| ((id, addr, key.clone()), lblock.clone()))) - -> fold_keyed::<'tick>(Vec::new, |lblocks: &mut Vec, lblock| { - lblocks.push(lblock); - }) - -> map(|((_, addr, key), lblocks)| (CKResponse::Info { key: key, blocks: lblocks }, addr)) - -> inspect(|x| println!("{}: -> LCM: {x:?}", Utc::now())) - -> dest_sink_serde(cl_outbound); + last_contact_map = join::<'tick, 'static>() + -> inspect(|x: &(SegmentNodeID, ((Block, String, ClientInfo), (DateTime, SocketAddr)))| println!("{}: KN: LCM: {x:?} -> ", Utc::now())) + -> filter(|(_, (_, (last_contact, _)))| Utc::now() - *last_contact < chrono::Duration::seconds(10)) + // XXX need to clone the current value in reduce_keyed; can this select the max by reference? + -> reduce_keyed( + |cur: &mut ((Block, String, ClientInfo), (DateTime, SocketAddr)), + val: ((Block, String, ClientInfo), (DateTime, SocketAddr))| + *cur = std::cmp::max_by(cur.clone(), val, |c, v| c.1.0.cmp(&v.1.0))) + // remove unnecessary duplication + -> map(|(_, ((block, key, cli), (_, addr)))| ((key, cli, block.clone()), LocatedBlock { block: block.clone(), locations: vec![addr] })) + -> reduce_keyed(|cur: &mut LocatedBlock, val: LocatedBlock| cur.locations.extend(val.locations)) + -> inspect(|x: &((String, ClientInfo, Block), LocatedBlock)| println!("{}: KN: LCM: {x:?} ******* ", Utc::now())) + -> null(); + + //last_contact_map = lattice_join::<'tick, 'static, + // MapUnionHashMap>>, + // DomPair>, Point>>() + // -> inspect(|x: &(SegmentNodeID, + // (MapUnionHashMap>>, + // DomPair>, Point>))| + // println!("{}: <- LCM: {x:?}", Utc::now())) + // // XXX want this filter to yield... a MIN/BOT value s.t. if no valid replicas, will merge into an error value? + // // remove segment nodes that haven't sent a heartbeat in the last 10 seconds + // -> filter(|(_, (_, hbts))| Utc::now() - *hbts.as_reveal_ref().0.as_reveal_ref() < chrono::Duration::seconds(10)) + // // extract the service address from those nodes + // -> flat_map(|(_, (block_clikey, hbts))| block_clikey.into_reveal().into_iter().map(move + // |(block, clikeys)| MapUnionHashMap::new_from([(block, + // Pair::>, SetUnionHashSet> + // ::new_from(clikeys, SetUnionHashSet::new_from([hbts.into_reveal().1.val])))]))) + // -> lattice_fold::<'tick, + // MapUnionHashMap>, + // SetUnionHashSet>>>() + // // unpack map into tuples + // -> flat_map(|block_keycli_addr| block_keycli_addr.into_reveal().into_iter().map(move + // |(block, req_sn_addrs)| (block, req_sn_addrs.into_reveal()))) + // -> flat_map(move |(block, (reqs, sn_addrs)): (Block, (MapUnionHashMap>, + // SetUnionHashSet))| + // reqs.into_reveal().into_iter().map(move |(key, cliset)| + // (key, + // LocatedBlock { + // block: block.clone(), + // locations: sn_addrs.clone().into_reveal().into_iter().collect::>(), // XXX clone() avoidable? + // }, + // cliset))) + // -> flat_map(move |(key, lblock, cliset)| cliset.into_reveal().into_iter().map(move |(id, addr)| ((id, addr, key.clone()), lblock.clone()))) + // -> fold_keyed::<'tick>(Vec::new, |lblocks: &mut Vec, lblock| { + // lblocks.push(lblock); + // }) + // -> map(|((_, addr, key), lblocks)| (CKResponse::Info { key: key, blocks: lblocks }, addr)) + // -> inspect(|x| println!("{}: -> LCM: {x:?}", Utc::now())) + // -> dest_sink_serde(cl_outbound); heartbeats - -> map(|(id, svc_addr, _, _, last_contact)| (id, DomPair::>,Point>::new_from(last_contact, Point::new_from(svc_addr)))) - //-> inspect(|x| println!("{}: KN: HB_LC: {x:?}", Utc::now())) + -> map(|(id, svc_addr, _, _, last_contact)| (id, (last_contact, svc_addr))) + -> inspect(|x: &(SegmentNodeID, (DateTime, SocketAddr))| println!("{}: KN: HB_LC: {x:?}", Utc::now())) -> [1]last_contact_map; - // join all requests for blocks - block_map = lattice_join::<'tick, 'static, MapUnionHashMap>, SetUnionHashSet>() - // can we replace `clone()` with `to_owned()`? The compiler thinks so! - -> flat_map(|(block, (clikeyset, sn_set))| sn_set.into_reveal().into_iter().map(move |sn| - (sn, MapUnionHashMap::new_from([(block.clone(), clikeyset.clone())])))) - //-> inspect(|x: &(SegmentNodeID, MapUnionHashMap>>)| println!("{}: KN: RPC_LC: {x:?}", Utc::now())) + //// join all requests for blocks + //block_map = lattice_join::<'tick, 'static, MapUnionHashMap>, SetUnionHashSet>() + // // can we replace `clone()` with `to_owned()`? The compiler thinks so! + // -> flat_map(|(block, (clikeyset, sn_set))| sn_set.into_reveal().into_iter().map(move |sn| + // (sn, MapUnionHashMap::new_from([(block.clone(), clikeyset.clone())])))) + // //-> inspect(|x: &(SegmentNodeID, MapUnionHashMap>>)| println!("{}: KN: RPC_LC: {x:?}", Utc::now())) + // -> [0]last_contact_map; + + block_map = join::<'tick, 'static>() + -> inspect(|x: &(Block, ((String, ClientInfo), SegmentNodeID))| println!("{}: KN: LOOKUP_BLOCK_MAP: {x:?}", Utc::now())) + -> map(|(block, ((key, cli), sn_id))| (sn_id, (block, key, cli))) -> [0]last_contact_map; heartbeats - -> flat_map(|(id, _, blocks, _, _): (SegmentNodeID, SocketAddr, Vec, _, Max>)| blocks.into_iter().map(move |block| (block, SetUnionHashSet::new_from([id.clone()])))) + -> flat_map(|(id, _, blocks, _, _): (SegmentNodeID, _, Vec, _, _)| blocks.into_iter().map(move |block| (block, id.clone()))) + // add filter for blocks here? -> [1]block_map; // join (key, client) requests with existing key map (key, blocks) key_map = join::<'tick, 'static>() - -> flat_map(|(key, (cli, blocks))| blocks.into_iter().map(move - |block| (block, MapUnionHashMap::new_from([(key.clone(), SetUnionHashSet::new_from([cli.clone()]))])))) - -> inspect(|x| println!("{}: KN: LOOKUP_KEY_MAP: {x:?}", Utc::now())) + -> inspect(|x: &(String, ((ClientID, SocketAddr), Block))| println!("{}: KN: LOOKUP_KEY_MAP: {x:?}", Utc::now())) + -> map(|(key, ((id, addr), block))| (block, (key, ClientInfo { id, addr }))) -> [0]block_map; // flatten into relation source_iter(init_keys) - //-> flat_map(|(key, blocks)| blocks.into_iter().map(move |block| (key, block))) + -> flat_map(|(key, blocks)| blocks.into_iter().map(move |block| (key.clone(), block))) -> [1]key_map; segnode_demux[errs] diff --git a/basen/src/protocol.rs b/basen/src/protocol.rs index 93c51a629..30edc81ea 100644 --- a/basen/src/protocol.rs +++ b/basen/src/protocol.rs @@ -52,6 +52,15 @@ pub struct ClientID { // add metadata for KN to determine location for replica ordering } +// internal, clean up some nested tuples by making this a join later +#[derive(Eq, Hash, PartialEq, Clone, Serialize, Deserialize, Debug)] +#[rustfmt::skip] +pub struct ClientInfo { + pub id: ClientID, + pub addr: SocketAddr, +} + + #[derive(Eq, Hash, PartialEq, Clone, Serialize, Deserialize, Debug, Default)] #[rustfmt::skip] pub struct SegmentNodeID {