diff --git a/move/sources/api.move b/move/sources/api.move index c800f12..c977035 100644 --- a/move/sources/api.move +++ b/move/sources/api.move @@ -74,20 +74,20 @@ module dtp::api { // Transmit a request toward the server. // // The encoding of the 'data' depends on the service. - public fun send_request(ipipe: &mut InnerPipe, data: vector, args: vector, ctx: &mut TxContext): vector + public fun send_request(ipipe: &mut InnerPipe, data: vector, cid: u64, args: vector, ctx: &mut TxContext): vector { let kvargs = kvalues::from_bytes(&args); - let ret_value = dtp::api_impl::send_request(ipipe, data, &kvargs, ctx); + let ret_value = dtp::api_impl::send_request(ipipe, data, cid, &kvargs, ctx); kvalues::to_bytes(&ret_value) } // Transmit a response toward the client. // // The encoding of the 'data' depends on the service. - public fun send_response( ipipe: &mut InnerPipe, seq_num: u64, data: vector, args: vector, ctx: &mut TxContext): vector + public fun send_response( ipipe: &mut InnerPipe, req_ipipe_idx: u8, req_seq_num: u64, data: vector, cid: u64, args: vector, ctx: &mut TxContext): vector { let kvargs = kvalues::from_bytes(&args); - let ret_value = dtp::api_impl::send_response(ipipe, seq_num, data, &kvargs, ctx); + let ret_value = dtp::api_impl::send_response(ipipe, req_ipipe_idx, req_seq_num, data, cid, &kvargs, ctx); kvalues::to_bytes(&ret_value) } diff --git a/move/sources/api_impl.move b/move/sources/api_impl.move index 0ddd6f3..5bea07e 100644 --- a/move/sources/api_impl.move +++ b/move/sources/api_impl.move @@ -97,15 +97,20 @@ module dtp::api_impl { // Transmit a request toward the server. // // The encoding of the 'data' depends on the service. - public(friend) fun send_request(ipipe: &mut InnerPipe, data: vector, _kvargs: &KValues, _ctx: &mut TxContext): KValues + public(friend) fun send_request(ipipe: &mut InnerPipe, data: vector, cid: u64, _kvargs: &KValues, _ctx: &mut TxContext): KValues { - let seq_num = inner_pipe::inc_seq_num(ipipe); + let req_seq_num = inner_pipe::inc_seq_num(ipipe); + let req_ipipe_idx = inner_pipe::get_ipipe_idx(ipipe); // Emit a request event. + let cli_host_ref = inner_pipe::get_cli_host_ref(ipipe); + let srv_host_ref = inner_pipe::get_srv_host_ref(ipipe); let ipipe_ref = weak_ref::new_from_obj(ipipe); + let peer_ipipe_ref = inner_pipe::get_peer_ref(ipipe); let tc_ref = inner_pipe::get_tc_ref(ipipe); - let service_idx = inner_pipe::get_service_idx(ipipe); - events::emit_request(service_idx, seq_num, tc_ref, ipipe_ref, data); + let service_idx = inner_pipe::get_service_idx(ipipe); + + events::emit_request(service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, ipipe_ref, peer_ipipe_ref, data, cid ); // Update stats for debugging. inner_pipe::inc_emit_cnt(ipipe); @@ -116,13 +121,17 @@ module dtp::api_impl { // Transmit a response toward the client. // // The encoding of the 'data' depends on the service. - public(friend) fun send_response(ipipe: &mut InnerPipe, seq_num: u64, data: vector, _kvargs: &KValues, _ctx: &mut TxContext): KValues + public(friend) fun send_response(ipipe: &mut InnerPipe, req_ipipe_idx: u8, req_seq_num: u64, data: vector, cid: u64, _kvargs: &KValues, _ctx: &mut TxContext): KValues { // Emit a response event. + let cli_host_ref = inner_pipe::get_cli_host_ref(ipipe); + let srv_host_ref = inner_pipe::get_srv_host_ref(ipipe); let ipipe_ref = weak_ref::new_from_obj(ipipe); + let peer_ipipe_ref = inner_pipe::get_peer_ref(ipipe); let tc_ref = inner_pipe::get_tc_ref(ipipe); let service_idx = inner_pipe::get_service_idx(ipipe); - events::emit_response(service_idx, seq_num, tc_ref, ipipe_ref, data); + + events::emit_response(service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, ipipe_ref, peer_ipipe_ref, data, cid); // Update stats for debugging. inner_pipe::inc_emit_cnt(ipipe); diff --git a/move/sources/events.move b/move/sources/events.move index 939c422..1edd658 100644 --- a/move/sources/events.move +++ b/move/sources/events.move @@ -30,11 +30,16 @@ module dtp::events { struct Datagram has copy, drop { flags: u8, // Reserve for future. src: u8, // 0x01 or 0x02 for respectively cli_tx_ipipe and srv_tx_ipipe. - src_addr: address, // InnerPipe Address + src_addr: address, // InnerPipe address emitting the event. service_idx: u8, // Service Type [1..253] - seq_num: u64, - tc_ref: WeakRef, // TransportControl Address. + req_ipipe_idx: u8, // Uniquely identifies the originating request ipipe with an index [0..n_req_pipe-1] + req_seq_num: u64, // Sequence number assigned by the originating ipipe. + cli_host_ref: WeakRef, // Client Host Address (Optimization to minimize lookup at receiver). + srv_host_ref: WeakRef, // Server Host Address (Optimization to minimize lookup at receiver). + tc_ref: WeakRef, // TransportControl Address + peer_ipipe_ref: WeakRef, // InnerPipe in the other direction. data: vector, // The endpoint response/request (e.g. JSON-RPC). + cid: u64, // Correlation ID from the originating request endpoint. } // === Public-Mutative Functions === @@ -48,19 +53,19 @@ module dtp::events { event::emit(ConnReq { flags: 0, src: 0x03, src_addr, service_idx, conn }); } - public(friend) fun emit_response( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector ) { + public(friend) fun emit_response( service_idx: u8, req_ipipe_idx: u8, req_seq_num: u64, cli_host_ref: WeakRef, srv_host_ref: WeakRef, tc_ref: WeakRef, ipipe_ref: WeakRef, peer_ipipe_ref: WeakRef, data: vector, cid: u64 ) { let src_addr = dtp::weak_ref::get_address(&ipipe_ref); - event::emit(Datagram { flags: 0, src: 0x02, src_addr, service_idx, seq_num, tc_ref, data }); + event::emit(Datagram { flags: 0, src: 0x02, src_addr, service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, peer_ipipe_ref, data, cid }); } - public(friend) fun emit_request( service_idx: u8, seq_num: u64, tc_ref: WeakRef, ipipe_ref: WeakRef, data: vector ) { + public(friend) fun emit_request( service_idx: u8, req_ipipe_idx: u8, req_seq_num: u64, cli_host_ref: WeakRef, srv_host_ref: WeakRef, tc_ref: WeakRef, ipipe_ref: WeakRef, peer_ipipe_ref: WeakRef, data: vector, cid: u64 ) { let src_addr = dtp::weak_ref::get_address(&ipipe_ref); - event::emit(Datagram { flags: 0, src: 0x01, src_addr, service_idx, seq_num, tc_ref, data }); + event::emit(Datagram { flags: 0, src: 0x01, src_addr, service_idx, req_ipipe_idx, req_seq_num, cli_host_ref, srv_host_ref, tc_ref, peer_ipipe_ref, data, cid }); } // === Private Functions === // === Test Functions === - + } \ No newline at end of file diff --git a/move/sources/inner_pipe.move b/move/sources/inner_pipe.move index 698fec7..d8b82c2 100644 --- a/move/sources/inner_pipe.move +++ b/move/sources/inner_pipe.move @@ -32,9 +32,13 @@ module dtp::inner_pipe { struct InnerPipe has key, store { id: UID, flgs: u8, // DTP version+esc flags always after UID. - service_idx: u8, + ipipe_idx: u8, + service_idx: u8, + cli_host_ref: WeakRef, + srv_host_ref: WeakRef, tc_ref: WeakRef, pipe_ref: WeakRef, + peer_pipe_ref: WeakRef, sync_data: PipeSyncData, seq_num: u64, // Stats to help debugging. @@ -50,13 +54,17 @@ module dtp::inner_pipe { // === Public-Friend Functions === - public(friend) fun new( service_idx: u8, tc_id: &ID, pipe_addr: address, ctx: &mut TxContext ): InnerPipe { + public(friend) fun new( ipipe_idx: u8, service_idx: u8, cli_id: &ID, srv_id: &ID, tc_id: &ID, pipe_addr: address, ctx: &mut TxContext ): InnerPipe { let new_obj = InnerPipe { id: object::new(ctx), flgs: 0u8, + ipipe_idx, service_idx, + cli_host_ref: weak_ref::new(cli_id), + srv_host_ref: weak_ref::new(srv_id), tc_ref: weak_ref::new(tc_id), pipe_ref: weak_ref::new_from_address(pipe_addr), + peer_pipe_ref: weak_ref::new_empty(), sync_data: pipe_sync_data::new(), seq_num: 1, emit_cnt:0, @@ -65,17 +73,20 @@ module dtp::inner_pipe { new_obj } - public(friend) fun new_transfered( service_idx: u8, tc_id: &ID, pipe_addr: address, recipient: address, ctx: &mut TxContext ): address + public(friend) fun new_transfered( ipipe_idx: u8, service_idx: u8, cli_id: &ID, srv_id: &ID, tc_id: &ID, pipe_addr: address, recipient: address, ctx: &mut TxContext ): address { - let new_obj = new(service_idx, tc_id, pipe_addr, ctx); + let new_obj = new( ipipe_idx, service_idx, cli_id, srv_id, tc_id, pipe_addr, ctx); let new_obj_addr = uid_to_address(&new_obj.id); transfer::transfer(new_obj, recipient); new_obj_addr } public(friend) fun delete( self: InnerPipe ) { - let InnerPipe { id, flgs: _, service_idx: _, + let InnerPipe { id, flgs: _, + ipipe_idx: _, service_idx: _, + cli_host_ref: _, srv_host_ref: _, tc_ref: _, pipe_ref: _, + peer_pipe_ref: _, sync_data: _, seq_num: _ , emit_cnt: _, sync_cnt: _ @@ -103,14 +114,43 @@ module dtp::inner_pipe { weak_ref::get_address(&self.tc_ref ) } - public (friend) fun get_tc_ref( self: &InnerPipe ): WeakRef { + public(friend) fun get_cli_host_ref( self: &InnerPipe ): WeakRef { + self.cli_host_ref + } + + public(friend) fun get_srv_host_ref( self: &InnerPipe ): WeakRef { + self.srv_host_ref + } + + public(friend) fun get_tc_ref( self: &InnerPipe ): WeakRef { self.tc_ref } - public (friend) fun get_service_idx( self: &InnerPipe ): u8 { + public(friend) fun get_peer_ref( self: &InnerPipe ): WeakRef { + self.peer_pipe_ref + } + + public(friend) fun get_service_idx( self: &InnerPipe ): u8 { self.service_idx } + public(friend) fun get_ipipe_idx( self: &InnerPipe ): u8 { + self.ipipe_idx + } + + public(friend) fun get_ipipe_address( self: &InnerPipe ): address { + uid_to_address(&self.id) + } + + public(friend) fun set_peer_ref( self: &mut InnerPipe, peer_pipe: &InnerPipe ) { + weak_ref::set( &mut self.peer_pipe_ref, object::borrow_id(peer_pipe)); + } + + public(friend) fun transfer( self: InnerPipe, recipient: address ) { + transfer::transfer(self, recipient); + } + + // === Private Functions === // === Test Functions === diff --git a/move/sources/pipe.move b/move/sources/pipe.move index 365e1a5..dcdf349 100644 --- a/move/sources/pipe.move +++ b/move/sources/pipe.move @@ -36,6 +36,8 @@ module dtp::pipe { id: UID, flgs: u8, // DTP version+esc flags always after UID. service_idx: u8, + cli_host_ref: WeakRef, + srv_host_ref: WeakRef, tc_ref: WeakRef, // TransportControl ipipe_refs: vector, // InnerPipe(s) sync_data: PipeSyncData, // Merged of all InnerPipe sync_data. @@ -49,67 +51,69 @@ module dtp::pipe { // === Public-Friend Functions === - public(friend) fun new_transfered( service_idx: u8, tc_id: &ID, ipipe_count: u8, recipient: address, is_cli_tx_pipe: bool, conn: &mut ConnObjects, ctx: &mut TxContext ): address { + // Create two pipes at once + public(friend) fun new_pipes( service_idx: u8, cli_id: &ID, srv_id: &ID, tc_id: &ID, ipipe_count: u8, cli_recipient: address, srv_recipient: address, conn: &mut ConnObjects, ctx: &mut TxContext ): (Pipe,Pipe) { assert!(ipipe_count > 0, errors::EInvalidPipeCount()); - let new_pipe = Pipe { + let cli_pipe = Pipe { id: object::new(ctx), flgs: 0, service_idx, + cli_host_ref: weak_ref::new(cli_id), + srv_host_ref: weak_ref::new(srv_id), tc_ref: weak_ref::new(tc_id), ipipe_refs: vector::empty(), sync_data: pipe_sync_data::new(), }; - let pipe_addr = uid_to_address(&new_pipe.id); - if (is_cli_tx_pipe) { - conn_objects::set_cli_tx_pipe(conn, pipe_addr); - } else { - conn_objects::set_srv_tx_pipe(conn, pipe_addr); + let srv_pipe = Pipe { + id: object::new(ctx), + flgs: 0, + service_idx, + cli_host_ref: weak_ref::new(cli_id), + srv_host_ref: weak_ref::new(srv_id), + tc_ref: weak_ref::new(tc_id), + ipipe_refs: vector::empty(), + sync_data: pipe_sync_data::new(), }; - + + let cli_pipe_addr = uid_to_address(&cli_pipe.id); + conn_objects::set_cli_tx_pipe(conn, cli_pipe_addr); + + let srv_pipe_addr = uid_to_address(&srv_pipe.id); + conn_objects::set_srv_tx_pipe(conn, srv_pipe_addr); + // Create InnerPipes. - let i: u8 = 0; - while (i < ipipe_count) { - let ipipe_addr = inner_pipe::new_transfered(service_idx, tc_id, pipe_addr, recipient, ctx); + let ipipe_idx: u8 = 0; + while (ipipe_idx < ipipe_count) { + let cli_ipipe= inner_pipe::new(ipipe_idx, service_idx, cli_id, srv_id, tc_id, cli_pipe_addr, ctx); + let srv_ipipe= inner_pipe::new(ipipe_idx, service_idx, cli_id, srv_id, tc_id, srv_pipe_addr, ctx); + + // Cross-reference the inner pipes. + inner_pipe::set_peer_ref(&mut cli_ipipe, &srv_ipipe); + inner_pipe::set_peer_ref(&mut srv_ipipe, &cli_ipipe); // Save WeakRef in the Pipe object (for slow discovery), and the addresses in // the ConnObjects (to be return/emitted to the end-points). - if (is_cli_tx_pipe) { - conn_objects::add_cli_tx_ipipe(conn, ipipe_addr); - } else { - conn_objects::add_srv_tx_ipipe(conn, ipipe_addr); - }; - let ipipe_ref = weak_ref::new_from_address(ipipe_addr); - vector::push_back(&mut new_pipe.ipipe_refs, ipipe_ref); - i = i + 1; + let cli_ipipe_addr = inner_pipe::get_ipipe_address(&cli_ipipe); + let srv_ipipe_addr = inner_pipe::get_ipipe_address(&srv_ipipe); + + conn_objects::add_cli_tx_ipipe(conn, cli_ipipe_addr); + conn_objects::add_srv_tx_ipipe(conn, srv_ipipe_addr); + + let cli_ipipe_ref = weak_ref::new_from_address(cli_ipipe_addr); + let srv_ipipe_ref = weak_ref::new_from_address(srv_ipipe_addr); + vector::push_back(&mut cli_pipe.ipipe_refs, cli_ipipe_ref); + vector::push_back(&mut srv_pipe.ipipe_refs, srv_ipipe_ref); + inner_pipe::transfer(cli_ipipe, cli_recipient); + inner_pipe::transfer(srv_ipipe, srv_recipient); + ipipe_idx = ipipe_idx + 1; }; - transfer::transfer(new_pipe, recipient); - - pipe_addr + (cli_pipe, srv_pipe) } -/* TODO - public(friend) fun delete( self: Pipe, ipipes: vector ) { - let Pipe { id, flgs: _, sync_data: _, tc: _, ipipes } = self; - // Delete all the inner pipes. - // For tracking/debugging purpose, the weak ref is not removed from vector (only cleared). - let i: u64 = 0; - while (i < vector::length(&ipipes)) { - let inner_pipe_ref = vector::borrow_mut(&mut ipipes, i); - let inner_pipe_id = weak_ref::id(inner_pipe_ref); - weak_ref::clear(inner_pipe_ref); - object::delete(inner_pipe_id); - inner_pipe::delete(inner_pipe_id); - i = i + 1; - }; - object::delete(id); + public(friend) fun transfer( self: Pipe, recipient: address ) { + transfer::transfer(self, recipient); } -*/ - - // === Private Functions === - - // === Test Functions === - -} +} \ No newline at end of file diff --git a/move/sources/transport_control.move b/move/sources/transport_control.move index f3effb6..5d9de41 100644 --- a/move/sources/transport_control.move +++ b/move/sources/transport_control.move @@ -21,8 +21,6 @@ module dtp::transport_control { // ================ Tests ===================== // Unit Tests - use std::option::{Option}; - use sui::object::{Self, UID}; use sui::tx_context::{TxContext}; @@ -32,6 +30,7 @@ module dtp::transport_control { use dtp::errors::{Self}; use dtp::weak_ref::{Self,WeakRef}; use dtp::host::{Self,Host}; + use dtp::pipe::{Self,Pipe}; use dtp::conn_objects::{Self,ConnObjects}; #[test_only] @@ -121,14 +120,18 @@ module dtp::transport_control { srv_tx_pipe: weak_ref::new_empty() }; + + let cli_id = object::borrow_id(cli_host); + let srv_id = object::borrow_id(srv_host); // Initialize the Weak references (for slow discovery). - let cli_tx_pipe_addr = dtp::pipe::new_transfered(service_idx, + let (cli_tx_pipe, srv_tx_pipe) = dtp::pipe::new_pipes(service_idx, cli_id, srv_id, object::borrow_id(&tc), - 2, tc.cli_addr, true, conn, ctx); - - let srv_tx_pipe_addr = dtp::pipe::new_transfered(service_idx, - object::borrow_id(&tc), - 2, tc.srv_addr, false, conn, ctx ); + 2, + tc.cli_addr, tc.srv_addr, + conn, ctx); + + let cli_tx_pipe_addr = object::id_to_address( object::borrow_id(&cli_tx_pipe) ); + let srv_tx_pipe_addr = object::id_to_address( object::borrow_id(&srv_tx_pipe) ); // Update the ConnObjects (observed by the end-points when a connection is initiated). conn_objects::set_cli_tx_pipe(conn, cli_tx_pipe_addr); @@ -139,6 +142,9 @@ module dtp::transport_control { tc.cli_tx_pipe = weak_ref::new_from_address(cli_tx_pipe_addr); tc.srv_tx_pipe = weak_ref::new_from_address(srv_tx_pipe_addr); + pipe::transfer( cli_tx_pipe, tc.cli_addr); + pipe::transfer(srv_tx_pipe, tc.srv_addr); + tc } @@ -151,7 +157,6 @@ module dtp::transport_control { cli_tx_pipe: _, srv_tx_pipe: _, } = self; - object::delete(id); }