Skip to content

Commit

Permalink
Proxy protocol support (#3)
Browse files Browse the repository at this point in the history
Add proxy protocol support.

---------

Co-authored-by: Alcibiades <[email protected]>
Co-authored-by: Dave Belvedere <[email protected]>
Co-authored-by: Alcibiades Athens <[email protected]>
  • Loading branch information
4 people authored Oct 14, 2023
1 parent 13ae39f commit 6d93b42
Show file tree
Hide file tree
Showing 8 changed files with 1,047 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ settings.yaml
lcov.info

# Ignore cargo lock for library
Cargo.lock
Cargo.lock
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ version = "0.5.3"
default = []
tls-rustls = ["arc-swap", "pin-project-lite", "rustls", "rustls-pemfile", "tokio/fs", "tokio/time", "tokio-rustls"]
tls-openssl = ["openssl", "tokio-openssl", "pin-project-lite"]
proxy-protocol = ["ppp", "pin-project-lite"]

[dependencies]

Expand All @@ -37,6 +38,9 @@ tokio-openssl = { version = "0.6", optional = true }
tokio-rustls = { version = "0.24", optional = true }
tower-service = "0.3"

## proxy-protocol
ppp = { version = "2.2.0", optional = true }

[dev-dependencies]
axum = "0.6"
hyper = { version = "0.14", features = ["full"] }
Expand Down
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,7 @@ pub mod tls_openssl;
#[doc(inline)]
#[cfg(feature = "tls-openssl")]
pub use self::tls_openssl::bind_openssl;

#[cfg(feature = "proxy-protocol")]
#[cfg_attr(docsrs, doc(cfg(feature = "proxy_protocol")))]
pub mod proxy_protocol;
143 changes: 143 additions & 0 deletions src/proxy_protocol/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! Future types for PROXY protocol support.
use crate::accept::Accept;
use crate::proxy_protocol::ForwardClientIp;
use pin_project_lite::pin_project;
use std::{
fmt,
future::Future,
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::Timeout;

// A `pin_project` is a procedural macro used for safe field projection in conjunction
// with the Rust Pin API, which guarantees that certain types will not move in memory.
pin_project! {
/// This struct represents the future for the ProxyProtocolAcceptor.
/// The generic types are:
/// F: The future type.
/// A: The type that implements the Accept trait.
/// I: The IO type that supports both AsyncRead and AsyncWrite.
/// S: The service type.
pub struct ProxyProtocolAcceptorFuture<F, A, I, S>
where
A: Accept<I, S>,
{
#[pin]
inner: AcceptFuture<F, A, I, S>,
}
}

impl<F, A, I, S> ProxyProtocolAcceptorFuture<F, A, I, S>
where
A: Accept<I, S>,
I: AsyncRead + AsyncWrite + Unpin,
{
// Constructor for creating a new ProxyProtocolAcceptorFuture.
pub(crate) fn new(future: Timeout<F>, acceptor: A, service: S) -> Self {
let inner = AcceptFuture::ReadHeader {
future,
acceptor,
service: Some(service),
};
Self { inner }
}
}

// Implement Debug trait for ProxyProtocolAcceptorFuture to allow
// debugging and logging.
impl<F, A, I, S> fmt::Debug for ProxyProtocolAcceptorFuture<F, A, I, S>
where
A: Accept<I, S>,
I: AsyncRead + AsyncWrite + Unpin,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ProxyProtocolAcceptorFuture").finish()
}
}

pin_project! {
// AcceptFuture represents the internal states of ProxyProtocolAcceptorFuture.
// It can either be waiting to read the header or forward the client IP.
#[project = AcceptFutureProj]
enum AcceptFuture<F, A, I, S>
where
A: Accept<I, S>,
{
ReadHeader {
#[pin]
future: Timeout<F>,
acceptor: A,
service: Option<S>,
},
ForwardIp {
#[pin]
future: A::Future,
client_address: Option<SocketAddr>,
},
}
}

impl<F, A, I, S> Future for ProxyProtocolAcceptorFuture<F, A, I, S>
where
A: Accept<I, S>,
I: AsyncRead + AsyncWrite + Unpin,
// Future whose output is a result with either a tuple of stream and optional address,
// or an io::Error.
F: Future<Output = Result<(I, Option<SocketAddr>), io::Error>>,
{
type Output = io::Result<(A::Stream, ForwardClientIp<A::Service>)>;

// The main poll function that drives the future towards completion.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
// Check the current state of the inner future.
match this.inner.as_mut().project() {
AcceptFutureProj::ReadHeader {
future,
acceptor,
service,
} => match future.poll(cx) {
Poll::Ready(Ok(Ok((stream, client_address)))) => {
let service = service.take().expect("future polled after ready");
let future = acceptor.accept(stream, service);

// Transition to the ForwardIp state after successfully reading the header.
this.inner.set(AcceptFuture::ForwardIp {
future,
client_address,
});
}
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Err(e)),
Poll::Ready(Err(timeout)) => {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut, timeout)))
}
Poll::Pending => return Poll::Pending,
},
AcceptFutureProj::ForwardIp {
future,
client_address,
} => {
return match future.poll(cx) {
Poll::Ready(Ok((stream, service))) => {
let service = ForwardClientIp {
inner: service,
client_address: *client_address,
};

// Return the successfully processed stream and service.
Poll::Ready(Ok((stream, service)))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
};
}
}
}
}
}
Loading

0 comments on commit 6d93b42

Please sign in to comment.