Merge pull request #424 from amousset/async

Start implementing async transport
This commit is contained in:
Alexis Mousset
2020-05-10 16:21:13 +02:00
committed by GitHub
13 changed files with 272 additions and 53 deletions

View File

@@ -34,6 +34,11 @@ jobs:
with:
command: test
args: --no-default-features --features=builder,smtp-transport,file-transport,sendmail-transport
- run: rm target/debug/deps/liblettre-*
- uses: actions-rs/cargo@v1
with:
command: test
args: --features=async
check:
name: Check

View File

@@ -17,6 +17,9 @@ is-it-maintained-open-issues = { repository = "lettre/lettre" }
maintenance = { status = "actively-developed" }
[dependencies]
async-attributes = { version = "1.1", optional = true }
async-std = { version = "1.5", optional = true, features = ["unstable"] }
async-trait = { version = "0.1", optional = true }
base64 = { version = "0.12", optional = true }
bufstream = { version = "0.1", optional = true }
hostname = { version = "0.3", optional = true }
@@ -50,6 +53,7 @@ harness = false
name = "transport_smtp"
[features]
async = ["async-std", "async-trait", "async-attributes"]
builder = ["mime", "base64", "hyperx", "textnonce", "quoted_printable"]
default = ["file-transport", "smtp-transport", "rustls-tls", "hostname", "r2d2", "sendmail-transport", "builder"]
file-transport = ["serde", "serde_json"]

View File

