diff --git a/libs/proxy/tokio-postgres2/src/cancel_query.rs b/libs/proxy/tokio-postgres2/src/cancel_query.rs
index 4c2a5ef50f..94fbf333ed 100644
--- a/libs/proxy/tokio-postgres2/src/cancel_query.rs
+++ b/libs/proxy/tokio-postgres2/src/cancel_query.rs
@@ -1,5 +1,3 @@
-use std::io;
-
 use tokio::net::TcpStream;
 
 use crate::client::SocketConfig;
@@ -8,7 +6,7 @@ use crate::tls::MakeTlsConnect;
 use crate::{Error, cancel_query_raw, connect_socket};
 
 pub(crate) async fn cancel_query<T>(
-    config: Option<SocketConfig>,
+    config: SocketConfig,
     ssl_mode: SslMode,
     tls: T,
     process_id: i32,
@@ -17,16 +15,6 @@ pub(crate) async fn cancel_query<T>(
 where
     T: MakeTlsConnect<TcpStream>,
 {
-    let config = match config {
-        Some(config) => config,
-        None => {
-            return Err(Error::connect(io::Error::new(
-                io::ErrorKind::InvalidInput,
-                "unknown host",
-            )));
-        }
-    };
-
     let hostname = match &config.host {
         Host::Tcp(host) => &**host,
     };
diff --git a/libs/proxy/tokio-postgres2/src/cancel_token.rs b/libs/proxy/tokio-postgres2/src/cancel_token.rs
index f6526395ee..c5566b4ad9 100644
--- a/libs/proxy/tokio-postgres2/src/cancel_token.rs
+++ b/libs/proxy/tokio-postgres2/src/cancel_token.rs
@@ -7,11 +7,16 @@ use crate::config::SslMode;
 use crate::tls::{MakeTlsConnect, TlsConnect};
 use crate::{Error, cancel_query, cancel_query_raw};
 
-/// The capability to request cancellation of in-progress queries on a
-/// connection.
-#[derive(Clone, Serialize, Deserialize)]
+/// A cancellation token that allows easy cancellation of a query.
+#[derive(Clone)]
 pub struct CancelToken {
-    pub socket_config: Option<SocketConfig>,
+    pub socket_config: SocketConfig,
+    pub raw: RawCancelToken,
+}
+
+/// A raw cancellation token that allows cancellation of a query, given a fresh connection to postgres.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RawCancelToken {
     pub ssl_mode: SslMode,
     pub process_id: i32,
     pub secret_key: i32,
@@ -36,14 +41,16 @@ impl CancelToken {
     {
         cancel_query::cancel_query(
             self.socket_config.clone(),
-            self.ssl_mode,
+            self.raw.ssl_mode,
             tls,
-            self.process_id,
-            self.secret_key,
+            self.raw.process_id,
+            self.raw.secret_key,
         )
         .await
     }
+}
 
+impl RawCancelToken {
     /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
     /// connection itself.
     pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
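Note: the split above moves everything serialisable into `RawCancelToken`, while `CancelToken` keeps the non-serialisable `SocketConfig`. A minimal sketch of why that matters for the proxy follows; the `postgres_client` crate alias and the `SslMode::Disable` variant are assumptions for illustration, not part of this diff:

```rust
// Hypothetical round-trip: only RawCancelToken derives Serialize/Deserialize,
// so this is the part that can be stored in Redis (inside a CancelClosure).
use postgres_client::RawCancelToken;
use postgres_client::config::SslMode;

fn main() -> serde_json::Result<()> {
    let raw = RawCancelToken {
        ssl_mode: SslMode::Disable, // assumed variant name
        process_id: 42,
        secret_key: 271_828,
    };

    let json = serde_json::to_string(&raw)?;
    let back: RawCancelToken = serde_json::from_str(&json)?;
    assert_eq!(back.process_id, raw.process_id);
    Ok(())
}
```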
diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs
index a7edfc076a..41b22e35b6 100644
--- a/libs/proxy/tokio-postgres2/src/client.rs
+++ b/libs/proxy/tokio-postgres2/src/client.rs
@@ -12,6 +12,7 @@ use postgres_protocol2::message::frontend;
 use serde::{Deserialize, Serialize};
 use tokio::sync::mpsc;
 
+use crate::cancel_token::RawCancelToken;
 use crate::codec::{BackendMessages, FrontendMessage};
 use crate::config::{Host, SslMode};
 use crate::query::RowStream;
@@ -331,10 +332,12 @@ impl Client {
     /// connection associated with this client.
     pub fn cancel_token(&self) -> CancelToken {
         CancelToken {
-            socket_config: Some(self.socket_config.clone()),
-            ssl_mode: self.ssl_mode,
-            process_id: self.process_id,
-            secret_key: self.secret_key,
+            socket_config: self.socket_config.clone(),
+            raw: RawCancelToken {
+                ssl_mode: self.ssl_mode,
+                process_id: self.process_id,
+                secret_key: self.secret_key,
+            },
         }
     }
diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs
index 9556070ed5..791c93b972 100644
--- a/libs/proxy/tokio-postgres2/src/lib.rs
+++ b/libs/proxy/tokio-postgres2/src/lib.rs
@@ -3,7 +3,7 @@
 
 use postgres_protocol2::message::backend::ReadyForQueryBody;
 
-pub use crate::cancel_token::CancelToken;
+pub use crate::cancel_token::{CancelToken, RawCancelToken};
 pub use crate::client::{Client, SocketConfig};
 pub use crate::config::Config;
 pub use crate::connect_raw::RawConnection;
diff --git a/proxy/src/batch.rs b/proxy/src/batch.rs
new file mode 100644
index 0000000000..61bdf2b747
--- /dev/null
+++ b/proxy/src/batch.rs
@@ -0,0 +1,146 @@
+//! Batch processing system based on intrusive linked lists.
+//!
+//! Enqueuing a batch job requires no allocations, with
+//! direct support for cancelling jobs early.
+use std::collections::BTreeMap;
+use std::pin::pin;
+use std::sync::Mutex;
+
+use futures::future::Either;
+use scopeguard::ScopeGuard;
+use tokio::sync::oneshot::error::TryRecvError;
+
+use crate::ext::LockExt;
+
+pub trait QueueProcessing: Send + 'static {
+    type Req: Send + 'static;
+    type Res: Send;
+
+    /// Get the desired batch size.
+    fn batch_size(&self, queue_size: usize) -> usize;
+
+    /// This applies a full batch of events.
+    /// Must respond with a full batch of replies.
+    ///
+    /// If this apply can error, it's expected that errors be forwarded to each Self::Res.
+    ///
+    /// Batching does not need to happen atomically.
+    fn apply(&mut self, req: Vec<Self::Req>) -> impl Future<Output = Vec<Self::Res>> + Send;
+}
+
+pub struct BatchQueue<P: QueueProcessing> {
+    processor: tokio::sync::Mutex<P>,
+    inner: Mutex<BatchQueueInner<P>>,
+}
+
+struct BatchJob<P: QueueProcessing> {
+    req: P::Req,
+    res: tokio::sync::oneshot::Sender<P::Res>,
+}
+
+impl<P: QueueProcessing> BatchQueue<P> {
+    pub fn new(p: P) -> Self {
+        Self {
+            processor: tokio::sync::Mutex::new(p),
+            inner: Mutex::new(BatchQueueInner {
+                version: 0,
+                queue: BTreeMap::new(),
+            }),
+        }
+    }
+
+    pub async fn call(&self, req: P::Req) -> P::Res {
+        let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
+        let guard = scopeguard::guard(id, move |id| {
+            let mut inner = self.inner.lock_propagate_poison();
+            if inner.queue.remove(&id).is_some() {
+                tracing::debug!("batched task cancelled before completion");
+            }
+        });
+
+        let resp = loop {
+            // try become the leader, or try wait for success.
+            let mut processor = match futures::future::select(rx, pin!(self.processor.lock())).await
+            {
+                // we got the resp.
+                Either::Left((resp, _)) => break resp.ok(),
+                // we are the leader.
+                Either::Right((p, rx_)) => {
+                    rx = rx_;
+                    p
+                }
+            };
+
+            let (reqs, resps) = self.inner.lock_propagate_poison().get_batch(&processor);
+
+            // apply a batch.
+            let values = processor.apply(reqs).await;
+
+            // send response values.
+            for (tx, value) in std::iter::zip(resps, values) {
+                // sender hung up but that's fine.
+                drop(tx.send(value));
+            }
+
+            match rx.try_recv() {
+                Ok(resp) => break Some(resp),
+                Err(TryRecvError::Closed) => break None,
+                // edge case - there was a race condition where
+                // we became the leader but were not in the batch.
+                //
+                // Example:
+                // thread 1: register job id=1
+                // thread 2: register job id=2
+                // thread 2: processor.lock().await
+                // thread 1: processor.lock().await
+                // thread 2: becomes leader, batch_size=1, jobs=[1].
+                Err(TryRecvError::Empty) => {}
+            }
+        };
+
+        // already removed.
+        ScopeGuard::into_inner(guard);
+
+        resp.expect("no response found. batch processer should not panic")
+    }
+}
+
+struct BatchQueueInner<P: QueueProcessing> {
+    version: u64,
+    queue: BTreeMap<u64, BatchJob<P>>,
+}
+
+impl<P: QueueProcessing> BatchQueueInner<P> {
+    fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver<P::Res>) {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+
+        let id = self.version;
+
+        // Overflow concern:
+        // This is a u64, and we might enqueue 2^16 tasks per second.
+        // This gives us 2^48 seconds (9 million years).
+        // Even if this does overflow, it will not break, but some
+        // jobs with the higher version might never get prioritised.
+        self.version += 1;
+
+        self.queue.insert(id, BatchJob { req, res: tx });
+
+        (id, rx)
+    }
+
+    fn get_batch(&mut self, p: &P) -> (Vec<P::Req>, Vec<tokio::sync::oneshot::Sender<P::Res>>) {
+        let batch_size = p.batch_size(self.queue.len());
+        let mut reqs = Vec::with_capacity(batch_size);
+        let mut resps = Vec::with_capacity(batch_size);
+
+        while reqs.len() < batch_size {
+            let Some((_, job)) = self.queue.pop_first() else {
+                break;
+            };
+            reqs.push(job.req);
+            resps.push(job.res);
+        }
+
+        (reqs, resps)
+    }
+}
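To make the leader/follower flow in `batch.rs` concrete, here is a hedged sketch of a toy `QueueProcessing` implementation driven through `BatchQueue::call`; the `Doubler` processor and the `JoinSet` driver are illustrative only and not part of this PR:

```rust
// Inside the proxy crate (the batch module is private): each request is answered
// with its doubled value, one reply per request, in submission order.
use std::sync::Arc;

use crate::batch::{BatchQueue, QueueProcessing};

struct Doubler;

impl QueueProcessing for Doubler {
    type Req = u64;
    type Res = u64;

    fn batch_size(&self, queue_size: usize) -> usize {
        // drain whatever is queued, capped at 16 per round-trip
        queue_size.min(16)
    }

    async fn apply(&mut self, req: Vec<u64>) -> Vec<u64> {
        req.into_iter().map(|x| x * 2).collect()
    }
}

async fn demo() {
    let queue = Arc::new(BatchQueue::new(Doubler));

    let mut tasks = tokio::task::JoinSet::new();
    for i in 0..100u64 {
        let queue = Arc::clone(&queue);
        // Whichever caller wins `processor.lock()` becomes the leader and
        // applies a whole batch on behalf of the other registered callers.
        tasks.spawn(async move { queue.call(i).await });
    }

    let mut sum = 0;
    while let Some(res) = tasks.join_next().await {
        sum += res.expect("task should not panic");
    }
    let expected: u64 = (0..100u64).map(|x| x * 2).sum();
    assert_eq!(sum, expected);
}
```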
diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs
index ba10fce7b4..e3be454713 100644
--- a/proxy/src/binary/local_proxy.rs
+++ b/proxy/src/binary/local_proxy.rs
@@ -201,7 +201,7 @@ pub async fn run() -> anyhow::Result<()> {
         auth_backend,
         http_listener,
         shutdown.clone(),
-        Arc::new(CancellationHandler::new(&config.connect_to_compute, None)),
+        Arc::new(CancellationHandler::new(&config.connect_to_compute)),
         endpoint_rate_limiter,
     );
 
diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs
index 6ab6df5610..9215dbf73f 100644
--- a/proxy/src/binary/proxy.rs
+++ b/proxy/src/binary/proxy.rs
@@ -23,7 +23,8 @@ use utils::{project_build_tag, project_git_version};
 
 use crate::auth::backend::jwt::JwkCache;
 use crate::auth::backend::{ConsoleRedirectBackend, MaybeOwned};
-use crate::cancellation::{CancellationHandler, handle_cancel_messages};
+use crate::batch::BatchQueue;
+use crate::cancellation::{CancellationHandler, CancellationProcessor};
 use crate::config::{
     self, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, ProjectInfoCacheOptions,
     ProxyConfig, ProxyProtocolV2, remote_storage_from_toml,
@@ -392,13 +393,7 @@ pub async fn run() -> anyhow::Result<()> {
         .as_ref()
         .map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit));
 
-    // channel size should be higher than redis client limit to avoid blocking
-    let cancel_ch_size = args.cancellation_ch_size;
-    let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size);
-    let cancellation_handler = Arc::new(CancellationHandler::new(
-        &config.connect_to_compute,
-        Some(tx_cancel),
-    ));
+    let cancellation_handler = Arc::new(CancellationHandler::new(&config.connect_to_compute));
 
     let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
         RateBucketInfo::to_leaky_bucket(&args.endpoint_rps_limit)
@@ -530,21 +525,11 @@ pub async fn run() -> anyhow::Result<()> {
             match redis_kv_client.try_connect().await {
                 Ok(()) => {
                     info!("Connected to Redis KV client");
-                    maintenance_tasks.spawn(async move {
-                        handle_cancel_messages(
-                            &mut redis_kv_client,
-                            rx_cancel,
-                            args.cancellation_batch_size,
-                        )
-                        .await?;
+                    cancellation_handler.init_tx(BatchQueue::new(CancellationProcessor {
+                        client: redis_kv_client,
+                        batch_size: args.cancellation_batch_size,
+                    }));
 
-                        drop(redis_kv_client);
-
-                        // `handle_cancel_messages` was terminated due to the tx_cancel
-                        // being dropped. this is not worthy of an error, and this task can only return `Err`,
-                        // so let's wait forever instead.
-                        std::future::pending().await
-                    });
                     break;
                 }
                 Err(e) => {
diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs
index cce4c1d3a0..036f36c7f6 100644
--- a/proxy/src/cancellation.rs
+++ b/proxy/src/cancellation.rs
@@ -1,19 +1,23 @@
+use std::convert::Infallible;
 use std::net::{IpAddr, SocketAddr};
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
+use std::time::Duration;
 
-use anyhow::{Context, anyhow};
+use anyhow::anyhow;
+use futures::FutureExt;
 use ipnet::{IpNet, Ipv4Net, Ipv6Net};
-use postgres_client::CancelToken;
+use postgres_client::RawCancelToken;
 use postgres_client::tls::MakeTlsConnect;
 use redis::{Cmd, FromRedisValue, Value};
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
 use tokio::net::TcpStream;
-use tokio::sync::{mpsc, oneshot};
-use tracing::{debug, error, info, warn};
+use tokio::time::timeout;
+use tracing::{debug, error, info};
 
 use crate::auth::AuthError;
 use crate::auth::backend::ComputeUserInfo;
+use crate::batch::{BatchQueue, QueueProcessing};
 use crate::config::ComputeConfig;
 use crate::context::RequestContext;
 use crate::control_plane::ControlPlaneApi;
@@ -27,46 +31,36 @@ use crate::redis::kv_ops::RedisKVClient;
 
 type IpSubnetKey = IpNet;
 
-const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
+const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600);
+const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570);
 
 // Message types for sending through mpsc channel
 pub enum CancelKeyOp {
     StoreCancelKey {
-        key: String,
-        field: String,
-        value: String,
-        resp_tx: Option<oneshot::Sender<anyhow::Result<()>>>,
-        _guard: CancelChannelSizeGuard<'static>,
-        expire: i64, // TTL for key
+        key: CancelKeyData,
+        value: Box<str>,
+        expire: std::time::Duration,
     },
     GetCancelData {
-        key: String,
-        resp_tx: oneshot::Sender<anyhow::Result<Vec<(String, String)>>>,
-        _guard: CancelChannelSizeGuard<'static>,
-    },
-    RemoveCancelKey {
-        key: String,
-        field: String,
-        resp_tx: Option<oneshot::Sender<anyhow::Result<()>>>,
-        _guard: CancelChannelSizeGuard<'static>,
+        key: CancelKeyData,
     },
 }
 
 pub struct Pipeline {
     inner: redis::Pipeline,
-    replies: Vec<CancelReplyOp>,
+    replies: usize,
 }
 
 impl Pipeline {
     fn with_capacity(n: usize) -> Self {
         Self {
             inner: redis::Pipeline::with_capacity(n),
-            replies: Vec::with_capacity(n),
+            replies: 0,
         }
     }
 
-    async fn execute(&mut self, client: &mut RedisKVClient) {
-        let responses = self.replies.len();
+    async fn execute(self, client: &mut RedisKVClient) -> Vec<anyhow::Result<Value>> {
+        let responses = self.replies;
         let batch_size = self.inner.len();
 
         match client.query(&self.inner).await {
@@ -76,176 +70,73 @@ impl Pipeline {
                     batch_size,
                     responses,
                     "successfully completed cancellation jobs",
                 );
-                for (value, reply) in std::iter::zip(values, self.replies.drain(..)) {
-                    reply.send_value(value);
-                }
+                values.into_iter().map(Ok).collect()
             }
             Ok(value) => {
                 error!(batch_size, ?value, "unexpected redis return value");
-                for reply in self.replies.drain(..) {
-                    reply.send_err(anyhow!("incorrect response type from redis"));
-                }
+                std::iter::repeat_with(|| Err(anyhow!("incorrect response type from redis")))
+                    .take(responses)
+                    .collect()
             }
             Err(err) => {
-                for reply in self.replies.drain(..) {
-                    reply.send_err(anyhow!("could not send cmd to redis: {err}"));
-                }
+                std::iter::repeat_with(|| Err(anyhow!("could not send cmd to redis: {err}")))
+                    .take(responses)
+                    .collect()
             }
         }
-
-        self.inner.clear();
-        self.replies.clear();
     }
 
-    fn add_command_with_reply(&mut self, cmd: Cmd, reply: CancelReplyOp) {
+    fn add_command_with_reply(&mut self, cmd: Cmd) {
         self.inner.add_command(cmd);
-        self.replies.push(reply);
+        self.replies += 1;
     }
 
     fn add_command_no_reply(&mut self, cmd: Cmd) {
         self.inner.add_command(cmd).ignore();
     }
-
-    fn add_command(&mut self, cmd: Cmd, reply: Option<CancelReplyOp>) {
-        match reply {
-            Some(reply) => self.add_command_with_reply(cmd, reply),
-            None => self.add_command_no_reply(cmd),
-        }
-    }
 }
 
 impl CancelKeyOp {
-    fn register(self, pipe: &mut Pipeline) {
+    fn register(&self, pipe: &mut Pipeline) {
         #[allow(clippy::used_underscore_binding)]
         match self {
-            CancelKeyOp::StoreCancelKey {
-                key,
-                field,
-                value,
-                resp_tx,
-                _guard,
-                expire,
-            } => {
-                let reply =
-                    resp_tx.map(|resp_tx| CancelReplyOp::StoreCancelKey { resp_tx, _guard });
-                pipe.add_command(Cmd::hset(&key, field, value), reply);
-                pipe.add_command_no_reply(Cmd::expire(key, expire));
+            CancelKeyOp::StoreCancelKey { key, value, expire } => {
+                let key = KeyPrefix::Cancel(*key).build_redis_key();
+                pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value));
+                pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64));
             }
-            CancelKeyOp::GetCancelData {
-                key,
-                resp_tx,
-                _guard,
-            } => {
-                let reply = CancelReplyOp::GetCancelData { resp_tx, _guard };
-                pipe.add_command_with_reply(Cmd::hgetall(key), reply);
-            }
-            CancelKeyOp::RemoveCancelKey {
-                key,
-                field,
-                resp_tx,
-                _guard,
-            } => {
-                let reply =
-                    resp_tx.map(|resp_tx| CancelReplyOp::RemoveCancelKey { resp_tx, _guard });
-                pipe.add_command(Cmd::hdel(key, field), reply);
+            CancelKeyOp::GetCancelData { key } => {
+                let key = KeyPrefix::Cancel(*key).build_redis_key();
+                pipe.add_command_with_reply(Cmd::hget(key, "data"));
             }
         }
     }
 }
 
-// Message types for sending through mpsc channel
-pub enum CancelReplyOp {
-    StoreCancelKey {
-        resp_tx: oneshot::Sender<anyhow::Result<()>>,
-        _guard: CancelChannelSizeGuard<'static>,
-    },
-    GetCancelData {
-        resp_tx: oneshot::Sender<anyhow::Result<Vec<(String, String)>>>,
-        _guard: CancelChannelSizeGuard<'static>,
-    },
-    RemoveCancelKey {
-        resp_tx: oneshot::Sender<anyhow::Result<()>>,
-        _guard: CancelChannelSizeGuard<'static>,
-    },
+pub struct CancellationProcessor {
+    pub client: RedisKVClient,
+    pub batch_size: usize,
 }
 
-impl CancelReplyOp {
-    fn send_err(self, e: anyhow::Error) {
-        match self {
-            CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
-                resp_tx
-                    .send(Err(e))
-                    .inspect_err(|_| tracing::debug!("could not send reply"))
-                    .ok();
-            }
-            CancelReplyOp::GetCancelData { resp_tx, _guard } => {
-                resp_tx
-                    .send(Err(e))
-                    .inspect_err(|_| tracing::debug!("could not send reply"))
-                    .ok();
-            }
-            CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
-                resp_tx
-                    .send(Err(e))
-                    .inspect_err(|_| tracing::debug!("could not send reply"))
-                    .ok();
-            }
-        }
+impl QueueProcessing for CancellationProcessor {
+    type Req = (CancelChannelSizeGuard<'static>, CancelKeyOp);
+    type Res = anyhow::Result<redis::Value>;
+
+    fn batch_size(&self, _queue_size: usize) -> usize {
+        self.batch_size
     }
 
-    fn send_value(self, v: redis::Value) {
-        match self {
-            CancelReplyOp::StoreCancelKey { resp_tx, _guard } => {
-                let send =
-                    FromRedisValue::from_owned_redis_value(v).context("could not parse value");
-                resp_tx
-                    .send(send)
-                    .inspect_err(|_| tracing::debug!("could not send reply"))
-                    .ok();
-            }
-            CancelReplyOp::GetCancelData { resp_tx, _guard } => {
-                let send =
-                    FromRedisValue::from_owned_redis_value(v).context("could not parse value");
-                resp_tx
-                    .send(send)
-                    .inspect_err(|_| tracing::debug!("could not send reply"))
-                    .ok();
-            }
-            CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => {
-                let send =
-                    FromRedisValue::from_owned_redis_value(v).context("could not parse value");
-                resp_tx
-                    .send(send)
-                    .inspect_err(|_| tracing::debug!("could not send reply"))
-                    .ok();
-            }
-        }
-    }
-}
-
-// Running as a separate task to accept messages through the rx channel
-pub async fn handle_cancel_messages(
-    client: &mut RedisKVClient,
-    mut rx: mpsc::Receiver<CancelKeyOp>,
-    batch_size: usize,
-) -> anyhow::Result<()> {
-    let mut batch = Vec::with_capacity(batch_size);
-    let mut pipeline = Pipeline::with_capacity(batch_size);
-
-    loop {
-        if rx.recv_many(&mut batch, batch_size).await == 0 {
-            warn!("shutting down cancellation queue");
-            break Ok(());
-        }
+    async fn apply(&mut self, batch: Vec<Self::Req>) -> Vec<Self::Res> {
+        let mut pipeline = Pipeline::with_capacity(batch.len());
 
         let batch_size = batch.len();
         debug!(batch_size, "running cancellation jobs");
 
-        for msg in batch.drain(..) {
-            msg.register(&mut pipeline);
+        for (_, op) in &batch {
+            op.register(&mut pipeline);
         }
 
-        pipeline.execute(client).await;
+        pipeline.execute(&mut self.client).await
     }
 }
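For reference, one `apply` call on `CancellationProcessor` collapses the whole batch into a single Redis pipeline along these lines (the key and payload are illustrative; the key format comes from `KeyPrefix::Cancel` and the 600-second TTL from `CANCEL_KEY_TTL`):

```rust
// Roughly what CancelKeyOp::register emits for one store plus one lookup.
use redis::{Cmd, Pipeline};

fn example_pipeline(closure_json: &str) -> Pipeline {
    let mut pipe = Pipeline::with_capacity(3);

    // StoreCancelKey => HSET cancel:<hex key> data <json>, then EXPIRE (reply ignored)
    pipe.add_command(Cmd::hset("cancel:30390000d431", "data", closure_json));
    pipe.add_command(Cmd::expire("cancel:30390000d431", 600)).ignore();

    // GetCancelData => HGET cancel:<hex key> data
    pipe.add_command(Cmd::hget("cancel:30390000d431", "data"));

    pipe
}
```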
@@ -256,7 +147,7 @@ pub struct CancellationHandler {
     compute_config: &'static ComputeConfig,
     // rate limiter of cancellation requests
     limiter: Arc<std::sync::Mutex<LeakyBucketRateLimiter<IpSubnetKey>>>,
-    tx: Option<mpsc::Sender<CancelKeyOp>>, // send messages to the redis KV client task
+    tx: OnceLock<BatchQueue<CancellationProcessor>>, // send messages to the redis KV client task
 }
 
 #[derive(Debug, Error)]
@@ -296,13 +187,10 @@ impl ReportableError for CancelError {
 }
 
 impl CancellationHandler {
-    pub fn new(
-        compute_config: &'static ComputeConfig,
-        tx: Option<mpsc::Sender<CancelKeyOp>>,
-    ) -> Self {
+    pub fn new(compute_config: &'static ComputeConfig) -> Self {
         Self {
             compute_config,
-            tx,
+            tx: OnceLock::new(),
             limiter: Arc::new(std::sync::Mutex::new(
                 LeakyBucketRateLimiter::<IpSubnetKey>::new_with_shards(
                     LeakyBucketRateLimiter::<IpSubnetKey>::DEFAULT,
@@ -312,7 +200,14 @@ impl CancellationHandler {
         }
     }
 
-    pub(crate) fn get_key(self: &Arc<Self>) -> Session {
+    pub fn init_tx(&self, queue: BatchQueue<CancellationProcessor>) {
+        self.tx
+            .set(queue)
+            .map_err(|_| {})
+            .expect("cancellation queue should be registered once");
+    }
+
+    pub(crate) fn get_key(self: Arc<Self>) -> Session {
         // we intentionally generate a random "backend pid" and "secret key" here.
         // we use the corresponding u64 as an identifier for the
         // actual endpoint+pid+secret for postgres/pgbouncer.
@@ -322,14 +217,10 @@ impl CancellationHandler {
 
         let key: CancelKeyData = rand::random();
 
-        let prefix_key: KeyPrefix = KeyPrefix::Cancel(key);
-        let redis_key = prefix_key.build_redis_key();
-
         debug!("registered new query cancellation key {key}");
 
         Session {
             key,
-            redis_key,
-            cancellation_handler: Arc::clone(self),
+            cancellation_handler: self,
         }
     }
 
@@ -337,62 +228,43 @@ impl CancellationHandler {
         &self,
         key: CancelKeyData,
     ) -> Result<Option<CancelClosure>, CancelError> {
-        let prefix_key: KeyPrefix = KeyPrefix::Cancel(key);
-        let redis_key = prefix_key.build_redis_key();
+        let guard = Metrics::get()
+            .proxy
+            .cancel_channel_size
+            .guard(RedisMsgKind::HGet);
+        let op = CancelKeyOp::GetCancelData { key };
 
-        let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
-        let op = CancelKeyOp::GetCancelData {
-            key: redis_key,
-            resp_tx,
-            _guard: Metrics::get()
-                .proxy
-                .cancel_channel_size
-                .guard(RedisMsgKind::HGetAll),
-        };
-
-        let Some(tx) = &self.tx else {
+        let Some(tx) = self.tx.get() else {
             tracing::warn!("cancellation handler is not available");
             return Err(CancelError::InternalError);
         };
 
-        tx.try_send(op)
+        const TIMEOUT: Duration = Duration::from_secs(5);
+        let result = timeout(TIMEOUT, tx.call((guard, op)))
+            .await
+            .map_err(|_| {
+                tracing::warn!("timed out waiting to receive GetCancelData response");
+                CancelError::RateLimit
+            })?
             .map_err(|e| {
-                tracing::warn!("failed to send GetCancelData for {key}: {e}");
-            })
-            .map_err(|()| CancelError::InternalError)?;
+                tracing::warn!("failed to receive GetCancelData response: {e}");
+                CancelError::InternalError
+            })?;
 
-        let result = resp_rx.await.map_err(|e| {
+        let cancel_state_str = String::from_owned_redis_value(result).map_err(|e| {
             tracing::warn!("failed to receive GetCancelData response: {e}");
             CancelError::InternalError
         })?;
 
-        let cancel_state_str: Option<String> = match result {
-            Ok(mut state) => {
-                if state.len() == 1 {
-                    Some(state.remove(0).1)
-                } else {
-                    tracing::warn!("unexpected number of entries in cancel state: {state:?}");
-                    return Err(CancelError::InternalError);
-                }
-            }
-            Err(e) => {
-                tracing::warn!("failed to receive cancel state from redis: {e}");
-                return Err(CancelError::InternalError);
-            }
-        };
+        let cancel_closure: CancelClosure =
+            serde_json::from_str(&cancel_state_str).map_err(|e| {
+                tracing::warn!("failed to deserialize cancel state: {e}");
+                CancelError::InternalError
+            })?;
 
-        let cancel_state: Option<CancelClosure> = match cancel_state_str {
-            Some(state) => {
-                let cancel_closure: CancelClosure = serde_json::from_str(&state).map_err(|e| {
-                    tracing::warn!("failed to deserialize cancel state: {e}");
-                    CancelError::InternalError
-                })?;
-                Some(cancel_closure)
-            }
-            None => None,
-        };
-        Ok(cancel_state)
+        Ok(Some(cancel_closure))
     }
+
     /// Try to cancel a running query for the corresponding connection.
     /// If the cancellation key is not found, it will be published to Redis.
     /// check_allowed - if true, check if the IP is allowed to cancel the query.
@@ -467,10 +339,10 @@ impl CancellationHandler {
 /// This should've been a [`std::future::Future`], but
 /// it's impossible to name a type of an unboxed future
 /// (we'd need something like `#![feature(type_alias_impl_trait)]`).
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CancelClosure {
     socket_addr: SocketAddr,
-    cancel_token: CancelToken,
+    cancel_token: RawCancelToken,
     hostname: String, // for pg_sni router
     user_info: ComputeUserInfo,
 }
@@ -478,7 +350,7 @@ pub struct CancelClosure {
 impl CancelClosure {
     pub(crate) fn new(
         socket_addr: SocketAddr,
-        cancel_token: CancelToken,
+        cancel_token: RawCancelToken,
         hostname: String,
         user_info: ComputeUserInfo,
     ) -> Self {
@@ -491,7 +363,7 @@ impl CancelClosure {
     }
     /// Cancels the query running on user's compute node.
     pub(crate) async fn try_cancel_query(
-        self,
+        &self,
        compute_config: &ComputeConfig,
     ) -> Result<(), CancelError> {
         let socket = TcpStream::connect(self.socket_addr).await?;
@@ -512,7 +384,6 @@ impl CancelClosure {
 pub(crate) struct Session {
     /// The user-facing key identifying this session.
     key: CancelKeyData,
-    redis_key: String,
     cancellation_handler: Arc<CancellationHandler>,
 }
 
@@ -521,60 +392,66 @@ impl Session {
         &self.key
     }
 
-    // Send the store key op to the cancellation handler and set TTL for the key
-    pub(crate) fn write_cancel_key(
+    /// Ensure the cancel key is continously refreshed,
+    /// but stop when the channel is dropped.
+    pub(crate) async fn maintain_cancel_key(
         &self,
-        cancel_closure: CancelClosure,
-    ) -> Result<(), CancelError> {
-        let Some(tx) = &self.cancellation_handler.tx else {
-            tracing::warn!("cancellation handler is not available");
-            return Err(CancelError::InternalError);
-        };
+        session_id: uuid::Uuid,
+        cancel: tokio::sync::oneshot::Receiver<Infallible>,
+        cancel_closure: &CancelClosure,
+        compute_config: &ComputeConfig,
+    ) {
+        futures::future::select(
+            std::pin::pin!(self.maintain_redis_cancel_key(cancel_closure)),
+            cancel,
+        )
+        .await;
 
-        let closure_json = serde_json::to_string(&cancel_closure).map_err(|e| {
-            tracing::warn!("failed to serialize cancel closure: {e}");
-            CancelError::InternalError
-        })?;
-
-        let op = CancelKeyOp::StoreCancelKey {
-            key: self.redis_key.clone(),
-            field: "data".to_string(),
-            value: closure_json,
-            resp_tx: None,
-            _guard: Metrics::get()
-                .proxy
-                .cancel_channel_size
-                .guard(RedisMsgKind::HSet),
-            expire: CANCEL_KEY_TTL,
-        };
-
-        let _ = tx.try_send(op).map_err(|e| {
-            let key = self.key;
-            tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
-        });
-        Ok(())
+        if let Err(err) = cancel_closure
+            .try_cancel_query(compute_config)
+            .boxed()
+            .await
+        {
+            tracing::warn!(
+                ?session_id,
+                ?err,
+                "could not cancel the query in the database"
+            );
+        }
     }
 
-    pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
-        let Some(tx) = &self.cancellation_handler.tx else {
+    // Ensure the cancel key is continously refreshed.
+    async fn maintain_redis_cancel_key(&self, cancel_closure: &CancelClosure) -> ! {
+        let Some(tx) = self.cancellation_handler.tx.get() else {
             tracing::warn!("cancellation handler is not available");
-            return Err(CancelError::InternalError);
+            // don't exit, as we only want to exit if cancelled externally.
+            std::future::pending().await
         };
 
-        let op = CancelKeyOp::RemoveCancelKey {
-            key: self.redis_key.clone(),
-            field: "data".to_string(),
-            resp_tx: None,
-            _guard: Metrics::get()
+        let closure_json = serde_json::to_string(&cancel_closure)
+            .expect("serialising to json string should not fail")
+            .into_boxed_str();
+
+        loop {
+            let guard = Metrics::get()
                 .proxy
                 .cancel_channel_size
-                .guard(RedisMsgKind::HDel),
-        };
+                .guard(RedisMsgKind::HSet);
+            let op = CancelKeyOp::StoreCancelKey {
+                key: self.key,
+                value: closure_json.clone(),
+                expire: CANCEL_KEY_TTL,
+            };
 
-        let _ = tx.try_send(op).map_err(|e| {
-            let key = self.key;
-            tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
-        });
-        Ok(())
+            tracing::debug!(
+                src=%self.key,
+                dest=?cancel_closure.cancel_token,
+                "registering cancellation key"
+            );
+
+            if tx.call((guard, op)).await.is_ok() {
+                tokio::time::sleep(CANCEL_KEY_REFRESH).await;
+            }
+        }
     }
 }
diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs
index aae1fea07d..5dd264b35e 100644
--- a/proxy/src/compute/mod.rs
+++ b/proxy/src/compute/mod.rs
@@ -9,7 +9,7 @@ use itertools::Itertools;
 use postgres_client::config::{AuthKeys, SslMode};
 use postgres_client::maybe_tls_stream::MaybeTlsStream;
 use postgres_client::tls::MakeTlsConnect;
-use postgres_client::{CancelToken, NoTls, RawConnection};
+use postgres_client::{NoTls, RawCancelToken, RawConnection};
 use postgres_protocol::message::backend::NoticeResponseBody;
 use thiserror::Error;
 use tokio::net::{TcpStream, lookup_host};
@@ -265,7 +265,8 @@ impl ConnectInfo {
     }
 }
 
-type RustlsStream = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::Stream;
+pub type RustlsStream = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::Stream;
+pub type MaybeRustlsStream = MaybeTlsStream<TcpStream, RustlsStream>;
 
 pub(crate) struct PostgresConnection {
     /// Socket connected to a compute node.
@@ -279,7 +280,7 @@ pub(crate) struct PostgresConnection {
     /// Notices received from compute after authenticating
     pub(crate) delayed_notice: Vec<NoticeResponseBody>,
 
-    _guage: NumDbConnectionsGuard<'static>,
+    pub(crate) guage: NumDbConnectionsGuard<'static>,
 }
 
 impl ConnectInfo {
@@ -327,8 +328,7 @@ impl ConnectInfo {
         // Yet another reason to rework the connection establishing code.
         let cancel_closure = CancelClosure::new(
             socket_addr,
-            CancelToken {
-                socket_config: None,
+            RawCancelToken {
                 ssl_mode: self.ssl_mode,
                 process_id,
                 secret_key,
@@ -343,7 +343,7 @@ impl ConnectInfo {
             delayed_notice,
             cancel_closure,
             aux,
-            _guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()),
+            guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()),
         };
 
         Ok(connection)
diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs
index 5331ea41fd..89adfc9049 100644
--- a/proxy/src/console_redirect_proxy.rs
+++ b/proxy/src/console_redirect_proxy.rs
@@ -120,7 +120,7 @@ pub async fn task_main(
                 Ok(Some(p)) => {
                     ctx.set_success();
                     let _disconnect = ctx.log_connect();
-                    match p.proxy_pass(&config.connect_to_compute).await {
+                    match p.proxy_pass().await {
                         Ok(()) => {}
                         Err(ErrorSource::Client(e)) => {
                             error!(
@@ -232,22 +232,35 @@ pub(crate) async fn handle_client(
         .or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
         .await?;
 
-    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
-    let session = cancellation_handler_clone.get_key();
-
-    session.write_cancel_key(node.cancel_closure.clone())?;
+    let session = cancellation_handler.get_key();
 
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;
 
+    let session_id = ctx.session_id();
+    let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });
+
     Ok(Some(ProxyPassthrough {
         client: stream,
-        aux: node.aux.clone(),
+        compute: node.stream,
+
+        aux: node.aux,
         private_link_id: None,
-        compute: node,
-        session_id: ctx.session_id(),
-        cancel: session,
+
+        _cancel_on_shutdown: cancel_on_shutdown,
+
         _req: request_gauge,
         _conn: conn_gauge,
+        _db_conn: node.guage,
     }))
 }
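The `_cancel_on_shutdown` sender wired up above is purely a drop guard: nothing is ever sent on it, and the spawned `maintain_cancel_key` task only observes the channel closing. A standalone sketch of that pattern (names are illustrative):

```rust
use std::convert::Infallible;

#[tokio::main]
async fn main() {
    // Held by the passthrough; dropping it is the shutdown signal.
    let (guard, cancel) = tokio::sync::oneshot::channel::<Infallible>();

    let task = tokio::spawn(async move {
        // No value of type Infallible can ever be sent, so this resolves
        // (with RecvError) only once the sender is dropped.
        let _ = cancel.await;
        // ...stop refreshing the Redis key, then cancel the compute-side query...
    });

    drop(guard); // connection finished
    task.await.unwrap();
}
```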
diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs
index 026c6aeba9..d96f582fad 100644
--- a/proxy/src/lib.rs
+++ b/proxy/src/lib.rs
@@ -75,6 +75,7 @@ pub mod binary;
 
 mod auth;
+mod batch;
 mod cache;
 mod cancellation;
 mod compute;
diff --git a/proxy/src/pglb/passthrough.rs b/proxy/src/pglb/passthrough.rs
index 6f651d383d..d4c029f6d9 100644
--- a/proxy/src/pglb/passthrough.rs
+++ b/proxy/src/pglb/passthrough.rs
@@ -1,15 +1,17 @@
-use futures::FutureExt;
+use std::convert::Infallible;
+
 use smol_str::SmolStr;
 use tokio::io::{AsyncRead, AsyncWrite};
 use tracing::debug;
 use utils::measured_stream::MeasuredStream;
 
 use super::copy_bidirectional::ErrorSource;
-use crate::cancellation;
-use crate::compute::PostgresConnection;
-use crate::config::ComputeConfig;
+use crate::compute::MaybeRustlsStream;
 use crate::control_plane::messages::MetricsAuxInfo;
-use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
+use crate::metrics::{
+    Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard,
+    NumDbConnectionsGuard,
+};
 use crate::stream::Stream;
 use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
 
@@ -64,40 +66,20 @@ pub(crate) async fn proxy_pass(
 pub(crate) struct ProxyPassthrough<S> {
     pub(crate) client: Stream<S>,
-    pub(crate) compute: PostgresConnection,
+    pub(crate) compute: MaybeRustlsStream,
+
     pub(crate) aux: MetricsAuxInfo,
-    pub(crate) session_id: uuid::Uuid,
     pub(crate) private_link_id: Option<SmolStr>,
-    pub(crate) cancel: cancellation::Session,
+
+    pub(crate) _cancel_on_shutdown: tokio::sync::oneshot::Sender<Infallible>,
 
     pub(crate) _req: NumConnectionRequestsGuard<'static>,
     pub(crate) _conn: NumClientConnectionsGuard<'static>,
+    pub(crate) _db_conn: NumDbConnectionsGuard<'static>,
 }
 
 impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
-    pub(crate) async fn proxy_pass(
-        self,
-        compute_config: &ComputeConfig,
-    ) -> Result<(), ErrorSource> {
-        let res = proxy_pass(
-            self.client,
-            self.compute.stream,
-            self.aux,
-            self.private_link_id,
-        )
-        .await;
-        if let Err(err) = self
-            .compute
-            .cancel_closure
-            .try_cancel_query(compute_config)
-            .boxed()
-            .await
-        {
-            tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
-        }
-
-        drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
-
-        res
+    pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
+        proxy_pass(self.client, self.compute, self.aux, self.private_link_id).await
     }
 }
diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs
index 4211406f6c..7da1b8d8fa 100644
--- a/proxy/src/proxy/mod.rs
+++ b/proxy/src/proxy/mod.rs
@@ -155,7 +155,7 @@ pub async fn task_main(
                 Ok(Some(p)) => {
                     ctx.set_success();
                     let _disconnect = ctx.log_connect();
-                    match p.proxy_pass(&config.connect_to_compute).await {
+                    match p.proxy_pass().await {
                         Ok(()) => {}
                         Err(ErrorSource::Client(e)) => {
                             warn!(
@@ -372,13 +372,24 @@ pub(crate) async fn handle_client(
         Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
     };
 
-    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
-    let session = cancellation_handler_clone.get_key();
+    let session = cancellation_handler.get_key();
 
-    session.write_cancel_key(node.cancel_closure.clone())?;
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;
 
+    let session_id = ctx.session_id();
+    let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });
+
     let private_link_id = match ctx.extra() {
         Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
         Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()),
@@ -387,13 +398,16 @@ pub(crate) async fn handle_client(
 
     Ok(Some(ProxyPassthrough {
         client: stream,
-        aux: node.aux.clone(),
+        compute: node.stream,
+
+        aux: node.aux,
         private_link_id,
-        compute: node,
-        session_id: ctx.session_id(),
-        cancel: session,
+
+        _cancel_on_shutdown: cancel_on_shutdown,
+
         _req: request_gauge,
         _conn: conn_gauge,
+        _db_conn: node.guage,
     }))
 }
diff --git a/proxy/src/redis/keys.rs b/proxy/src/redis/keys.rs
index 3113bad949..b453e6851c 100644
--- a/proxy/src/redis/keys.rs
+++ b/proxy/src/redis/keys.rs
@@ -1,8 +1,4 @@
-use std::io::ErrorKind;
-
-use anyhow::Ok;
-
-use crate::pqproto::{CancelKeyData, id_to_cancel_key};
+use crate::pqproto::CancelKeyData;
 
 pub mod keyspace {
     pub const CANCEL_PREFIX: &str = "cancel";
@@ -23,39 +19,12 @@ impl KeyPrefix {
             }
         }
     }
-
-    #[allow(dead_code)]
-    pub(crate) fn as_str(&self) -> &'static str {
-        match self {
-            KeyPrefix::Cancel(_) => keyspace::CANCEL_PREFIX,
-        }
-    }
-}
-
-#[allow(dead_code)]
-pub(crate) fn parse_redis_key(key: &str) -> anyhow::Result<KeyPrefix> {
-    let (prefix, key_str) = key.split_once(':').ok_or_else(|| {
-        anyhow::anyhow!(std::io::Error::new(
-            ErrorKind::InvalidData,
-            "missing prefix"
-        ))
-    })?;
-
-    match prefix {
-        keyspace::CANCEL_PREFIX => {
-            let id = u64::from_str_radix(key_str, 16)?;
-
-            Ok(KeyPrefix::Cancel(id_to_cancel_key(id)))
-        }
-        _ => Err(anyhow::anyhow!(std::io::Error::new(
-            ErrorKind::InvalidData,
-            "unknown prefix"
-        ))),
-    }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::pqproto::id_to_cancel_key;
+
     use super::*;
 
     #[test]
@@ -65,16 +34,4 @@ mod tests {
         let redis_key = cancel_key.build_redis_key();
         assert_eq!(redis_key, "cancel:30390000d431");
     }
-
-    #[test]
-    fn test_parse_redis_key() {
-        let redis_key = "cancel:30390000d431";
-        let key: KeyPrefix = parse_redis_key(redis_key).expect("Failed to parse key");
-
-        let ref_key = id_to_cancel_key(12345 << 32 | 54321);
-
-        assert_eq!(key.as_str(), KeyPrefix::Cancel(ref_key).as_str());
-        let KeyPrefix::Cancel(cancel_key) = key;
-        assert_eq!(ref_key, cancel_key);
-    }
 }
diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs
index f71730c533..f8d3b5cc66 100644
--- a/proxy/src/redis/kv_ops.rs
+++ b/proxy/src/redis/kv_ops.rs
@@ -1,3 +1,6 @@
+use std::time::Duration;
+
+use futures::FutureExt;
 use redis::aio::ConnectionLike;
 use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
 
@@ -35,14 +38,11 @@ impl RedisKVClient {
     }
 
     pub async fn try_connect(&mut self) -> anyhow::Result<()> {
-        match self.client.connect().await {
-            Ok(()) => {}
-            Err(e) => {
-                tracing::error!("failed to connect to redis: {e}");
-                return Err(e);
-            }
-        }
-        Ok(())
+        self.client
+            .connect()
+            .boxed()
+            .await
+            .inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
     }
 
     pub(crate) async fn query(
@@ -54,15 +54,25 @@ impl RedisKVClient {
             return Err(anyhow::anyhow!("Rate limit exceeded"));
         }
 
-        match q.query(&mut self.client).await {
+        let e = match q.query(&mut self.client).await {
             Ok(t) => return Ok(t),
-            Err(e) => {
-                tracing::error!("failed to run query: {e}");
+            Err(e) => e,
+        };
+
+        tracing::error!("failed to run query: {e}");
+        match e.retry_method() {
+            redis::RetryMethod::Reconnect => {
+                tracing::info!("Redis client is disconnected. Reconnecting...");
+                self.try_connect().await?;
             }
+            redis::RetryMethod::RetryImmediately => {}
+            redis::RetryMethod::WaitAndRetry => {
+                // somewhat arbitrary.
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            }
+            _ => Err(e)?,
         }
 
-        tracing::info!("Redis client is disconnected. Reconnecting...");
-        self.try_connect().await?;
         Ok(q.query(&mut self.client).await?)
     }
 }
diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs
index 8648a94869..0d374e6df2 100644
--- a/proxy/src/serverless/websocket.rs
+++ b/proxy/src/serverless/websocket.rs
@@ -167,7 +167,7 @@ pub(crate) async fn serve_websocket(
         Ok(Some(p)) => {
             ctx.set_success();
             ctx.log_connect();
-            match p.proxy_pass(&config.connect_to_compute).await {
+            match p.proxy_pass().await {
                 Ok(()) => Ok(()),
                 Err(ErrorSource::Client(err)) => Err(err).context("client"),
                 Err(ErrorSource::Compute(err)) => Err(err).context("compute"),