Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c0be84817 | ||
|
|
6059cb04d6 | ||
|
|
fdf0346556 | ||
|
|
0f9455715c | ||
|
|
0b3a1ed278 | ||
|
|
76bf68268f | ||
|
|
99a86c0fac | ||
|
|
f0de9ef02c | ||
|
|
b4ddcbdcfc |
4
.github/workflows/test.yml
vendored
4
.github/workflows/test.yml
vendored
@@ -75,8 +75,8 @@ jobs:
|
||||
rust: stable
|
||||
- name: beta
|
||||
rust: beta
|
||||
- name: '1.71'
|
||||
rust: '1.71'
|
||||
- name: '1.74'
|
||||
rust: '1.74'
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
28
CHANGELOG.md
28
CHANGELOG.md
@@ -1,3 +1,31 @@
|
||||
<a name="v0.11.15"></a>
|
||||
### v0.11.15 (2025-03-10)
|
||||
|
||||
#### Upgrade notes
|
||||
|
||||
* MSRV is now 1.74 ([#1060])
|
||||
|
||||
#### Features
|
||||
|
||||
* Add controlled shutdown methods ([#1045], [#1068])
|
||||
|
||||
#### Misc
|
||||
|
||||
* Deny `unreachable_pub` lint ([#1058])
|
||||
* Bump minimum supported `rustls` ([#1063])
|
||||
* Bump minimum supported `serde` ([#1064])
|
||||
* Upgrade semver compatible dependencies ([#1067])
|
||||
* Upgrade `email-encoding` to v0.4 ([#1069])
|
||||
|
||||
[#1045]: https://github.com/lettre/lettre/pull/1045
|
||||
[#1058]: https://github.com/lettre/lettre/pull/1058
|
||||
[#1060]: https://github.com/lettre/lettre/pull/1060
|
||||
[#1063]: https://github.com/lettre/lettre/pull/1063
|
||||
[#1064]: https://github.com/lettre/lettre/pull/1064
|
||||
[#1067]: https://github.com/lettre/lettre/pull/1067
|
||||
[#1068]: https://github.com/lettre/lettre/pull/1068
|
||||
[#1069]: https://github.com/lettre/lettre/pull/1069
|
||||
|
||||
<a name="v0.11.14"></a>
|
||||
### v0.11.14 (2025-02-23)
|
||||
|
||||
|
||||
498
Cargo.lock
generated
498
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
10
Cargo.toml
10
Cargo.toml
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lettre"
|
||||
# remember to update html_root_url and README.md (Cargo.toml example and deps.rs badge)
|
||||
version = "0.11.14"
|
||||
version = "0.11.15"
|
||||
description = "Email client"
|
||||
readme = "README.md"
|
||||
homepage = "https://lettre.rs"
|
||||
@@ -11,7 +11,7 @@ authors = ["Alexis Mousset <contact@amousset.me>", "Paolo Barbolini <paolo@paolo
|
||||
categories = ["email", "network-programming"]
|
||||
keywords = ["email", "smtp", "mailer", "message", "sendmail"]
|
||||
edition = "2021"
|
||||
rust-version = "1.71"
|
||||
rust-version = "1.74"
|
||||
|
||||
[badges]
|
||||
is-it-maintained-issue-resolution = { repository = "lettre/lettre" }
|
||||
@@ -32,11 +32,11 @@ mime = { version = "0.3.4", optional = true }
|
||||
fastrand = { version = "2.0", optional = true }
|
||||
quoted_printable = { version = "0.5", optional = true }
|
||||
base64 = { version = "0.22", optional = true }
|
||||
email-encoding = { version = "0.3", optional = true }
|
||||
email-encoding = { version = "0.4", optional = true }
|
||||
|
||||
# file transport
|
||||
uuid = { version = "1", features = ["v4"], optional = true }
|
||||
serde = { version = "1", features = ["derive"], optional = true }
|
||||
serde = { version = "1.0.110", features = ["derive"], optional = true }
|
||||
serde_json = { version = "1", optional = true }
|
||||
|
||||
# smtp-transport
|
||||
@@ -48,7 +48,7 @@ percent-encoding = { version = "2.3", optional = true }
|
||||
|
||||
## tls
|
||||
native-tls = { version = "0.2.9", optional = true } # feature
|
||||
rustls = { version = "0.23.5", default-features = false, features = ["logging", "std", "tls12"], optional = true }
|
||||
rustls = { version = "0.23.18", default-features = false, features = ["logging", "std", "tls12"], optional = true }
|
||||
rustls-native-certs = { version = "0.8", optional = true }
|
||||
webpki-roots = { version = "0.26", optional = true }
|
||||
boring = { version = "4", optional = true }
|
||||
|
||||
@@ -28,8 +28,8 @@
|
||||
</div>
|
||||
|
||||
<div align="center">
|
||||
<a href="https://deps.rs/crate/lettre/0.11.14">
|
||||
<img src="https://deps.rs/crate/lettre/0.11.14/status.svg"
|
||||
<a href="https://deps.rs/crate/lettre/0.11.15">
|
||||
<img src="https://deps.rs/crate/lettre/0.11.15/status.svg"
|
||||
alt="dependency status" />
|
||||
</a>
|
||||
</div>
|
||||
@@ -53,12 +53,12 @@ Lettre does not provide (for now):
|
||||
## Supported Rust Versions
|
||||
|
||||
Lettre supports all Rust versions released in the last 6 months. At the time of writing
|
||||
the minimum supported Rust version is 1.71, but this could change at any time either from
|
||||
the minimum supported Rust version is 1.74, but this could change at any time either from
|
||||
one of our dependencies bumping their MSRV or by a new patch release of lettre.
|
||||
|
||||
## Example
|
||||
|
||||
This library requires Rust 1.71 or newer.
|
||||
This library requires Rust 1.74 or newer.
|
||||
To use this library, add the following to your `Cargo.toml`:
|
||||
|
||||
```toml
|
||||
|
||||
@@ -53,7 +53,7 @@ mod serde_forward_path {
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<Address>, D::Error>
|
||||
pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Vec<Address>, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
|
||||
@@ -45,6 +45,7 @@ use crate::transport::smtp::Error;
|
||||
#[async_trait]
|
||||
pub trait Executor: Debug + Send + Sync + 'static + private::Sealed {
|
||||
#[cfg(feature = "smtp-transport")]
|
||||
#[allow(private_bounds)]
|
||||
type Handle: SpawnHandle;
|
||||
#[cfg(feature = "smtp-transport")]
|
||||
type Sleep: Future<Output = ()> + Send + 'static;
|
||||
@@ -82,8 +83,8 @@ pub trait Executor: Debug + Send + Sync + 'static + private::Sealed {
|
||||
#[doc(hidden)]
|
||||
#[cfg(feature = "smtp-transport")]
|
||||
#[async_trait]
|
||||
pub trait SpawnHandle: Debug + Send + Sync + 'static + private::Sealed {
|
||||
async fn shutdown(self);
|
||||
pub(crate) trait SpawnHandle: Debug + Send + Sync + 'static + private::Sealed {
|
||||
async fn shutdown(&self);
|
||||
}
|
||||
|
||||
/// Async [`Executor`] using `tokio` `1.x`
|
||||
@@ -177,7 +178,7 @@ impl Executor for Tokio1Executor {
|
||||
#[cfg(all(feature = "smtp-transport", feature = "tokio1"))]
|
||||
#[async_trait]
|
||||
impl SpawnHandle for tokio1_crate::task::JoinHandle<()> {
|
||||
async fn shutdown(self) {
|
||||
async fn shutdown(&self) {
|
||||
self.abort();
|
||||
}
|
||||
}
|
||||
@@ -201,7 +202,7 @@ pub struct AsyncStd1Executor;
|
||||
#[cfg(feature = "async-std1")]
|
||||
impl Executor for AsyncStd1Executor {
|
||||
#[cfg(feature = "smtp-transport")]
|
||||
type Handle = async_std::task::JoinHandle<()>;
|
||||
type Handle = futures_util::future::AbortHandle;
|
||||
#[cfg(feature = "smtp-transport")]
|
||||
type Sleep = BoxFuture<'static, ()>;
|
||||
|
||||
@@ -211,7 +212,9 @@ impl Executor for AsyncStd1Executor {
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
F::Output: Send + 'static,
|
||||
{
|
||||
async_std::task::spawn(fut)
|
||||
let (handle, registration) = futures_util::future::AbortHandle::new_pair();
|
||||
async_std::task::spawn(futures_util::future::Abortable::new(fut, registration));
|
||||
handle
|
||||
}
|
||||
|
||||
#[cfg(feature = "smtp-transport")]
|
||||
@@ -272,9 +275,9 @@ impl Executor for AsyncStd1Executor {
|
||||
|
||||
#[cfg(all(feature = "smtp-transport", feature = "async-std1"))]
|
||||
#[async_trait]
|
||||
impl SpawnHandle for async_std::task::JoinHandle<()> {
|
||||
async fn shutdown(self) {
|
||||
self.cancel().await;
|
||||
impl SpawnHandle for futures_util::future::AbortHandle {
|
||||
async fn shutdown(&self) {
|
||||
self.abort();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -291,5 +294,5 @@ mod private {
|
||||
impl Sealed for tokio1_crate::task::JoinHandle<()> {}
|
||||
|
||||
#[cfg(all(feature = "smtp-transport", feature = "async-std1"))]
|
||||
impl Sealed for async_std::task::JoinHandle<()> {}
|
||||
impl Sealed for futures_util::future::AbortHandle {}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
//! * Secure defaults
|
||||
//! * Async support
|
||||
//!
|
||||
//! Lettre requires Rust 1.71 or newer.
|
||||
//! Lettre requires Rust 1.74 or newer.
|
||||
//!
|
||||
//! ## Features
|
||||
//!
|
||||
@@ -158,11 +158,12 @@
|
||||
//! [mime 0.3]: https://docs.rs/mime/0.3
|
||||
//! [DKIM]: https://datatracker.ietf.org/doc/html/rfc6376
|
||||
|
||||
#![doc(html_root_url = "https://docs.rs/crate/lettre/0.11.14")]
|
||||
#![doc(html_root_url = "https://docs.rs/crate/lettre/0.11.15")]
|
||||
#![doc(html_favicon_url = "https://lettre.rs/favicon.ico")]
|
||||
#![doc(html_logo_url = "https://avatars0.githubusercontent.com/u/15113230?v=4")]
|
||||
#![forbid(unsafe_code)]
|
||||
#![deny(
|
||||
unreachable_pub,
|
||||
missing_copy_implementations,
|
||||
trivial_casts,
|
||||
trivial_numeric_casts,
|
||||
|
||||
@@ -41,14 +41,14 @@ fn quoted_pair() -> impl Parser<char, char, Error = Cheap<char>> {
|
||||
|
||||
// FWS = ([*WSP CRLF] 1*WSP) / ; Folding white space
|
||||
// obs-FWS
|
||||
pub fn fws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
|
||||
pub(super) fn fws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
|
||||
rfc2234::wsp()
|
||||
.or_not()
|
||||
.then_ignore(rfc2234::wsp().ignored().repeated())
|
||||
}
|
||||
|
||||
// CFWS = *([FWS] comment) (([FWS] comment) / FWS)
|
||||
pub fn cfws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
|
||||
pub(super) fn cfws() -> impl Parser<char, Option<char>, Error = Cheap<char>> {
|
||||
// TODO: comment are not currently supported, so for now a cfws is
|
||||
// the same as a fws.
|
||||
fws()
|
||||
@@ -106,12 +106,12 @@ pub(super) fn atom() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
}
|
||||
|
||||
// dot-atom = [CFWS] dot-atom-text [CFWS]
|
||||
pub fn dot_atom() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
pub(super) fn dot_atom() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
cfws().chain(dot_atom_text())
|
||||
}
|
||||
|
||||
// dot-atom-text = 1*atext *("." 1*atext)
|
||||
pub fn dot_atom_text() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
pub(super) fn dot_atom_text() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
atext().repeated().at_least(1).chain(
|
||||
just('.')
|
||||
.chain(atext().repeated().at_least(1))
|
||||
@@ -204,7 +204,7 @@ pub(crate) fn mailbox_list(
|
||||
// https://datatracker.ietf.org/doc/html/rfc2822#section-3.4.1
|
||||
|
||||
// addr-spec = local-part "@" domain
|
||||
pub fn addr_spec() -> impl Parser<char, (String, String), Error = Cheap<char>> {
|
||||
pub(super) fn addr_spec() -> impl Parser<char, (String, String), Error = Cheap<char>> {
|
||||
local_part()
|
||||
.collect()
|
||||
.then_ignore(just('@'))
|
||||
@@ -212,12 +212,12 @@ pub fn addr_spec() -> impl Parser<char, (String, String), Error = Cheap<char>> {
|
||||
}
|
||||
|
||||
// local-part = dot-atom / quoted-string / obs-local-part
|
||||
pub fn local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
pub(super) fn local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
choice((dot_atom(), quoted_string(), obs_local_part()))
|
||||
}
|
||||
|
||||
// domain = dot-atom / domain-literal / obs-domain
|
||||
pub fn domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
pub(super) fn domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
// NOTE: omitting domain-literal since it may never be used
|
||||
choice((dot_atom(), obs_domain()))
|
||||
}
|
||||
@@ -240,11 +240,11 @@ fn obs_phrase() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
// https://datatracker.ietf.org/doc/html/rfc2822#section-4.4
|
||||
|
||||
// obs-local-part = word *("." word)
|
||||
pub fn obs_local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
pub(super) fn obs_local_part() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
word().chain(just('.').chain(word()).repeated().flatten())
|
||||
}
|
||||
|
||||
// obs-domain = atom *("." atom)
|
||||
pub fn obs_domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
pub(super) fn obs_domain() -> impl Parser<char, Vec<char>, Error = Cheap<char>> {
|
||||
atom().chain(just('.').chain(atom()).repeated().flatten())
|
||||
}
|
||||
|
||||
@@ -140,6 +140,10 @@ pub trait Transport {
|
||||
}
|
||||
|
||||
fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error>;
|
||||
|
||||
/// Shuts down the transport. Future calls to [`Self::send`] and
|
||||
/// [`Self::send_raw`] might fail.
|
||||
fn shutdown(&self) {}
|
||||
}
|
||||
|
||||
/// Async Transport method for emails
|
||||
@@ -166,4 +170,8 @@ pub trait AsyncTransport {
|
||||
}
|
||||
|
||||
async fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error>;
|
||||
|
||||
/// Shuts down the transport. Future calls to [`Self::send`] and
|
||||
/// [`Self::send_raw`] might fail.
|
||||
async fn shutdown(&self) {}
|
||||
}
|
||||
|
||||
@@ -79,6 +79,11 @@ impl AsyncTransport for AsyncSmtpTransport<Tokio1Executor> {
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn shutdown(&self) {
|
||||
#[cfg(feature = "pool")]
|
||||
self.inner.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-std1")]
|
||||
@@ -97,6 +102,11 @@ impl AsyncTransport for AsyncSmtpTransport<AsyncStd1Executor> {
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn shutdown(&self) {
|
||||
#[cfg(feature = "pool")]
|
||||
self.inner.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> AsyncSmtpTransport<E>
|
||||
@@ -460,7 +470,7 @@ impl AsyncSmtpTransportBuilder {
|
||||
}
|
||||
|
||||
/// Build client
|
||||
pub struct AsyncSmtpClient<E> {
|
||||
pub(super) struct AsyncSmtpClient<E> {
|
||||
info: SmtpInfo,
|
||||
marker_: PhantomData<E>,
|
||||
}
|
||||
@@ -472,7 +482,7 @@ where
|
||||
/// Creates a new connection directly usable to send emails
|
||||
///
|
||||
/// Handles encryption and authentication
|
||||
pub async fn connection(&self) -> Result<AsyncSmtpConnection, Error> {
|
||||
pub(super) async fn connection(&self) -> Result<AsyncSmtpConnection, Error> {
|
||||
let mut conn = E::connect(
|
||||
&self.info.server,
|
||||
self.info.port,
|
||||
|
||||
@@ -58,7 +58,7 @@ struct ClientCodec {
|
||||
|
||||
impl ClientCodec {
|
||||
/// Creates a new client codec
|
||||
pub fn new() -> Self {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
status: CodecStatus::StartOfNewLine,
|
||||
}
|
||||
|
||||
@@ -490,7 +490,7 @@ impl TlsParametersBuilder {
|
||||
|
||||
#[derive(Clone)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum InnerTlsParameters {
|
||||
pub(crate) enum InnerTlsParameters {
|
||||
#[cfg(feature = "native-tls")]
|
||||
NativeTls(TlsConnector),
|
||||
#[cfg(feature = "rustls")]
|
||||
|
||||
@@ -77,6 +77,11 @@ impl Error {
|
||||
matches!(self.inner.kind, Kind::Tls)
|
||||
}
|
||||
|
||||
/// Returns true if the error is because the transport was shut down
|
||||
pub fn is_transport_shutdown(&self) -> bool {
|
||||
matches!(self.inner.kind, Kind::TransportShutdown)
|
||||
}
|
||||
|
||||
/// Returns the status code, if the error was generated from a response.
|
||||
pub fn status(&self) -> Option<Code> {
|
||||
match self.inner.kind {
|
||||
@@ -111,6 +116,8 @@ pub(crate) enum Kind {
|
||||
)]
|
||||
#[cfg(any(feature = "native-tls", feature = "rustls", feature = "boring-tls"))]
|
||||
Tls,
|
||||
/// Transport shutdown error
|
||||
TransportShutdown,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Error {
|
||||
@@ -136,6 +143,7 @@ impl fmt::Display for Error {
|
||||
Kind::Connection => f.write_str("Connection error")?,
|
||||
#[cfg(any(feature = "native-tls", feature = "rustls", feature = "boring-tls"))]
|
||||
Kind::Tls => f.write_str("tls error")?,
|
||||
Kind::TransportShutdown => f.write_str("transport has been shut down")?,
|
||||
Kind::Transient(code) => {
|
||||
write!(f, "transient error ({code})")?;
|
||||
}
|
||||
@@ -189,3 +197,7 @@ pub(crate) fn connection<E: Into<BoxError>>(e: E) -> Error {
|
||||
pub(crate) fn tls<E: Into<BoxError>>(e: E) -> Error {
|
||||
Error::new(Kind::Tls, Some(e))
|
||||
}
|
||||
|
||||
pub(crate) fn transport_shutdown() -> Error {
|
||||
Error::new::<BoxError>(Kind::TransportShutdown, None)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{
|
||||
fmt::{self, Debug},
|
||||
mem,
|
||||
ops::{Deref, DerefMut},
|
||||
sync::{Arc, OnceLock},
|
||||
time::{Duration, Instant},
|
||||
@@ -15,11 +14,15 @@ use super::{
|
||||
super::{client::AsyncSmtpConnection, Error},
|
||||
PoolConfig,
|
||||
};
|
||||
use crate::{executor::SpawnHandle, transport::smtp::async_transport::AsyncSmtpClient, Executor};
|
||||
use crate::{
|
||||
executor::SpawnHandle,
|
||||
transport::smtp::{async_transport::AsyncSmtpClient, error},
|
||||
Executor,
|
||||
};
|
||||
|
||||
pub struct Pool<E: Executor> {
|
||||
pub(crate) struct Pool<E: Executor> {
|
||||
config: PoolConfig,
|
||||
connections: Mutex<Vec<ParkedConnection>>,
|
||||
connections: Mutex<Option<Vec<ParkedConnection>>>,
|
||||
client: AsyncSmtpClient<E>,
|
||||
handle: OnceLock<E::Handle>,
|
||||
}
|
||||
@@ -29,16 +32,16 @@ struct ParkedConnection {
|
||||
since: Instant,
|
||||
}
|
||||
|
||||
pub struct PooledConnection<E: Executor> {
|
||||
pub(crate) struct PooledConnection<E: Executor> {
|
||||
conn: Option<AsyncSmtpConnection>,
|
||||
pool: Arc<Pool<E>>,
|
||||
}
|
||||
|
||||
impl<E: Executor> Pool<E> {
|
||||
pub fn new(config: PoolConfig, client: AsyncSmtpClient<E>) -> Arc<Self> {
|
||||
pub(crate) fn new(config: PoolConfig, client: AsyncSmtpClient<E>) -> Arc<Self> {
|
||||
let pool = Arc::new(Self {
|
||||
config,
|
||||
connections: Mutex::new(Vec::new()),
|
||||
connections: Mutex::new(Some(Vec::new())),
|
||||
client,
|
||||
handle: OnceLock::new(),
|
||||
});
|
||||
@@ -60,6 +63,10 @@ impl<E: Executor> Pool<E> {
|
||||
#[allow(clippy::needless_collect)]
|
||||
let (count, dropped) = {
|
||||
let mut connections = pool.connections.lock().await;
|
||||
let Some(connections) = connections.as_mut() else {
|
||||
// The transport was shut down
|
||||
return;
|
||||
};
|
||||
|
||||
let to_drop = connections
|
||||
.iter()
|
||||
@@ -92,6 +99,11 @@ impl<E: Executor> Pool<E> {
|
||||
};
|
||||
|
||||
let mut connections = pool.connections.lock().await;
|
||||
let Some(connections) = connections.as_mut() else {
|
||||
// The transport was shut down
|
||||
return;
|
||||
};
|
||||
|
||||
connections.push(ParkedConnection::park(conn));
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
@@ -134,10 +146,29 @@ impl<E: Executor> Pool<E> {
|
||||
pool
|
||||
}
|
||||
|
||||
pub async fn connection(self: &Arc<Self>) -> Result<PooledConnection<E>, Error> {
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
let connections = { self.connections.lock().await.take() };
|
||||
if let Some(connections) = connections {
|
||||
stream::iter(connections)
|
||||
.for_each_concurrent(8, |conn| async move {
|
||||
conn.unpark().abort().await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
if let Some(handle) = self.handle.get() {
|
||||
handle.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn connection(self: &Arc<Self>) -> Result<PooledConnection<E>, Error> {
|
||||
loop {
|
||||
let conn = {
|
||||
let mut connections = self.connections.lock().await;
|
||||
let Some(connections) = connections.as_mut() else {
|
||||
// The transport was shut down
|
||||
return Err(error::transport_shutdown());
|
||||
};
|
||||
connections.pop()
|
||||
};
|
||||
|
||||
@@ -181,13 +212,20 @@ impl<E: Executor> Pool<E> {
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::debug!("recycling connection");
|
||||
|
||||
let mut connections = self.connections.lock().await;
|
||||
if connections.len() >= self.config.max_size as usize {
|
||||
drop(connections);
|
||||
conn.abort().await;
|
||||
let mut connections_guard = self.connections.lock().await;
|
||||
|
||||
if let Some(connections) = connections_guard.as_mut() {
|
||||
if connections.len() >= self.config.max_size as usize {
|
||||
drop(connections_guard);
|
||||
conn.abort().await;
|
||||
} else {
|
||||
let conn = ParkedConnection::park(conn);
|
||||
connections.push(conn);
|
||||
}
|
||||
} else {
|
||||
let conn = ParkedConnection::park(conn);
|
||||
connections.push(conn);
|
||||
// The pool has already been shut down
|
||||
drop(connections_guard);
|
||||
conn.abort().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -200,7 +238,13 @@ impl<E: Executor> Debug for Pool<E> {
|
||||
.field(
|
||||
"connections",
|
||||
&match self.connections.try_lock() {
|
||||
Some(connections) => format!("{} connections", connections.len()),
|
||||
Some(connections) => {
|
||||
if let Some(connections) = connections.as_ref() {
|
||||
format!("{} connections", connections.len())
|
||||
} else {
|
||||
"SHUT DOWN".to_owned()
|
||||
}
|
||||
}
|
||||
|
||||
None => "LOCKED".to_owned(),
|
||||
},
|
||||
@@ -222,14 +266,16 @@ impl<E: Executor> Drop for Pool<E> {
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::debug!("dropping Pool");
|
||||
|
||||
let connections = mem::take(self.connections.get_mut());
|
||||
let connections = self.connections.get_mut().take();
|
||||
let handle = self.handle.take();
|
||||
E::spawn(async move {
|
||||
if let Some(handle) = handle {
|
||||
handle.shutdown().await;
|
||||
}
|
||||
|
||||
abort_concurrent(connections.into_iter().map(ParkedConnection::unpark)).await;
|
||||
if let Some(connections) = connections {
|
||||
abort_concurrent(connections.into_iter().map(ParkedConnection::unpark)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(any(feature = "tokio1", feature = "async-std1"))]
|
||||
pub mod async_impl;
|
||||
pub mod sync_impl;
|
||||
pub(super) mod async_impl;
|
||||
pub(super) mod sync_impl;
|
||||
|
||||
/// Configuration for a connection pool
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use std::{
|
||||
fmt::{self, Debug},
|
||||
mem,
|
||||
ops::{Deref, DerefMut},
|
||||
sync::{Arc, Mutex, TryLockError},
|
||||
sync::{mpsc, Arc, Mutex, TryLockError},
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -11,11 +10,12 @@ use super::{
|
||||
super::{client::SmtpConnection, Error},
|
||||
PoolConfig,
|
||||
};
|
||||
use crate::transport::smtp::transport::SmtpClient;
|
||||
use crate::transport::smtp::{error, transport::SmtpClient};
|
||||
|
||||
pub struct Pool {
|
||||
pub(crate) struct Pool {
|
||||
config: PoolConfig,
|
||||
connections: Mutex<Vec<ParkedConnection>>,
|
||||
connections: Mutex<Option<Vec<ParkedConnection>>>,
|
||||
thread_terminator: mpsc::SyncSender<()>,
|
||||
client: SmtpClient,
|
||||
}
|
||||
|
||||
@@ -24,16 +24,19 @@ struct ParkedConnection {
|
||||
since: Instant,
|
||||
}
|
||||
|
||||
pub struct PooledConnection {
|
||||
pub(crate) struct PooledConnection {
|
||||
conn: Option<SmtpConnection>,
|
||||
pool: Arc<Pool>,
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
pub fn new(config: PoolConfig, client: SmtpClient) -> Arc<Self> {
|
||||
pub(crate) fn new(config: PoolConfig, client: SmtpClient) -> Arc<Self> {
|
||||
let (thread_tx, thread_rx) = mpsc::sync_channel(1);
|
||||
|
||||
let pool = Arc::new(Self {
|
||||
config,
|
||||
connections: Mutex::new(Vec::new()),
|
||||
connections: Mutex::new(Some(Vec::new())),
|
||||
thread_terminator: thread_tx,
|
||||
client,
|
||||
});
|
||||
|
||||
@@ -54,6 +57,10 @@ impl Pool {
|
||||
#[allow(clippy::needless_collect)]
|
||||
let (count, dropped) = {
|
||||
let mut connections = pool.connections.lock().unwrap();
|
||||
let Some(connections) = connections.as_mut() else {
|
||||
// The transport was shut down
|
||||
return;
|
||||
};
|
||||
|
||||
let to_drop = connections
|
||||
.iter()
|
||||
@@ -86,6 +93,11 @@ impl Pool {
|
||||
};
|
||||
|
||||
let mut connections = pool.connections.lock().unwrap();
|
||||
let Some(connections) = connections.as_mut() else {
|
||||
// The transport was shut down
|
||||
return;
|
||||
};
|
||||
|
||||
connections.push(ParkedConnection::park(conn));
|
||||
|
||||
#[cfg(feature = "tracing")]
|
||||
@@ -110,7 +122,14 @@ impl Pool {
|
||||
}
|
||||
|
||||
drop(pool);
|
||||
thread::sleep(idle_timeout);
|
||||
|
||||
match thread_rx.recv_timeout(idle_timeout) {
|
||||
Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
|
||||
// The transport was shut down
|
||||
return;
|
||||
}
|
||||
Err(mpsc::RecvTimeoutError::Timeout) => {}
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("couldn't spawn the Pool thread");
|
||||
@@ -119,10 +138,25 @@ impl Pool {
|
||||
pool
|
||||
}
|
||||
|
||||
pub fn connection(self: &Arc<Self>) -> Result<PooledConnection, Error> {
|
||||
pub(crate) fn shutdown(&self) {
|
||||
let connections = { self.connections.lock().unwrap().take() };
|
||||
if let Some(connections) = connections {
|
||||
for conn in connections {
|
||||
conn.unpark().abort();
|
||||
}
|
||||
}
|
||||
|
||||
_ = self.thread_terminator.try_send(());
|
||||
}
|
||||
|
||||
pub(crate) fn connection(self: &Arc<Self>) -> Result<PooledConnection, Error> {
|
||||
loop {
|
||||
let conn = {
|
||||
let mut connections = self.connections.lock().unwrap();
|
||||
let Some(connections) = connections.as_mut() else {
|
||||
// The transport was shut down
|
||||
return Err(error::transport_shutdown());
|
||||
};
|
||||
connections.pop()
|
||||
};
|
||||
|
||||
@@ -166,13 +200,20 @@ impl Pool {
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::debug!("recycling connection");
|
||||
|
||||
let mut connections = self.connections.lock().unwrap();
|
||||
if connections.len() >= self.config.max_size as usize {
|
||||
drop(connections);
|
||||
conn.abort();
|
||||
let mut connections_guard = self.connections.lock().unwrap();
|
||||
|
||||
if let Some(connections) = connections_guard.as_mut() {
|
||||
if connections.len() >= self.config.max_size as usize {
|
||||
drop(connections_guard);
|
||||
conn.abort();
|
||||
} else {
|
||||
let conn = ParkedConnection::park(conn);
|
||||
connections.push(conn);
|
||||
}
|
||||
} else {
|
||||
let conn = ParkedConnection::park(conn);
|
||||
connections.push(conn);
|
||||
// The pool has already been shut down
|
||||
drop(connections_guard);
|
||||
conn.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -185,7 +226,13 @@ impl Debug for Pool {
|
||||
.field(
|
||||
"connections",
|
||||
&match self.connections.try_lock() {
|
||||
Ok(connections) => format!("{} connections", connections.len()),
|
||||
Ok(connections) => {
|
||||
if let Some(connections) = connections.as_ref() {
|
||||
format!("{} connections", connections.len())
|
||||
} else {
|
||||
"SHUT DOWN".to_owned()
|
||||
}
|
||||
}
|
||||
|
||||
Err(TryLockError::WouldBlock) => "LOCKED".to_owned(),
|
||||
Err(TryLockError::Poisoned(_)) => "POISONED".to_owned(),
|
||||
@@ -201,10 +248,11 @@ impl Drop for Pool {
|
||||
#[cfg(feature = "tracing")]
|
||||
tracing::debug!("dropping Pool");
|
||||
|
||||
let connections = mem::take(&mut *self.connections.get_mut().unwrap());
|
||||
for conn in connections {
|
||||
let mut conn = conn.unpark();
|
||||
conn.abort();
|
||||
if let Some(connections) = self.connections.get_mut().unwrap().take() {
|
||||
for conn in connections {
|
||||
let mut conn = conn.unpark();
|
||||
conn.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,11 @@ impl Transport for SmtpTransport {
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn shutdown(&self) {
|
||||
#[cfg(feature = "pool")]
|
||||
self.inner.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for SmtpTransport {
|
||||
@@ -369,7 +374,7 @@ impl SmtpTransportBuilder {
|
||||
|
||||
/// Build client
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SmtpClient {
|
||||
pub(super) struct SmtpClient {
|
||||
info: SmtpInfo,
|
||||
}
|
||||
|
||||
@@ -377,7 +382,7 @@ impl SmtpClient {
|
||||
/// Creates a new connection directly usable to send emails
|
||||
///
|
||||
/// Handles encryption and authentication
|
||||
pub fn connection(&self) -> Result<SmtpConnection, Error> {
|
||||
pub(super) fn connection(&self) -> Result<SmtpConnection, Error> {
|
||||
#[allow(clippy::match_single_binding)]
|
||||
let tls_parameters = match &self.info.tls {
|
||||
#[cfg(any(feature = "native-tls", feature = "rustls", feature = "boring-tls"))]
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::fmt::{Display, Formatter, Result as FmtResult};
|
||||
|
||||
/// Encode a string as xtext
|
||||
#[derive(Debug)]
|
||||
pub struct XText<'a>(pub &'a str);
|
||||
pub(crate) struct XText<'a>(pub(crate) &'a str);
|
||||
|
||||
impl Display for XText<'_> {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
|
||||
|
||||
Reference in New Issue
Block a user