split CancelToken into RawCancelToken for smaller sizes and better typesafety

This commit is contained in:
Conrad Ludgate
2025-06-10 06:21:55 -07:00
parent e38193c530
commit 6b528426e7
6 changed files with 28 additions and 29 deletions

View File

@@ -1,5 +1,3 @@
use std::io;
use tokio::net::TcpStream;
use crate::client::SocketConfig;
@@ -8,7 +6,7 @@ use crate::tls::MakeTlsConnect;
use crate::{Error, cancel_query_raw, connect_socket};
pub(crate) async fn cancel_query<T>(
config: Option<SocketConfig>,
config: SocketConfig,
ssl_mode: SslMode,
tls: T,
process_id: i32,
@@ -17,16 +15,6 @@ pub(crate) async fn cancel_query<T>(
where
T: MakeTlsConnect<TcpStream>,
{
let config = match config {
Some(config) => config,
None => {
return Err(Error::connect(io::Error::new(
io::ErrorKind::InvalidInput,
"unknown host",
)));
}
};
let hostname = match &config.host {
Host::Tcp(host) => &**host,
};

View File

@@ -9,9 +9,16 @@ use crate::{Error, cancel_query, cancel_query_raw};
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone)]
pub struct CancelToken {
pub socket_config: Option<SocketConfig>,
pub socket_config: SocketConfig,
pub raw: RawCancelToken,
}
/// The capability to request cancellation of in-progress queries on a
/// connection.
#[derive(Clone, Serialize, Deserialize)]
pub struct RawCancelToken {
pub ssl_mode: SslMode,
pub process_id: i32,
pub secret_key: i32,
@@ -36,14 +43,16 @@ impl CancelToken {
{
cancel_query::cancel_query(
self.socket_config.clone(),
self.ssl_mode,
self.raw.ssl_mode,
tls,
self.process_id,
self.secret_key,
self.raw.process_id,
self.raw.secret_key,
)
.await
}
}
impl RawCancelToken {
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
/// connection itself.
pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>

View File

@@ -12,6 +12,7 @@ use postgres_protocol2::message::frontend;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::cancel_token::RawCancelToken;
use crate::codec::{BackendMessages, FrontendMessage};
use crate::config::{Host, SslMode};
use crate::query::RowStream;
@@ -331,10 +332,12 @@ impl Client {
/// connection associated with this client.
pub fn cancel_token(&self) -> CancelToken {
CancelToken {
socket_config: Some(self.socket_config.clone()),
ssl_mode: self.ssl_mode,
process_id: self.process_id,
secret_key: self.secret_key,
socket_config: self.socket_config.clone(),
raw: RawCancelToken {
ssl_mode: self.ssl_mode,
process_id: self.process_id,
secret_key: self.secret_key,
},
}
}

View File

@@ -3,7 +3,7 @@
use postgres_protocol2::message::backend::ReadyForQueryBody;
pub use crate::cancel_token::CancelToken;
pub use crate::cancel_token::{CancelToken, RawCancelToken};
pub use crate::client::{Client, SocketConfig};
pub use crate::config::Config;
pub use crate::connect_raw::RawConnection;

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::{Context, anyhow};
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::CancelToken;
use postgres_client::RawCancelToken;
use postgres_client::tls::MakeTlsConnect;
use redis::{Cmd, FromRedisValue, Value};
use serde::{Deserialize, Serialize};
@@ -470,7 +470,7 @@ impl CancellationHandler {
#[derive(Clone, Serialize, Deserialize)]
pub struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: CancelToken,
cancel_token: RawCancelToken,
hostname: String, // for pg_sni router
user_info: ComputeUserInfo,
}
@@ -478,7 +478,7 @@ pub struct CancelClosure {
impl CancelClosure {
pub(crate) fn new(
socket_addr: SocketAddr,
cancel_token: CancelToken,
cancel_token: RawCancelToken,
hostname: String,
user_info: ComputeUserInfo,
) -> Self {

View File

@@ -9,7 +9,7 @@ use itertools::Itertools;
use postgres_client::config::{AuthKeys, SslMode};
use postgres_client::maybe_tls_stream::MaybeTlsStream;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{CancelToken, NoTls, RawConnection};
use postgres_client::{NoTls, RawCancelToken, RawConnection};
use postgres_protocol::message::backend::NoticeResponseBody;
use thiserror::Error;
use tokio::net::{TcpStream, lookup_host};
@@ -327,8 +327,7 @@ impl ConnectInfo {
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(
socket_addr,
CancelToken {
socket_config: None,
RawCancelToken {
ssl_mode: self.ssl_mode,
process_id,
secret_key,