Compare commits

...

9 Commits

Author SHA1 Message Date
Paolo Barbolini
6c0be84817 Prepare v0.11.15 (#1070) 2025-03-10 17:26:17 +01:00
Paolo Barbolini
6059cb04d6 build(deps): upgrade email-encoding to v0.4 (#1069) 2025-03-09 07:22:20 +00:00
Paolo Barbolini
fdf0346556 style: fix rustdoc::broken_intra_doc_links (#1068) 2025-03-09 07:55:15 +01:00
Paolo Barbolini
0f9455715c build(deps): upgrade semver compatible locked dependencies (#1067) 2025-03-08 11:52:38 +00:00
Popax21
0b3a1ed278 feat: add controlled shutdown methods (#1045) 2025-03-08 12:43:05 +01:00
Paolo Barbolini
76bf68268f build(deps): bump minimum supported serde to v1.0.110 (#1064) 2025-03-04 13:06:17 +01:00
Paolo Barbolini
99a86c0fac build(deps): bump minimum supported rustls to v0.23.18 (#1063) 2025-03-04 12:55:43 +01:00
Paolo Barbolini
f0de9ef02c style: deny unreachable_pub lint (#1058) 2025-02-23 10:17:17 +01:00
Paolo Barbolini
b4ddcbdcfc build: bump MSRV to 1.74 (#1060) 2025-02-23 10:16:57 +01:00
19 changed files with 518 additions and 299 deletions

View File

@@ -75,8 +75,8 @@ jobs:
rust: stable
- name: beta
rust: beta
- name: '1.71'
rust: '1.71'
- name: '1.74'
rust: '1.74'
steps:
- name: Checkout

View File

@@ -1,3 +1,31 @@
<a name="v0.11.15"></a>
### v0.11.15 (2025-03-10)
#### Upgrade notes
* MSRV is now 1.74 ([#1060])
#### Features
* Add controlled shutdown methods ([#1045], [#1068])
#### Misc
* Deny `unreachable_pub` lint ([#1058])
* Bump minimum supported `rustls` ([#1063])
* Bump minimum supported `serde` ([#1064])
* Upgrade semver compatible dependencies ([#1067])
* Upgrade `email-encoding` to v0.4 ([#1069])
[#1045]: https://github.com/lettre/lettre/pull/1045
[#1058]: https://github.com/lettre/lettre/pull/1058
[#1060]: https://github.com/lettre/lettre/pull/1060
[#1063]: https://github.com/lettre/lettre/pull/1063
[#1064]: https://github.com/lettre/lettre/pull/1064
[#1067]: https://github.com/lettre/lettre/pull/1067
[#1068]: https://github.com/lettre/lettre/pull/1068
[#1069]: https://github.com/lettre/lettre/pull/1069
<a name="v0.11.14"></a>
### v0.11.14 (2025-02-23)

498
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
[package]
name = "lettre"
# remember to update html_root_url and README.md (Cargo.toml example and deps.rs badge)
version = "0.11.14"
version = "0.11.15"
description = "Email client"
readme = "README.md"
homepage = "https://lettre.rs"
@@ -11,7 +11,7 @@ authors = ["Alexis Mousset <contact@amousset.me>", "Paolo Barbolini <paolo@paolo
categories = ["email", "network-programming"]
keywords = ["email", "smtp", "mailer", "message", "sendmail"]
edition = "2021"
rust-version = "1.71"
rust-version = "1.74"
[badges]
is-it-maintained-issue-resolution = { repository = "lettre/lettre" }
@@ -32,11 +32,11 @@ mime = { version = "0.3.4", optional = true }
fastrand = { version = "2.0", optional = true }
quoted_printable = { version = "0.5", optional = true }
base64 = { version = "0.22", optional = true }
email-encoding = { version = "0.3", optional = true }
email-encoding = { version = "0.4", optional = true }
# file transport
uuid = { version = "1", features = ["v4"], optional = true }
serde = { version = "1", features = ["derive"], optional = true }
serde = { version = "1.0.110", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
# smtp-transport
@@ -48,7 +48,7 @@ percent-encoding = { version = "2.3", optional = true }
## tls
native-tls = { version = "0.2.9", optional = true } # feature
rustls = { version = "0.23.5", default-features = false, features = ["logging", "std", "tls12"], optional = true }
rustls = { version = "0.23.18", default-features = false, features = ["logging", "std", "tls12"], optional = true }
rustls-native-certs = { version = "0.8", optional = true }
webpki-roots = { version = "0.26", optional = true }
boring = { version = "4", optional = true }

View File

@@ -28,8 +28,8 @@
</div>
<div align="center">
<a href="https://deps.rs/crate/lettre/0.11.14">
<img src="https://deps.rs/crate/lettre/0.11.14/status.svg"
<a href="https://deps.rs/crate/lettre/0.11.15">
<img src="https://deps.rs/crate/lettre/0.11.15/status.svg"
alt="dependency status" />
</a>
</div>
@@ -53,12 +53,12 @@ Lettre does not provide (for now):
## Supported Rust Versions
Lettre supports all Rust versions released in the last 6 months. At the time of writing
the minimum supported Rust version is 1.71, but this could change at any time either from
the minimum supported Rust version is 1.74, but this could change at any time either from
one of our dependencies bumping their MSRV or by a new patch release of lettre.
## Example
This library requires Rust 1.71 or newer.
This library requires Rust 1.74 or newer.
To use this library, add the following to your `Cargo.toml`:
```toml

View File

@@ -53,7 +53,7 @@ mod serde_forward_path {
}
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<Address>, D::Error>
pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Vec<Address>, D::Error>
where
D: serde::Deserializer<'de>,
{

View File

@@ -45,6 +45,7 @@ use crate::transport::smtp::Error;
#[async_trait]
pub trait Executor: Debug + Send + Sync + 'static + private::Sealed {
#[cfg(feature = "smtp-transport")]
#[allow(private_bounds)]
type Handle: SpawnHandle;
#[cfg(feature = "smtp-transport")]
type Sleep: Future<Output = ()> + Send + 'static;
@@ -82,8 +83,8 @@ pub trait Executor: Debug + Send + Sync + 'static + private::Sealed {
#[doc(hidden)]
#[cfg(feature = "smtp-transport")]
#[async_trait]
pub trait SpawnHandle: Debug + Send + Sync + 'static + private::Sealed {
async fn shutdown(self);
pub(crate) trait SpawnHandle: Debug + Send + Sync + 'static + private::Sealed {
async fn shutdown(&self);
}
/// Async [`Executor`] using `tokio` `1.x`
@@ -177,7 +178,7 @@ impl Executor for Tokio1Executor {
#[cfg(all(feature = "smtp-transport", feature = "tokio1"))]
#[async_trait]
impl SpawnHandle for tokio1_crate::task::JoinHandle<()> {
async fn shutdown(self) {
async fn shutdown(&self) {
self.abort();
}
}
@@ -201,7 +202,7 @@ pub struct AsyncStd1Executor;
#[cfg(feature = "async-std1")]
impl Executor for AsyncStd1Executor {
#[cfg(feature = "smtp-transport")]
type Handle = async_std::task::JoinHandle<()>;
type Handle = futures_util::future::AbortHandle;
#[cfg(feature = "smtp-transport")]
type Sleep = BoxFuture<'static, ()>;
@@ -211,7 +212,9 @@ impl Executor for AsyncStd1Executor {
F: Future<Output = ()> + Send + 'static,
F::Output: Send + 'static,
{
async_std::task::spawn(fut)
let (handle, registration) = futures_util::future::AbortHandle::new_pair();
async_std::task::spawn(futures_util::future::Abortable::new(fut, registration));
handle
}
#[cfg(feature = "smtp-transport")]
@@ -272,9 +275,9 @@ impl Executor for AsyncStd1Executor {
#[cfg(all(feature = "smtp-transport", feature = "async-std1"))]
#[async_trait]
impl SpawnHandle for async_std::task::JoinHandle<()> {
async fn shutdown(self) {
self.cancel().await;
impl SpawnHandle for futures_util::future::AbortHandle {
async fn shutdown(&self) {
self.abort();
}
}
@@ -291,5 +294,5 @@ mod private {
impl Sealed for tokio1_crate::task::JoinHandle<()> {}
#[cfg(all(feature = "smtp-transport", feature = "async-std1"))]
impl Sealed for async_std::task::JoinHandle<()> {}
impl Sealed for futures_util::future::AbortHandle {}
}

View File

@@ -6,7 +6,7 @@
//! * Secure defaults
//! * Async support
//!
//! Lettre requires Rust 1.71 or newer.
//! Lettre requires Rust 1.74 or newer.
//!
//! ## Features
//!
@@ -158,11 +158,12 @@
//! [mime 0.3]: https://docs.rs/mime/0.3
//! [DKIM]: https://datatracker.ietf.org/doc/html/rfc6376
#![doc(html_root_url = "https://docs.rs/crate/lettre/0.11.14")]
#![doc(html_root_url = "https://docs.rs/crate/lettre/0.11.15")]
#![doc(html_favicon_url = "https://lettre.rs/favicon.ico")]
#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/15113230?v=4")]
#![forbid(unsafe_code)]
#![deny(
unreachable_pub,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,

View File

@@ -41,14 +41,14 @@ fn quoted_pair() -> impl Parser<char, char, Error = Cheap<char>> {
// FWS = ([*WSP CRLF] 1*WSP) / ; Folding white space
// obs-FWS
pub fn fws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
pub(super) fn fws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
rfc2234::wsp()
.or_not()
.then_ignore(rfc2234::wsp().ignored().repeated())
}
// CFWS = *([FWS] comment) (([FWS] comment) / FWS)
pub fn cfws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
pub(super) fn cfws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
// TODO: comment are not currently supported, so for now a cfws is
// the same as a fws.
fws()
@@ -106,12 +106,12 @@ pub(super) fn atom() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
}
// dot-atom = [CFWS] dot-atom-text [CFWS]
pub fn dot_atom() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
pub(super) fn dot_atom() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
cfws().chain(dot_atom_text())
}
// dot-atom-text = 1*atext *("." 1*atext)
pub fn dot_atom_text() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
pub(super) fn dot_atom_text() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
atext().repeated().at_least(1).chain(
just('.')
.chain(atext().repeated().at_least(1))
@@ -204,7 +204,7 @@ pub(crate) fn mailbox_list(
// https://datatracker.ietf.org/doc/html/rfc2822#section-3.4.1
// addr-spec = local-part "@" domain
pub fn addr_spec() -> impl Parser<char, (String, String), Error = Cheap<char>> {
pub(super) fn addr_spec() -> impl Parser<char, (String, String), Error = Cheap<char>> {
local_part()
.collect()
.then_ignore(just('@'))
@@ -212,12 +212,12 @@ pub fn addr_spec() -> impl Parser<char, (String, String), Error = Cheap<char>> {
}
// local-part = dot-atom / quoted-string / obs-local-part
pub fn local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
pub(super) fn local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
choice((dot_atom(), quoted_string(), obs_local_part()))
}
// domain = dot-atom / domain-literal / obs-domain
pub fn domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
pub(super) fn domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
// NOTE: omitting domain-literal since it may never be used
choice((dot_atom(), obs_domain()))
}
@@ -240,11 +240,11 @@ fn obs_phrase() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
// https://datatracker.ietf.org/doc/html/rfc2822#section-4.4
// obs-local-part = word *("." word)
pub fn obs_local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
pub(super) fn obs_local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
word().chain(just('.').chain(word()).repeated().flatten())
}
// obs-domain = atom *("." atom)
pub fn obs_domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
pub(super) fn obs_domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
atom().chain(just('.').chain(atom()).repeated().flatten())
}

View File

@@ -140,6 +140,10 @@ pub trait Transport {
}
fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error>;
/// Shuts down the transport. Future calls to [`Self::send`] and
/// [`Self::send_raw`] might fail.
fn shutdown(&self) {}
}
/// Async Transport method for emails
@@ -166,4 +170,8 @@ pub trait AsyncTransport {
}
async fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error>;
/// Shuts down the transport. Future calls to [`Self::send`] and
/// [`Self::send_raw`] might fail.
async fn shutdown(&self) {}
}

View File

@@ -79,6 +79,11 @@ impl AsyncTransport for AsyncSmtpTransport<Tokio1Executor> {
Ok(result)
}
async fn shutdown(&self) {
#[cfg(feature = "pool")]
self.inner.shutdown().await;
}
}
#[cfg(feature = "async-std1")]
@@ -97,6 +102,11 @@ impl AsyncTransport for AsyncSmtpTransport<AsyncStd1Executor> {
Ok(result)
}
async fn shutdown(&self) {
#[cfg(feature = "pool")]
self.inner.shutdown().await;
}
}
impl<E> AsyncSmtpTransport<E>
@@ -460,7 +470,7 @@ impl AsyncSmtpTransportBuilder {
}
/// Build client
pub struct AsyncSmtpClient<E> {
pub(super) struct AsyncSmtpClient<E> {
info: SmtpInfo,
marker_: PhantomData<E>,
}
@@ -472,7 +482,7 @@ where
/// Creates a new connection directly usable to send emails
///
/// Handles encryption and authentication
pub async fn connection(&self) -> Result<AsyncSmtpConnection, Error> {
pub(super) async fn connection(&self) -> Result<AsyncSmtpConnection, Error> {
let mut conn = E::connect(
&self.info.server,
self.info.port,

View File

@@ -58,7 +58,7 @@ struct ClientCodec {
impl ClientCodec {
/// Creates a new client codec
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
status: CodecStatus::StartOfNewLine,
}

View File

@@ -490,7 +490,7 @@ impl TlsParametersBuilder {
#[derive(Clone)]
#[allow(clippy::enum_variant_names)]
pub enum InnerTlsParameters {
pub(crate) enum InnerTlsParameters {
#[cfg(feature = "native-tls")]
NativeTls(TlsConnector),
#[cfg(feature = "rustls")]

View File

@@ -77,6 +77,11 @@ impl Error {
matches!(self.inner.kind, Kind::Tls)
}
/// Returns true if the error is because the transport was shut down
pub fn is_transport_shutdown(&self) -> bool {
matches!(self.inner.kind, Kind::TransportShutdown)
}
/// Returns the status code, if the error was generated from a response.
pub fn status(&self) -> Option<Code> {
match self.inner.kind {
@@ -111,6 +116,8 @@ pub(crate) enum Kind {
)]
#[cfg(any(feature = "native-tls", feature = "rustls", feature = "boring-tls"))]
Tls,
/// Transport shutdown error
TransportShutdown,
}
impl fmt::Debug for Error {
@@ -136,6 +143,7 @@ impl fmt::Display for Error {
Kind::Connection => f.write_str("Connection error")?,
#[cfg(any(feature = "native-tls", feature = "rustls", feature = "boring-tls"))]
Kind::Tls => f.write_str("tls error")?,
Kind::TransportShutdown => f.write_str("transport has been shut down")?,
Kind::Transient(code) => {
write!(f, "transient error ({code})")?;
}
@@ -189,3 +197,7 @@ pub(crate) fn connection<E: Into<BoxError>>(e: E) -> Error {
pub(crate) fn tls<E: Into<BoxError>>(e: E) -> Error {
Error::new(Kind::Tls, Some(e))
}
pub(crate) fn transport_shutdown() -> Error {
Error::new::<BoxError>(Kind::TransportShutdown, None)
}

View File

@@ -1,6 +1,5 @@
use std::{
fmt::{self, Debug},
mem,
ops::{Deref, DerefMut},
sync::{Arc, OnceLock},
time::{Duration, Instant},
@@ -15,11 +14,15 @@ use super::{
super::{client::AsyncSmtpConnection, Error},
PoolConfig,
};
use crate::{executor::SpawnHandle, transport::smtp::async_transport::AsyncSmtpClient, Executor};
use crate::{
executor::SpawnHandle,
transport::smtp::{async_transport::AsyncSmtpClient, error},
Executor,
};
pub struct Pool<E: Executor> {
pub(crate) struct Pool<E: Executor> {
config: PoolConfig,
connections: Mutex<Vec<ParkedConnection>>,
connections: Mutex<Option<Vec<ParkedConnection>>>,
client: AsyncSmtpClient<E>,
handle: OnceLock<E::Handle>,
}
@@ -29,16 +32,16 @@ struct ParkedConnection {
since: Instant,
}
pub struct PooledConnection<E: Executor> {
pub(crate) struct PooledConnection<E: Executor> {
conn: Option<AsyncSmtpConnection>,
pool: Arc<Pool<E>>,
}
impl<E: Executor> Pool<E> {
pub fn new(config: PoolConfig, client: AsyncSmtpClient<E>) -> Arc<Self> {
pub(crate) fn new(config: PoolConfig, client: AsyncSmtpClient<E>) -> Arc<Self> {
let pool = Arc::new(Self {
config,
connections: Mutex::new(Vec::new()),
connections: Mutex::new(Some(Vec::new())),
client,
handle: OnceLock::new(),
});
@@ -60,6 +63,10 @@ impl<E: Executor> Pool<E> {
#[allow(clippy::needless_collect)]
let (count, dropped) = {
let mut connections = pool.connections.lock().await;
let Some(connections) = connections.as_mut() else {
// The transport was shut down
return;
};
let to_drop = connections
.iter()
@@ -92,6 +99,11 @@ impl<E: Executor> Pool<E> {
};
let mut connections = pool.connections.lock().await;
let Some(connections) = connections.as_mut() else {
// The transport was shut down
return;
};
connections.push(ParkedConnection::park(conn));
#[cfg(feature = "tracing")]
@@ -134,10 +146,29 @@ impl<E: Executor> Pool<E> {
pool
}
pub async fn connection(self: &Arc<Self>) -> Result<PooledConnection<E>, Error> {
pub(crate) async fn shutdown(&self) {
let connections = { self.connections.lock().await.take() };
if let Some(connections) = connections {
stream::iter(connections)
.for_each_concurrent(8, |conn| async move {
conn.unpark().abort().await;
})
.await;
}
if let Some(handle) = self.handle.get() {
handle.shutdown().await;
}
}
pub(crate) async fn connection(self: &Arc<Self>) -> Result<PooledConnection<E>, Error> {
loop {
let conn = {
let mut connections = self.connections.lock().await;
let Some(connections) = connections.as_mut() else {
// The transport was shut down
return Err(error::transport_shutdown());
};
connections.pop()
};
@@ -181,13 +212,20 @@ impl<E: Executor> Pool<E> {
#[cfg(feature = "tracing")]
tracing::debug!("recycling connection");
let mut connections = self.connections.lock().await;
if connections.len() >= self.config.max_size as usize {
drop(connections);
conn.abort().await;
let mut connections_guard = self.connections.lock().await;
if let Some(connections) = connections_guard.as_mut() {
if connections.len() >= self.config.max_size as usize {
drop(connections_guard);
conn.abort().await;
} else {
let conn = ParkedConnection::park(conn);
connections.push(conn);
}
} else {
let conn = ParkedConnection::park(conn);
connections.push(conn);
// The pool has already been shut down
drop(connections_guard);
conn.abort().await;
}
}
}
@@ -200,7 +238,13 @@ impl<E: Executor> Debug for Pool<E> {
.field(
"connections",
&match self.connections.try_lock() {
Some(connections) => format!("{} connections", connections.len()),
Some(connections) => {
if let Some(connections) = connections.as_ref() {
format!("{} connections", connections.len())
} else {
"SHUT DOWN".to_owned()
}
}
None => "LOCKED".to_owned(),
},
@@ -222,14 +266,16 @@ impl<E: Executor> Drop for Pool<E> {
#[cfg(feature = "tracing")]
tracing::debug!("dropping Pool");
let connections = mem::take(self.connections.get_mut());
let connections = self.connections.get_mut().take();
let handle = self.handle.take();
E::spawn(async move {
if let Some(handle) = handle {
handle.shutdown().await;
}
abort_concurrent(connections.into_iter().map(ParkedConnection::unpark)).await;
if let Some(connections) = connections {
abort_concurrent(connections.into_iter().map(ParkedConnection::unpark)).await;
}
});
}
}

View File

@@ -1,8 +1,8 @@
use std::time::Duration;
#[cfg(any(feature = "tokio1", feature = "async-std1"))]
pub mod async_impl;
pub mod sync_impl;
pub(super) mod async_impl;
pub(super) mod sync_impl;
/// Configuration for a connection pool
#[derive(Debug, Clone)]

View File

@@ -1,8 +1,7 @@
use std::{
fmt::{self, Debug},
mem,
ops::{Deref, DerefMut},
sync::{Arc, Mutex, TryLockError},
sync::{mpsc, Arc, Mutex, TryLockError},
thread,
time::{Duration, Instant},
};
@@ -11,11 +10,12 @@ use super::{
super::{client::SmtpConnection, Error},
PoolConfig,
};
use crate::transport::smtp::transport::SmtpClient;
use crate::transport::smtp::{error, transport::SmtpClient};
pub struct Pool {
pub(crate) struct Pool {
config: PoolConfig,
connections: Mutex<Vec<ParkedConnection>>,
connections: Mutex<Option<Vec<ParkedConnection>>>,
thread_terminator: mpsc::SyncSender<()>,
client: SmtpClient,
}
@@ -24,16 +24,19 @@ struct ParkedConnection {
since: Instant,
}
pub struct PooledConnection {
pub(crate) struct PooledConnection {
conn: Option<SmtpConnection>,
pool: Arc<Pool>,
}
impl Pool {
pub fn new(config: PoolConfig, client: SmtpClient) -> Arc<Self> {
pub(crate) fn new(config: PoolConfig, client: SmtpClient) -> Arc<Self> {
let (thread_tx, thread_rx) = mpsc::sync_channel(1);
let pool = Arc::new(Self {
config,
connections: Mutex::new(Vec::new()),
connections: Mutex::new(Some(Vec::new())),
thread_terminator: thread_tx,
client,
});
@@ -54,6 +57,10 @@ impl Pool {
#[allow(clippy::needless_collect)]
let (count, dropped) = {
let mut connections = pool.connections.lock().unwrap();
let Some(connections) = connections.as_mut() else {
// The transport was shut down
return;
};
let to_drop = connections
.iter()
@@ -86,6 +93,11 @@ impl Pool {
};
let mut connections = pool.connections.lock().unwrap();
let Some(connections) = connections.as_mut() else {
// The transport was shut down
return;
};
connections.push(ParkedConnection::park(conn));
#[cfg(feature = "tracing")]
@@ -110,7 +122,14 @@ impl Pool {
}
drop(pool);
thread::sleep(idle_timeout);
match thread_rx.recv_timeout(idle_timeout) {
Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
// The transport was shut down
return;
}
Err(mpsc::RecvTimeoutError::Timeout) => {}
}
}
})
.expect("couldn't spawn the Pool thread");
@@ -119,10 +138,25 @@ impl Pool {
pool
}
pub fn connection(self: &Arc<Self>) -> Result<PooledConnection, Error> {
pub(crate) fn shutdown(&self) {
let connections = { self.connections.lock().unwrap().take() };
if let Some(connections) = connections {
for conn in connections {
conn.unpark().abort();
}
}
_ = self.thread_terminator.try_send(());
}
pub(crate) fn connection(self: &Arc<Self>) -> Result<PooledConnection, Error> {
loop {
let conn = {
let mut connections = self.connections.lock().unwrap();
let Some(connections) = connections.as_mut() else {
// The transport was shut down
return Err(error::transport_shutdown());
};
connections.pop()
};
@@ -166,13 +200,20 @@ impl Pool {
#[cfg(feature = "tracing")]
tracing::debug!("recycling connection");
let mut connections = self.connections.lock().unwrap();
if connections.len() >= self.config.max_size as usize {
drop(connections);
conn.abort();
let mut connections_guard = self.connections.lock().unwrap();
if let Some(connections) = connections_guard.as_mut() {
if connections.len() >= self.config.max_size as usize {
drop(connections_guard);
conn.abort();
} else {
let conn = ParkedConnection::park(conn);
connections.push(conn);
}
} else {
let conn = ParkedConnection::park(conn);
connections.push(conn);
// The pool has already been shut down
drop(connections_guard);
conn.abort();
}
}
}
@@ -185,7 +226,13 @@ impl Debug for Pool {
.field(
"connections",
&match self.connections.try_lock() {
Ok(connections) => format!("{} connections", connections.len()),
Ok(connections) => {
if let Some(connections) = connections.as_ref() {
format!("{} connections", connections.len())
} else {
"SHUT DOWN".to_owned()
}
}
Err(TryLockError::WouldBlock) => "LOCKED".to_owned(),
Err(TryLockError::Poisoned(_)) => "POISONED".to_owned(),
@@ -201,10 +248,11 @@ impl Drop for Pool {
#[cfg(feature = "tracing")]
tracing::debug!("dropping Pool");
let connections = mem::take(&mut *self.connections.get_mut().unwrap());
for conn in connections {
let mut conn = conn.unpark();
conn.abort();
if let Some(connections) = self.connections.get_mut().unwrap().take() {
for conn in connections {
let mut conn = conn.unpark();
conn.abort();
}
}
}
}

View File

@@ -60,6 +60,11 @@ impl Transport for SmtpTransport {
Ok(result)
}
fn shutdown(&self) {
#[cfg(feature = "pool")]
self.inner.shutdown();
}
}
impl Debug for SmtpTransport {
@@ -369,7 +374,7 @@ impl SmtpTransportBuilder {
/// Build client
#[derive(Debug, Clone)]
pub struct SmtpClient {
pub(super) struct SmtpClient {
info: SmtpInfo,
}
@@ -377,7 +382,7 @@ impl SmtpClient {
/// Creates a new connection directly usable to send emails
///
/// Handles encryption and authentication
pub fn connection(&self) -> Result<SmtpConnection, Error> {
pub(super) fn connection(&self) -> Result<SmtpConnection, Error> {
#[allow(clippy::match_single_binding)]
let tls_parameters = match &self.info.tls {
#[cfg(any(feature = "native-tls", feature = "rustls", feature = "boring-tls"))]

View File

@@ -4,7 +4,7 @@ use std::fmt::{Display, Formatter, Result as FmtResult};
/// Encode a string as xtext
#[derive(Debug)]
pub struct XText<'a>(pub &'a str);
pub(crate) struct XText<'a>(pub(crate) &'a str);
impl Display for XText<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {