Sync pool impl (#644)

Co-authored-by: Alexis Mousset <contact@amousset.me>
This commit is contained in:
Paolo Barbolini
2021-10-16 09:39:06 +02:00
committed by GitHub
parent c9b3fa0baa
commit dc9c5df210
6 changed files with 269 additions and 74 deletions

View File

@@ -37,7 +37,6 @@ serde_json = { version = "1", optional = true }
# smtp
nom = { version = "7", optional = true }
r2d2 = { version = "0.8", optional = true } # feature
hostname = { version = "0.3", optional = true } # feature
## tls
@@ -76,7 +75,7 @@ harness = false
name = "transport_smtp"
[features]
default = ["smtp-transport", "pool", "native-tls", "hostname", "r2d2", "builder"]
default = ["smtp-transport", "pool", "native-tls", "hostname", "builder"]
builder = ["httpdate", "mime", "base64", "fastrand", "quoted_printable"]
mime03 = ["mime"]

View File

@@ -27,8 +27,7 @@
//! _Send emails using [`SMTP`]_
//!
//! * **smtp-transport** 📫: Enable the SMTP transport
//! * **r2d2** 📫: Connection pool for SMTP transport
//! * **pool** 📫: Async connection pool for SMTP transport
//! * **pool** 📫: Connection pool for SMTP transport
//! * **hostname** 📫: Try to use the actual system hostname for the SMTP `CLIENTID`
//!
//! #### SMTP over TLS via the native-tls crate

View File

@@ -118,10 +118,8 @@
#[cfg(any(feature = "tokio1", feature = "async-std1"))]
pub use self::async_transport::{AsyncSmtpTransport, AsyncSmtpTransportBuilder};
#[cfg(any(feature = "r2d2", feature = "pool"))]
#[cfg(feature = "pool")]
pub use self::pool::PoolConfig;
#[cfg(feature = "r2d2")]
pub(crate) use self::transport::SmtpClient;
pub use self::{
error::Error,
transport::{SmtpTransport, SmtpTransportBuilder},
@@ -144,7 +142,7 @@ pub mod client;
pub mod commands;
mod error;
pub mod extension;
#[cfg(any(feature = "r2d2", feature = "pool"))]
#[cfg(feature = "pool")]
mod pool;
pub mod response;
mod transport;

View File

@@ -1,18 +1,16 @@
use std::time::Duration;
#[cfg(all(feature = "pool", any(feature = "tokio1", feature = "async-std1")))]
#[cfg(any(feature = "tokio1", feature = "async-std1"))]
pub mod async_impl;
#[cfg(feature = "r2d2")]
pub mod sync_impl;
/// Configuration for a connection pool
#[derive(Debug, Clone)]
#[allow(missing_copy_implementations)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "r2d2", feature = "pool"))))]
#[cfg_attr(docsrs, doc(cfg(feature = "pool")))]
pub struct PoolConfig {
min_idle: u32,
max_size: u32,
connection_timeout: Duration,
idle_timeout: Duration,
}
@@ -43,8 +41,8 @@ impl PoolConfig {
/// Defaults to `30 seconds`
#[doc(hidden)]
#[deprecated(note = "The Connection timeout is already configured on the SMTP transport")]
pub fn connection_timeout(mut self, connection_timeout: Duration) -> Self {
self.connection_timeout = connection_timeout;
pub fn connection_timeout(self, connection_timeout: Duration) -> Self {
let _ = connection_timeout;
self
}
@@ -62,7 +60,6 @@ impl Default for PoolConfig {
Self {
min_idle: 0,
max_size: 10,
connection_timeout: Duration::from_secs(30),
idle_timeout: Duration::from_secs(60),
}
}

View File

@@ -1,51 +1,256 @@
use r2d2::{CustomizeConnection, ManageConnection, Pool};
use std::fmt::{self, Debug};
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, TryLockError};
use std::time::{Duration, Instant};
use std::{mem, thread};
use crate::transport::smtp::transport::SmtpClient;
use super::super::client::SmtpConnection;
use super::super::Error;
use super::PoolConfig;
use crate::transport::smtp::{client::SmtpConnection, error, error::Error, SmtpClient};
impl PoolConfig {
pub(crate) fn build<C: ManageConnection<Connection = SmtpConnection, Error = Error>>(
&self,
client: C,
) -> Pool<C> {
Pool::builder()
.min_idle(Some(self.min_idle))
.max_size(self.max_size)
.connection_timeout(self.connection_timeout)
.idle_timeout(Some(self.idle_timeout))
.connection_customizer(Box::new(SmtpConnectionQuitter))
.build_unchecked(client)
pub struct Pool {
config: PoolConfig,
connections: Mutex<Vec<ParkedConnection>>,
client: SmtpClient,
}
struct ParkedConnection {
conn: SmtpConnection,
since: Instant,
}
pub struct PooledConnection {
conn: Option<SmtpConnection>,
pool: Arc<Pool>,
}
impl Pool {
pub fn new(config: PoolConfig, client: SmtpClient) -> Arc<Self> {
let pool = Arc::new(Self {
config,
connections: Mutex::new(Vec::new()),
client,
});
{
let pool_ = Arc::clone(&pool);
let min_idle = pool_.config.min_idle;
let idle_timeout = pool_.config.idle_timeout;
let pool = Arc::downgrade(&pool_);
thread::Builder::new()
.name("lettre-connection-pool".into())
.spawn(move || {
while let Some(pool) = pool.upgrade() {
#[cfg(feature = "tracing")]
tracing::trace!("running cleanup tasks");
#[allow(clippy::needless_collect)]
let (count, dropped) = {
let mut connections = pool.connections.lock().unwrap();
let to_drop = connections
.iter()
.enumerate()
.rev()
.filter(|(_, conn)| conn.idle_duration() > idle_timeout)
.map(|(i, _)| i)
.collect::<Vec<_>>();
let dropped = to_drop
.into_iter()
.map(|i| connections.remove(i))
.collect::<Vec<_>>();
(connections.len(), dropped)
};
#[cfg(feature = "tracing")]
let mut created = 0;
for _ in count..=(min_idle as usize) {
let conn = match pool.client.connection() {
Ok(conn) => conn,
Err(err) => {
#[cfg(feature = "tracing")]
tracing::warn!("couldn't create idle connection {}", err);
#[cfg(not(feature = "tracing"))]
let _ = err;
break;
}
};
let mut connections = pool.connections.lock().unwrap();
connections.push(ParkedConnection::park(conn));
#[cfg(feature = "tracing")]
{
created += 1;
}
}
#[cfg(feature = "tracing")]
if created > 0 {
tracing::debug!("created {} idle connections", created);
}
if !dropped.is_empty() {
#[cfg(feature = "tracing")]
tracing::debug!("dropped {} idle connections", dropped.len());
for conn in dropped {
let mut conn = conn.unpark();
conn.abort();
}
}
thread::sleep(idle_timeout);
}
})
.expect("couldn't spawn the Pool thread");
}
pool
}
pub fn connection(self: &Arc<Self>) -> Result<PooledConnection, Error> {
loop {
let conn = {
let mut connections = self.connections.lock().unwrap();
connections.pop()
};
match conn {
Some(conn) => {
let mut conn = conn.unpark();
// TODO: handle the client try another connection if this one isn't good
if !conn.test_connected() {
#[cfg(feature = "tracing")]
tracing::debug!("dropping a broken connection");
conn.abort();
continue;
}
#[cfg(feature = "tracing")]
tracing::debug!("reusing a pooled connection");
return Ok(PooledConnection::wrap(conn, self.clone()));
}
None => {
#[cfg(feature = "tracing")]
tracing::debug!("creating a new connection");
let conn = self.client.connection()?;
return Ok(PooledConnection::wrap(conn, self.clone()));
}
}
}
}
fn recycle(&self, mut conn: SmtpConnection) {
if conn.has_broken() {
#[cfg(feature = "tracing")]
tracing::debug!("dropping a broken connection instead of recycling it");
conn.abort();
drop(conn);
} else {
#[cfg(feature = "tracing")]
tracing::debug!("recycling connection");
let mut connections = self.connections.lock().unwrap();
if connections.len() >= self.config.max_size as usize {
drop(connections);
conn.abort();
} else {
let conn = ParkedConnection::park(conn);
connections.push(conn);
}
}
}
}
impl ManageConnection for SmtpClient {
type Connection = SmtpConnection;
type Error = Error;
impl Debug for Pool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Pool")
.field("config", &self.config)
.field(
"connections",
&match self.connections.try_lock() {
Ok(connections) => format!("{} connections", connections.len()),
fn connect(&self) -> Result<Self::Connection, Error> {
self.connection()
}
fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Error> {
if conn.test_connected() {
return Ok(());
}
Err(error::network("is not connected anymore"))
}
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
conn.has_broken()
Err(TryLockError::WouldBlock) => "LOCKED".to_string(),
Err(TryLockError::Poisoned(_)) => "POISONED".to_string(),
},
)
.field("client", &self.client)
.finish()
}
}
#[derive(Copy, Clone, Debug)]
struct SmtpConnectionQuitter;
impl Drop for Pool {
fn drop(&mut self) {
#[cfg(feature = "tracing")]
tracing::debug!("dropping Pool");
impl CustomizeConnection<SmtpConnection, Error> for SmtpConnectionQuitter {
fn on_release(&self, conn: SmtpConnection) {
let mut conn = conn;
if !conn.has_broken() {
let _quit = conn.quit();
let connections = mem::take(&mut *self.connections.get_mut().unwrap());
for conn in connections {
let mut conn = conn.unpark();
conn.abort();
}
}
}
impl ParkedConnection {
fn park(conn: SmtpConnection) -> Self {
Self {
conn,
since: Instant::now(),
}
}
fn idle_duration(&self) -> Duration {
self.since.elapsed()
}
fn unpark(self) -> SmtpConnection {
self.conn
}
}
impl PooledConnection {
fn wrap(conn: SmtpConnection, pool: Arc<Pool>) -> Self {
Self {
conn: Some(conn),
pool,
}
}
}
impl Deref for PooledConnection {
type Target = SmtpConnection;
fn deref(&self) -> &Self::Target {
self.conn.as_ref().expect("conn hasn't been dropped yet")
}
}
impl DerefMut for PooledConnection {
fn deref_mut(&mut self) -> &mut Self::Target {
self.conn.as_mut().expect("conn hasn't been dropped yet")
}
}
impl Drop for PooledConnection {
fn drop(&mut self) {
let conn = self
.conn
.take()
.expect("SmtpConnection hasn't been taken yet");
self.pool.recycle(conn);
}
}

View File

@@ -1,22 +1,23 @@
#[cfg(feature = "pool")]
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "r2d2")]
use r2d2::Pool;
#[cfg(feature = "r2d2")]
#[cfg(feature = "pool")]
use super::pool::sync_impl::Pool;
#[cfg(feature = "pool")]
use super::PoolConfig;
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
use super::{error, Tls, TlsParameters, SUBMISSIONS_PORT, SUBMISSION_PORT};
use super::{ClientId, Credentials, Error, Mechanism, Response, SmtpConnection, SmtpInfo};
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
use super::{Tls, TlsParameters, SUBMISSIONS_PORT, SUBMISSION_PORT};
use crate::{address::Envelope, Transport};
/// Sends emails using the SMTP protocol
#[cfg_attr(docsrs, doc(cfg(feature = "smtp-transport")))]
#[derive(Clone)]
pub struct SmtpTransport {
#[cfg(feature = "r2d2")]
inner: Pool<SmtpClient>,
#[cfg(not(feature = "r2d2"))]
#[cfg(feature = "pool")]
inner: Arc<Pool>,
#[cfg(not(feature = "pool"))]
inner: SmtpClient,
}
@@ -26,14 +27,11 @@ impl Transport for SmtpTransport {
/// Sends an email
fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error> {
#[cfg(feature = "r2d2")]
let mut conn = self.inner.get().map_err(error::client)?;
#[cfg(not(feature = "r2d2"))]
let mut conn = self.inner.connection()?;
let result = conn.send(envelope, email)?;
#[cfg(not(feature = "r2d2"))]
#[cfg(not(feature = "pool"))]
conn.quit()?;
Ok(result)
@@ -105,7 +103,7 @@ impl SmtpTransport {
SmtpTransportBuilder {
info: new,
#[cfg(feature = "r2d2")]
#[cfg(feature = "pool")]
pool_config: PoolConfig::default(),
}
}
@@ -116,7 +114,7 @@ impl SmtpTransport {
#[derive(Debug, Clone)]
pub struct SmtpTransportBuilder {
info: SmtpInfo,
#[cfg(feature = "r2d2")]
#[cfg(feature = "pool")]
pool_config: PoolConfig,
}
@@ -163,8 +161,8 @@ impl SmtpTransportBuilder {
/// Use a custom configuration for the connection pool
///
/// Defaults can be found at [`PoolConfig`]
#[cfg(feature = "r2d2")]
#[cfg_attr(docsrs, doc(cfg(feature = "r2d2")))]
#[cfg(feature = "pool")]
#[cfg_attr(docsrs, doc(cfg(feature = "pool")))]
pub fn pool_config(mut self, pool_config: PoolConfig) -> Self {
self.pool_config = pool_config;
self
@@ -172,16 +170,15 @@ impl SmtpTransportBuilder {
/// Build the transport
///
/// If the `r2d2` feature is enabled an `Arc` wrapped pool is be created.
/// If the `pool` feature is enabled an `Arc` wrapped pool is be created.
/// Defaults can be found at [`PoolConfig`]
pub fn build(self) -> SmtpTransport {
let client = SmtpClient { info: self.info };
SmtpTransport {
#[cfg(feature = "r2d2")]
inner: self.pool_config.build(client),
#[cfg(not(feature = "r2d2"))]
inner: client,
}
#[cfg(feature = "pool")]
let client = Pool::new(self.pool_config, client);
SmtpTransport { inner: client }
}
}