diff --git a/src/transport/smtp/client/async_connection.rs b/src/transport/smtp/client/async_connection.rs index 22ad206..3f525e7 100644 --- a/src/transport/smtp/client/async_connection.rs +++ b/src/transport/smtp/client/async_connection.rs @@ -2,6 +2,8 @@ use std::{fmt::Display, net::IpAddr, time::Duration}; use futures_util::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +#[cfg(feature = "tokio1")] +use super::async_net::AsyncTokioStream; #[cfg(feature = "tracing")] use super::escape_crlf; use super::{AsyncNetworkStream, ClientCodec, TlsParameters}; @@ -46,6 +48,18 @@ impl AsyncSmtpConnection { &self.server_info } + /// Connects with existing async stream + /// + /// Sends EHLO and parses server information + #[cfg(feature = "tokio1")] + pub async fn connect_with_transport( + stream: Box, + hello_name: &ClientId, + ) -> Result { + let stream = AsyncNetworkStream::use_existing_tokio1(stream); + Self::connect_impl(stream, hello_name).await + } + /// Connects to the configured server /// /// Sends EHLO and parses server information diff --git a/src/transport/smtp/client/async_net.rs b/src/transport/smtp/client/async_net.rs index 72c46f5..45d31eb 100644 --- a/src/transport/smtp/client/async_net.rs +++ b/src/transport/smtp/client/async_net.rs @@ -1,5 +1,5 @@ use std::{ - io, mem, + fmt, io, mem, net::{IpAddr, SocketAddr}, pin::Pin, task::{Context, Poll}, @@ -19,7 +19,7 @@ use futures_rustls::client::TlsStream as AsyncStd1RustlsTlsStream; #[cfg(feature = "tokio1-boring-tls")] use tokio1_boring::SslStream as Tokio1SslStream; #[cfg(feature = "tokio1")] -use tokio1_crate::io::{AsyncRead as _, AsyncWrite as _, ReadBuf as Tokio1ReadBuf}; +use tokio1_crate::io::{AsyncRead, AsyncWrite, ReadBuf as Tokio1ReadBuf}; #[cfg(feature = "tokio1")] use tokio1_crate::net::{ TcpSocket as Tokio1TcpSocket, TcpStream as Tokio1TcpStream, @@ -44,28 +44,42 @@ use crate::transport::smtp::client::net::resolved_address_filter; use crate::transport::smtp::{error, Error}; /// A network stream +#[derive(Debug)] pub struct AsyncNetworkStream { inner: InnerAsyncNetworkStream, } +#[cfg(feature = "tokio1")] +pub trait AsyncTokioStream: AsyncRead + AsyncWrite + Send + Sync + Unpin + fmt::Debug { + fn peer_addr(&self) -> io::Result; +} + +#[cfg(feature = "tokio1")] +impl AsyncTokioStream for Tokio1TcpStream { + fn peer_addr(&self) -> io::Result { + self.peer_addr() + } +} + /// Represents the different types of underlying network streams // usually only one TLS backend at a time is going to be enabled, // so clippy::large_enum_variant doesn't make sense here #[allow(clippy::large_enum_variant)] #[allow(dead_code)] +#[derive(Debug)] enum InnerAsyncNetworkStream { /// Plain Tokio 1.x TCP stream #[cfg(feature = "tokio1")] - Tokio1Tcp(Tokio1TcpStream), + Tokio1Tcp(Box), /// Encrypted Tokio 1.x TCP stream #[cfg(feature = "tokio1-native-tls")] - Tokio1NativeTls(Tokio1TlsStream), + Tokio1NativeTls(Tokio1TlsStream>), /// Encrypted Tokio 1.x TCP stream #[cfg(feature = "tokio1-rustls-tls")] - Tokio1RustlsTls(Tokio1RustlsTlsStream), + Tokio1RustlsTls(Tokio1RustlsTlsStream>), /// Encrypted Tokio 1.x TCP stream #[cfg(feature = "tokio1-boring-tls")] - Tokio1BoringTls(Tokio1SslStream), + Tokio1BoringTls(Tokio1SslStream>), /// Plain Tokio 1.x TCP stream #[cfg(feature = "async-std1")] AsyncStd1Tcp(AsyncStd1TcpStream), @@ -117,6 +131,11 @@ impl AsyncNetworkStream { } } + #[cfg(feature = "tokio1")] + pub fn use_existing_tokio1(stream: Box) -> AsyncNetworkStream { + AsyncNetworkStream::new(InnerAsyncNetworkStream::Tokio1Tcp(stream)) + } + #[cfg(feature = "tokio1")] pub async fn connect_tokio1( server: T, @@ -175,7 +194,8 @@ impl AsyncNetworkStream { } let tcp_stream = try_connect(server, timeout, local_addr).await?; - let mut stream = AsyncNetworkStream::new(InnerAsyncNetworkStream::Tokio1Tcp(tcp_stream)); + let mut stream = + AsyncNetworkStream::new(InnerAsyncNetworkStream::Tokio1Tcp(Box::new(tcp_stream))); if let Some(tls_parameters) = tls_parameters { stream.upgrade_tls(tls_parameters).await?; } @@ -300,7 +320,7 @@ impl AsyncNetworkStream { feature = "tokio1-boring-tls" ))] async fn upgrade_tokio1_tls( - tcp_stream: Tokio1TcpStream, + tcp_stream: Box, tls_parameters: TlsParameters, ) -> Result { let domain = tls_parameters.domain().to_string(); diff --git a/src/transport/smtp/client/mod.rs b/src/transport/smtp/client/mod.rs index fcfcbd8..125b5a6 100644 --- a/src/transport/smtp/client/mod.rs +++ b/src/transport/smtp/client/mod.rs @@ -29,6 +29,8 @@ use std::fmt::Debug; pub use self::async_connection::AsyncSmtpConnection; #[cfg(any(feature = "tokio1", feature = "async-std1"))] pub use self::async_net::AsyncNetworkStream; +#[cfg(feature = "tokio1")] +pub use self::async_net::AsyncTokioStream; use self::net::NetworkStream; #[cfg(any(feature = "native-tls", feature = "rustls-tls", feature = "boring-tls"))] pub(super) use self::tls::InnerTlsParameters;