From dc9c5df210f815a249caf54fe2b26648b9dcea34 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Sat, 16 Oct 2021 09:39:06 +0200 Subject: [PATCH] Sync pool impl (#644) Co-authored-by: Alexis Mousset --- Cargo.toml | 3 +- src/lib.rs | 3 +- src/transport/smtp/mod.rs | 6 +- src/transport/smtp/pool/mod.rs | 11 +- src/transport/smtp/pool/sync_impl.rs | 275 +++++++++++++++++++++++---- src/transport/smtp/transport.rs | 45 ++--- 6 files changed, 269 insertions(+), 74 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 52aa39b..132b7eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,6 @@ serde_json = { version = "1", optional = true } # smtp nom = { version = "7", optional = true } -r2d2 = { version = "0.8", optional = true } # feature hostname = { version = "0.3", optional = true } # feature ## tls @@ -76,7 +75,7 @@ harness = false name = "transport_smtp" [features] -default = ["smtp-transport", "pool", "native-tls", "hostname", "r2d2", "builder"] +default = ["smtp-transport", "pool", "native-tls", "hostname", "builder"] builder = ["httpdate", "mime", "base64", "fastrand", "quoted_printable"] mime03 = ["mime"] diff --git a/src/lib.rs b/src/lib.rs index 58bc6bb..a83a042 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,8 +27,7 @@ //! _Send emails using [`SMTP`]_ //! //! * **smtp-transport** 📫: Enable the SMTP transport -//! * **r2d2** 📫: Connection pool for SMTP transport -//! * **pool** 📫: Async connection pool for SMTP transport +//! * **pool** 📫: Connection pool for SMTP transport //! * **hostname** 📫: Try to use the actual system hostname for the SMTP `CLIENTID` //! //! #### SMTP over TLS via the native-tls crate diff --git a/src/transport/smtp/mod.rs b/src/transport/smtp/mod.rs index f9ad2db..c2cac3e 100644 --- a/src/transport/smtp/mod.rs +++ b/src/transport/smtp/mod.rs @@ -118,10 +118,8 @@ #[cfg(any(feature = "tokio1", feature = "async-std1"))] pub use self::async_transport::{AsyncSmtpTransport, AsyncSmtpTransportBuilder}; -#[cfg(any(feature = "r2d2", feature = "pool"))] +#[cfg(feature = "pool")] pub use self::pool::PoolConfig; -#[cfg(feature = "r2d2")] -pub(crate) use self::transport::SmtpClient; pub use self::{ error::Error, transport::{SmtpTransport, SmtpTransportBuilder}, @@ -144,7 +142,7 @@ pub mod client; pub mod commands; mod error; pub mod extension; -#[cfg(any(feature = "r2d2", feature = "pool"))] +#[cfg(feature = "pool")] mod pool; pub mod response; mod transport; diff --git a/src/transport/smtp/pool/mod.rs b/src/transport/smtp/pool/mod.rs index 20181ea..1d5c136 100644 --- a/src/transport/smtp/pool/mod.rs +++ b/src/transport/smtp/pool/mod.rs @@ -1,18 +1,16 @@ use std::time::Duration; -#[cfg(all(feature = "pool", any(feature = "tokio1", feature = "async-std1")))] +#[cfg(any(feature = "tokio1", feature = "async-std1"))] pub mod async_impl; -#[cfg(feature = "r2d2")] pub mod sync_impl; /// Configuration for a connection pool #[derive(Debug, Clone)] #[allow(missing_copy_implementations)] -#[cfg_attr(docsrs, doc(cfg(any(feature = "r2d2", feature = "pool"))))] +#[cfg_attr(docsrs, doc(cfg(feature = "pool")))] pub struct PoolConfig { min_idle: u32, max_size: u32, - connection_timeout: Duration, idle_timeout: Duration, } @@ -43,8 +41,8 @@ impl PoolConfig { /// Defaults to `30 seconds` #[doc(hidden)] #[deprecated(note = "The Connection timeout is already configured on the SMTP transport")] - pub fn connection_timeout(mut self, connection_timeout: Duration) -> Self { - self.connection_timeout = connection_timeout; + pub fn connection_timeout(self, connection_timeout: Duration) -> Self { + let _ = connection_timeout; self } @@ -62,7 +60,6 @@ impl Default for PoolConfig { Self { min_idle: 0, max_size: 10, - connection_timeout: Duration::from_secs(30), idle_timeout: Duration::from_secs(60), } } diff --git a/src/transport/smtp/pool/sync_impl.rs b/src/transport/smtp/pool/sync_impl.rs index 7a3dd44..40e0253 100644 --- a/src/transport/smtp/pool/sync_impl.rs +++ b/src/transport/smtp/pool/sync_impl.rs @@ -1,51 +1,256 @@ -use r2d2::{CustomizeConnection, ManageConnection, Pool}; +use std::fmt::{self, Debug}; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex, TryLockError}; +use std::time::{Duration, Instant}; +use std::{mem, thread}; +use crate::transport::smtp::transport::SmtpClient; + +use super::super::client::SmtpConnection; +use super::super::Error; use super::PoolConfig; -use crate::transport::smtp::{client::SmtpConnection, error, error::Error, SmtpClient}; -impl PoolConfig { - pub(crate) fn build>( - &self, - client: C, - ) -> Pool { - Pool::builder() - .min_idle(Some(self.min_idle)) - .max_size(self.max_size) - .connection_timeout(self.connection_timeout) - .idle_timeout(Some(self.idle_timeout)) - .connection_customizer(Box::new(SmtpConnectionQuitter)) - .build_unchecked(client) - } +pub struct Pool { + config: PoolConfig, + connections: Mutex>, + client: SmtpClient, } -impl ManageConnection for SmtpClient { - type Connection = SmtpConnection; - type Error = Error; +struct ParkedConnection { + conn: SmtpConnection, + since: Instant, +} - fn connect(&self) -> Result { - self.connection() - } +pub struct PooledConnection { + conn: Option, + pool: Arc, +} - fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Error> { - if conn.test_connected() { - return Ok(()); +impl Pool { + pub fn new(config: PoolConfig, client: SmtpClient) -> Arc { + let pool = Arc::new(Self { + config, + connections: Mutex::new(Vec::new()), + client, + }); + + { + let pool_ = Arc::clone(&pool); + + let min_idle = pool_.config.min_idle; + let idle_timeout = pool_.config.idle_timeout; + let pool = Arc::downgrade(&pool_); + + thread::Builder::new() + .name("lettre-connection-pool".into()) + .spawn(move || { + while let Some(pool) = pool.upgrade() { + #[cfg(feature = "tracing")] + tracing::trace!("running cleanup tasks"); + + #[allow(clippy::needless_collect)] + let (count, dropped) = { + let mut connections = pool.connections.lock().unwrap(); + + let to_drop = connections + .iter() + .enumerate() + .rev() + .filter(|(_, conn)| conn.idle_duration() > idle_timeout) + .map(|(i, _)| i) + .collect::>(); + let dropped = to_drop + .into_iter() + .map(|i| connections.remove(i)) + .collect::>(); + + (connections.len(), dropped) + }; + + #[cfg(feature = "tracing")] + let mut created = 0; + for _ in count..=(min_idle as usize) { + let conn = match pool.client.connection() { + Ok(conn) => conn, + Err(err) => { + #[cfg(feature = "tracing")] + tracing::warn!("couldn't create idle connection {}", err); + #[cfg(not(feature = "tracing"))] + let _ = err; + + break; + } + }; + + let mut connections = pool.connections.lock().unwrap(); + connections.push(ParkedConnection::park(conn)); + + #[cfg(feature = "tracing")] + { + created += 1; + } + } + + #[cfg(feature = "tracing")] + if created > 0 { + tracing::debug!("created {} idle connections", created); + } + + if !dropped.is_empty() { + #[cfg(feature = "tracing")] + tracing::debug!("dropped {} idle connections", dropped.len()); + + for conn in dropped { + let mut conn = conn.unpark(); + conn.abort(); + } + } + + thread::sleep(idle_timeout); + } + }) + .expect("couldn't spawn the Pool thread"); } - Err(error::network("is not connected anymore")) + + pool } - fn has_broken(&self, conn: &mut Self::Connection) -> bool { - conn.has_broken() + pub fn connection(self: &Arc) -> Result { + loop { + let conn = { + let mut connections = self.connections.lock().unwrap(); + connections.pop() + }; + + match conn { + Some(conn) => { + let mut conn = conn.unpark(); + + // TODO: handle the client try another connection if this one isn't good + if !conn.test_connected() { + #[cfg(feature = "tracing")] + tracing::debug!("dropping a broken connection"); + + conn.abort(); + continue; + } + + #[cfg(feature = "tracing")] + tracing::debug!("reusing a pooled connection"); + + return Ok(PooledConnection::wrap(conn, self.clone())); + } + None => { + #[cfg(feature = "tracing")] + tracing::debug!("creating a new connection"); + + let conn = self.client.connection()?; + return Ok(PooledConnection::wrap(conn, self.clone())); + } + } + } } -} -#[derive(Copy, Clone, Debug)] -struct SmtpConnectionQuitter; + fn recycle(&self, mut conn: SmtpConnection) { + if conn.has_broken() { + #[cfg(feature = "tracing")] + tracing::debug!("dropping a broken connection instead of recycling it"); -impl CustomizeConnection for SmtpConnectionQuitter { - fn on_release(&self, conn: SmtpConnection) { - let mut conn = conn; - if !conn.has_broken() { - let _quit = conn.quit(); + conn.abort(); + drop(conn); + } else { + #[cfg(feature = "tracing")] + tracing::debug!("recycling connection"); + + let mut connections = self.connections.lock().unwrap(); + if connections.len() >= self.config.max_size as usize { + drop(connections); + conn.abort(); + } else { + let conn = ParkedConnection::park(conn); + connections.push(conn); + } } } } + +impl Debug for Pool { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Pool") + .field("config", &self.config) + .field( + "connections", + &match self.connections.try_lock() { + Ok(connections) => format!("{} connections", connections.len()), + + Err(TryLockError::WouldBlock) => "LOCKED".to_string(), + Err(TryLockError::Poisoned(_)) => "POISONED".to_string(), + }, + ) + .field("client", &self.client) + .finish() + } +} + +impl Drop for Pool { + fn drop(&mut self) { + #[cfg(feature = "tracing")] + tracing::debug!("dropping Pool"); + + let connections = mem::take(&mut *self.connections.get_mut().unwrap()); + for conn in connections { + let mut conn = conn.unpark(); + conn.abort(); + } + } +} + +impl ParkedConnection { + fn park(conn: SmtpConnection) -> Self { + Self { + conn, + since: Instant::now(), + } + } + + fn idle_duration(&self) -> Duration { + self.since.elapsed() + } + + fn unpark(self) -> SmtpConnection { + self.conn + } +} + +impl PooledConnection { + fn wrap(conn: SmtpConnection, pool: Arc) -> Self { + Self { + conn: Some(conn), + pool, + } + } +} + +impl Deref for PooledConnection { + type Target = SmtpConnection; + + fn deref(&self) -> &Self::Target { + self.conn.as_ref().expect("conn hasn't been dropped yet") + } +} + +impl DerefMut for PooledConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + self.conn.as_mut().expect("conn hasn't been dropped yet") + } +} + +impl Drop for PooledConnection { + fn drop(&mut self) { + let conn = self + .conn + .take() + .expect("SmtpConnection hasn't been taken yet"); + self.pool.recycle(conn); + } +} diff --git a/src/transport/smtp/transport.rs b/src/transport/smtp/transport.rs index 9b9d2ae..f72ec82 100644 --- a/src/transport/smtp/transport.rs +++ b/src/transport/smtp/transport.rs @@ -1,22 +1,23 @@ +#[cfg(feature = "pool")] +use std::sync::Arc; use std::time::Duration; -#[cfg(feature = "r2d2")] -use r2d2::Pool; - -#[cfg(feature = "r2d2")] +#[cfg(feature = "pool")] +use super::pool::sync_impl::Pool; +#[cfg(feature = "pool")] use super::PoolConfig; -#[cfg(any(feature = "native-tls", feature = "rustls-tls"))] -use super::{error, Tls, TlsParameters, SUBMISSIONS_PORT, SUBMISSION_PORT}; use super::{ClientId, Credentials, Error, Mechanism, Response, SmtpConnection, SmtpInfo}; +#[cfg(any(feature = "native-tls", feature = "rustls-tls"))] +use super::{Tls, TlsParameters, SUBMISSIONS_PORT, SUBMISSION_PORT}; use crate::{address::Envelope, Transport}; /// Sends emails using the SMTP protocol #[cfg_attr(docsrs, doc(cfg(feature = "smtp-transport")))] #[derive(Clone)] pub struct SmtpTransport { - #[cfg(feature = "r2d2")] - inner: Pool, - #[cfg(not(feature = "r2d2"))] + #[cfg(feature = "pool")] + inner: Arc, + #[cfg(not(feature = "pool"))] inner: SmtpClient, } @@ -26,14 +27,11 @@ impl Transport for SmtpTransport { /// Sends an email fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result { - #[cfg(feature = "r2d2")] - let mut conn = self.inner.get().map_err(error::client)?; - #[cfg(not(feature = "r2d2"))] let mut conn = self.inner.connection()?; let result = conn.send(envelope, email)?; - #[cfg(not(feature = "r2d2"))] + #[cfg(not(feature = "pool"))] conn.quit()?; Ok(result) @@ -105,7 +103,7 @@ impl SmtpTransport { SmtpTransportBuilder { info: new, - #[cfg(feature = "r2d2")] + #[cfg(feature = "pool")] pool_config: PoolConfig::default(), } } @@ -116,7 +114,7 @@ impl SmtpTransport { #[derive(Debug, Clone)] pub struct SmtpTransportBuilder { info: SmtpInfo, - #[cfg(feature = "r2d2")] + #[cfg(feature = "pool")] pool_config: PoolConfig, } @@ -163,8 +161,8 @@ impl SmtpTransportBuilder { /// Use a custom configuration for the connection pool /// /// Defaults can be found at [`PoolConfig`] - #[cfg(feature = "r2d2")] - #[cfg_attr(docsrs, doc(cfg(feature = "r2d2")))] + #[cfg(feature = "pool")] + #[cfg_attr(docsrs, doc(cfg(feature = "pool")))] pub fn pool_config(mut self, pool_config: PoolConfig) -> Self { self.pool_config = pool_config; self @@ -172,16 +170,15 @@ impl SmtpTransportBuilder { /// Build the transport /// - /// If the `r2d2` feature is enabled an `Arc` wrapped pool is be created. + /// If the `pool` feature is enabled an `Arc` wrapped pool is be created. /// Defaults can be found at [`PoolConfig`] pub fn build(self) -> SmtpTransport { let client = SmtpClient { info: self.info }; - SmtpTransport { - #[cfg(feature = "r2d2")] - inner: self.pool_config.build(client), - #[cfg(not(feature = "r2d2"))] - inner: client, - } + + #[cfg(feature = "pool")] + let client = Pool::new(self.pool_config, client); + + SmtpTransport { inner: client } } }