@@ -2,7 +2,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use lettre::{Message, SmtpTransport, Transport};
fn bench_simple_send(c: &mut Criterion) {
let sender = SmtpTransport::new("127.0.0.1").port(2525);
let sender = SmtpTransport::builder("127.0.0.1").port(2525).build();
c.bench_function("send email", move |b| {
b.iter(|| {
@@ -20,7 +20,7 @@ fn bench_simple_send(c: &mut Criterion) {
}
fn bench_reuse_send(c: &mut Criterion) {
let sender = SmtpTransport::new("127.0.0.1").port(2525);
let sender = SmtpTransport::builder("127.0.0.1").port(2525).build();
c.bench_function("send email with connection reuse", move |b| {
b.iter(|| {
let email = Message::builder()

View File

@@ -151,6 +151,35 @@ pub trait Transport {
fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error>;
}
#[cfg(feature = "async")]
pub mod r#async {
use super::*;
use async_trait::async_trait;
#[async_trait]
pub trait Transport {
/// Result types for the transport
type Ok: fmt::Debug;
type Error: StdError;
/// Sends the email
#[cfg(feature = "builder")]
// TODO take &Message
async fn send(&self, message: Message) -> Result<Self::Ok, Self::Error> {
let raw = message.formatted();
let envelope = message.envelope();
self.send_raw(&envelope, &raw).await
}
async fn send_raw(
&self,
envelope: &Envelope,
email: &[u8],
) -> Result<Self::Ok, Self::Error>;
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -62,6 +62,12 @@ impl SinglePartBuilder {
self
}
/// Set the Content-Type header of the singlepart
pub fn content_type(mut self, content_type: ContentType) -> Self {
self.headers.set(content_type);
self
}
/// Build singlepart using body
pub fn body<T: Into<Vec<u8>>>(self, body: T) -> SinglePart {
SinglePart {
@@ -81,11 +87,8 @@ impl Default for SinglePartBuilder {
///
/// # Example
///
/// ```no_test
/// extern crate mime;
/// extern crate emailmessage;
///
/// use emailmessage::{SinglePart, header};
/// ```
/// use lettre::message::{SinglePart, header};
///
/// let part = SinglePart::builder()
/// .header(header::ContentType("text/plain; charset=utf8".parse().unwrap()))

View File

@@ -78,9 +78,7 @@ impl Transport for FileTransport {
fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error> {
let email_id = Uuid::new_v4();
let mut file = self.path.clone();
file.push(format!("{}.json", email_id));
let file = self.path.join(format!("{}.json", email_id));
let serialized = match str::from_utf8(email) {
// Serialize as UTF-8 string if possible
@@ -100,3 +98,47 @@ impl Transport for FileTransport {
Ok(email_id.to_string())
}
}
#[cfg(feature = "async")]
pub mod r#async {
use super::{FileTransport, Id, SerializableEmail};
use crate::{r#async::Transport, transport::file::error::Error, Envelope};
use async_std::fs::File;
use async_std::prelude::*;
use async_trait::async_trait;
use std::str;
use uuid::Uuid;
#[async_trait]
impl Transport for FileTransport {
type Ok = Id;
type Error = Error;
async fn send_raw(
&self,
envelope: &Envelope,
email: &[u8],
) -> Result<Self::Ok, Self::Error> {
let email_id = Uuid::new_v4();
let file = self.path.join(format!("{}.json", email_id));
let serialized = match str::from_utf8(email) {
// Serialize as UTF-8 string if possible
Ok(m) => serde_json::to_string(&SerializableEmail {
envelope: envelope.clone(),
message: Some(m),
raw_message: None,
}),
Err(_) => serde_json::to_string(&SerializableEmail {
envelope: envelope.clone(),
message: None,
raw_message: Some(email),
}),
}?;
let mut file = File::create(file.as_path()).await?;
file.write_all(serialized.as_bytes()).await?;
Ok(email_id.to_string())
}
}
}

View File

@@ -56,6 +56,17 @@ impl SendmailTransport {
command: command.into(),
}
}
fn command(&self, envelope: &Envelope) -> Command {
let mut c = Command::new(&self.command);
c.arg("-i")
.arg("-f")
.arg(envelope.from().map(|f| f.as_ref()).unwrap_or("\"\""))
.args(envelope.to())
.stdin(Stdio::piped())
.stdout(Stdio::piped());
c
}
}
impl Transport for SendmailTransport {
@@ -64,14 +75,7 @@ impl Transport for SendmailTransport {
fn send_raw(&self, envelope: &Envelope, email: &[u8]) -> Result<Self::Ok, Self::Error> {
// Spawn the sendmail command
let mut process = Command::new(&self.command)
.arg("-i")
.arg("-f")
.arg(envelope.from().map(|f| f.as_ref()).unwrap_or("\"\""))
.args(envelope.to())
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let mut process = self.command(envelope).spawn()?;
process.stdin.as_mut().unwrap().write_all(email)?;
let output = process.wait_with_output()?;
@@ -83,3 +87,42 @@ impl Transport for SendmailTransport {
}
}
}
#[cfg(feature = "async")]
pub mod r#async {
use super::SendmailTransport;
use crate::{r#async::Transport, transport::sendmail::error::Error, Envelope};
use async_trait::async_trait;
use std::io::Write;
#[async_trait]
impl Transport for SendmailTransport {
type Ok = ();
type Error = Error;
// TODO: Convert to real async, once async-std has a process implementation.
async fn send_raw(
&self,
envelope: &Envelope,
email: &[u8],
) -> Result<Self::Ok, Self::Error> {
let mut command = self.command(envelope);
let email = email.to_vec();
let output = async_std::task::spawn_blocking(move || {
// Spawn the sendmail command
let mut process = command.spawn()?;
process.stdin.as_mut().unwrap().write_all(&email)?;
process.wait_with_output()
})
.await?;
if output.status.success() {
Ok(())
} else {
Err(Error::Client(String::from_utf8(output.stderr)?))
}
}
}
}

View File

@@ -7,7 +7,7 @@ use crate::{
commands::*,
error::Error,
extension::{ClientId, Extension, MailBodyParameter, MailParameter, ServerInfo},
response::Response,
response::{parse_response, Response},
},
Envelope,
};
@@ -312,36 +312,30 @@ impl SmtpConnection {
/// Gets the SMTP response
pub fn read_response(&mut self) -> Result<Response, Error> {
let mut raw_response = String::new();
let mut response = raw_response.parse::<Response>();
let mut buffer = String::with_capacity(100);
while response.is_err() {
if let Error::Parsing(nom::error::ErrorKind::Complete) =
response.as_ref().err().unwrap()
{
break;
while self.stream.read_line(&mut buffer)? > 0 {
#[cfg(feature = "log")]
debug!("<< {}", escape_crlf(&buffer));
match parse_response(&buffer) {
Ok((_remaining, response)) => {
if response.is_positive() {
return Ok(response);
}
return Err(response.into());
}
Err(nom::Err::Failure(e)) => {
return Err(Error::Parsing(e.1));
}
Err(nom::Err::Incomplete(_)) => { /* read more */ }
Err(nom::Err::Error(e)) => {
return Err(Error::Parsing(e.1));
}
}
// TODO read more than one line
let read_count = self.stream.read_line(&mut raw_response)?;
// EOF is reached
if read_count == 0 {
break;
}
response = raw_response.parse::<Response>();
}
#[cfg(feature = "log")]
debug!("Read: {}", escape_crlf(raw_response.as_ref()));
let final_response = response?;
if final_response.is_positive() {
Ok(final_response)
} else {
Err(From::from(final_response))
}
Err(io::Error::new(io::ErrorKind::Other, "incomplete").into())
}
}

View File

@@ -4,7 +4,7 @@
use crate::transport::smtp::Error;
use nom::{
branch::alt,
bytes::complete::{tag, take_until},
bytes::streaming::{tag, take_until},
combinator::{complete, map},
multi::many0,
sequence::{preceded, tuple},
@@ -226,7 +226,7 @@ fn parse_detail(i: &str) -> IResult<&str, Detail> {
))(i)
}
fn parse_response(i: &str) -> IResult<&str, Response> {
pub(crate) fn parse_response(i: &str) -> IResult<&str, Response> {
let (i, lines) = many0(tuple((
parse_code,
preceded(tag("-"), take_until("\r\n")),
@@ -262,7 +262,7 @@ fn parse_response(i: &str) -> IResult<&str, Response> {
#[cfg(test)]
mod test {
use super::{Category, Code, Detail, Response, Severity};
use super::*;
#[test]
fn test_severity_fmt() {
@@ -472,6 +472,16 @@ mod test {
);
}
#[test]
fn test_response_incomplete() {
let raw_response = "250-smtp.example.org\r\n";
let res = parse_response(raw_response);
match res {
Err(nom::Err::Incomplete(_)) => {}
_ => panic!("Expected incomplete response, got {:?}", res),
}
}
#[test]
fn test_response_first_line() {
assert_eq!(

View File

@@ -73,3 +73,24 @@ impl Transport for StubTransport {
self.response
}
}
#[cfg(feature = "async")]
pub mod r#async {
use super::StubTransport;
use crate::{r#async::Transport, transport::stub::Error, Envelope};
use async_trait::async_trait;
#[async_trait]
impl Transport for StubTransport {
type Ok = ();
type Error = Error;
async fn send_raw(
&self,
_envelope: &Envelope,
_email: &[u8],
) -> Result<Self::Ok, Self::Error> {
self.response
}
}
}

View File

@@ -1,7 +1,7 @@
#[cfg(test)]
#[cfg(feature = "file-transport")]
mod test {
use lettre::{transport::file::FileTransport, Message, Transport};
use lettre::{transport::file::FileTransport, Message};
use std::{
env::temp_dir,
fs::{remove_file, File},
@@ -10,6 +10,7 @@ mod test {
#[test]
fn file_transport() {
use lettre::Transport;
let sender = FileTransport::new(temp_dir());
let email = Message::builder()
.from("NoBody <nobody@domain.tld>".parse().unwrap())
@@ -33,4 +34,32 @@ mod test {
"{\"envelope\":{\"forward_path\":[\"hei@domain.tld\"],\"reverse_path\":\"nobody@domain.tld\"},\"raw_message\":null,\"message\":\"From: NoBody <nobody@domain.tld>\\r\\nReply-To: Yuin <yuin@domain.tld>\\r\\nTo: Hei <hei@domain.tld>\\r\\nSubject: Happy new year\\r\\nDate: Tue, 15 Nov 1994 08:12:31 GMT\\r\\n\\r\\nBe happy!\"}");
remove_file(file).unwrap();
}
#[cfg(feature = "async")]
#[async_attributes::test]
async fn file_transport_async() {
use lettre::r#async::Transport;
let sender = FileTransport::new(temp_dir());
let email = Message::builder()
.from("NoBody <nobody@domain.tld>".parse().unwrap())
.reply_to("Yuin <yuin@domain.tld>".parse().unwrap())
.to("Hei <hei@domain.tld>".parse().unwrap())
.subject("Happy new year")
.date("Tue, 15 Nov 1994 08:12:31 GMT".parse().unwrap())
.body("Be happy!")
.unwrap();
let result = sender.send(email).await;
let id = result.unwrap();
let file = temp_dir().join(format!("{}.json", id));
let mut f = File::open(file.clone()).unwrap();
let mut buffer = String::new();
let _ = f.read_to_string(&mut buffer);
assert_eq!(
buffer,
"{\"envelope\":{\"forward_path\":[\"hei@domain.tld\"],\"reverse_path\":\"nobody@domain.tld\"},\"raw_message\":null,\"message\":\"From: NoBody <nobody@domain.tld>\\r\\nReply-To: Yuin <yuin@domain.tld>\\r\\nTo: Hei <hei@domain.tld>\\r\\nSubject: Happy new year\\r\\nDate: Tue, 15 Nov 1994 08:12:31 GMT\\r\\n\\r\\nBe happy!\"}");
remove_file(file).unwrap();
}
}

View File

@@ -1,10 +1,11 @@
#[cfg(test)]
#[cfg(feature = "sendmail-transport")]
mod test {
use lettre::{transport::sendmail::SendmailTransport, Message, Transport};
use lettre::{transport::sendmail::SendmailTransport, Message};
#[test]
fn sendmail_transport_simple() {
fn sendmail_transport() {
use lettre::Transport;
let sender = SendmailTransport::new();
let email = Message::builder()
.from("NoBody <nobody@domain.tld>".parse().unwrap())
@@ -18,4 +19,22 @@ mod test {
println!("{:?}", result);
assert!(result.is_ok());
}
#[cfg(feature = "async")]
#[async_attributes::test]
async fn sendmail_transport_async() {
use lettre::r#async::Transport;
let sender = SendmailTransport::new();
let email = Message::builder()
.from("NoBody <nobody@domain.tld>".parse().unwrap())
.reply_to("Yuin <yuin@domain.tld>".parse().unwrap())
.to("Hei <hei@domain.tld>".parse().unwrap())
.subject("Happy new year")
.date("Tue, 15 Nov 1994 08:12:31 GMT".parse().unwrap())
.body("Be happy!")
.unwrap();
let result = sender.send(email).await;
assert!(result.is_ok());
}
}

View File

@@ -1,7 +1,8 @@
use lettre::{transport::stub::StubTransport, Message, Transport};
use lettre::{transport::stub::StubTransport, Message};
#[test]
fn stub_transport() {
use lettre::Transport;
let sender_ok = StubTransport::new_ok();
let sender_ko = StubTransport::new_error();
let email = Message::builder()
@@ -12,6 +13,25 @@ fn stub_transport() {
.body("Be happy!")
.unwrap();
sender_ok.send(&email.clone()).unwrap();
sender_ok.send(&email).unwrap();
sender_ko.send(&email).unwrap_err();
}
#[cfg(feature = "async")]
#[async_attributes::test]
async fn stub_transport_async() {
use lettre::r#async::Transport;
let sender_ok = StubTransport::new_ok();
let sender_ko = StubTransport::new_error();
let email = Message::builder()
.from("NoBody <nobody@domain.tld>".parse().unwrap())
.reply_to("Yuin <yuin@domain.tld>".parse().unwrap())
.to("Hei <hei@domain.tld>".parse().unwrap())
.subject("Happy new year")
.date("Tue, 15 Nov 1994 08:12:31 GMT".parse().unwrap())
.body("Be happy!")
.unwrap();
sender_ok.send(email.clone()).await.unwrap();
sender_ko.send(email).await.unwrap_err();
}