From 15d079cd41c579555b7a42b7d36429ca63c7d8a2 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 20 Jun 2025 10:31:40 +0200 Subject: [PATCH 1/5] pagebench: improve `getpage-latest-lsn` gRPC support (#12293) This improves `pagebench getpage-latest-lsn` gRPC support by: * Using `page_api::Client`. * Removing `--protocol`, and using the `page-server-connstring` scheme instead. * Adding `--compression` to enable zstd compression. --- Cargo.lock | 3 + pageserver/page_api/Cargo.toml | 2 + pageserver/page_api/src/model.rs | 2 +- pageserver/pagebench/Cargo.toml | 1 + pageserver/pagebench/src/cmd/basebackup.rs | 10 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 115 ++++++++++-------- 6 files changed, 79 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ab26b02fa..8cc51350ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4255,6 +4255,7 @@ dependencies = [ "tokio-util", "tonic 0.13.1", "tracing", + "url", "utils", "workspace_hack", ] @@ -4472,6 +4473,8 @@ dependencies = [ "pageserver_api", "postgres_ffi", "prost 0.13.5", + "strum", + "strum_macros", "thiserror 1.0.69", "tokio", "tonic 0.13.1", diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index 8b13b9e1db..c5283c2b09 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -11,6 +11,8 @@ futures.workspace = true pageserver_api.workspace = true postgres_ffi.workspace = true prost.workspace = true +strum.workspace = true +strum_macros.workspace = true thiserror.workspace = true tokio.workspace = true tonic.workspace = true diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index ef7f89473f..6efa742799 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -459,7 +459,7 @@ impl GetPageResponse { /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream /// (potentially shared by many backends), and a gRPC status response would terminate the stream so /// we send GetPageResponse messages with these codes instead. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)] pub enum GetPageStatusCode { /// Unknown status. For forwards compatibility: used when an older client version receives a new /// status code from a newer server version. diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 5e4af88e69..f5dfc0db25 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -25,6 +25,7 @@ tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true tonic.workspace = true +url.workspace = true pageserver_client.workspace = true pageserver_api.workspace = true diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 8015db528d..e028174c1d 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -13,7 +13,6 @@ use pageserver_client::mgmt_api::ForceAwaitLogicalSize; use pageserver_client::page_service::BasebackupRequest; use pageserver_page_api as page_api; use rand::prelude::*; -use reqwest::Url; use tokio::io::AsyncRead; use tokio::sync::Barrier; use tokio::task::JoinSet; @@ -21,6 +20,7 @@ use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt use tokio_util::io::StreamReader; use tonic::async_trait; use tracing::{info, instrument}; +use url::Url; use utils::id::TenantTimelineId; use utils::lsn::Lsn; use utils::shard::ShardIndex; @@ -156,12 +156,16 @@ async fn main_impl( let mut work_senders = HashMap::new(); let mut tasks = Vec::new(); - let connurl = Url::parse(&args.page_service_connstring)?; + let scheme = match Url::parse(&args.page_service_connstring) { + Ok(url) => url.scheme().to_lowercase().to_string(), + Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(), + Err(err) => return Err(anyhow!("invalid connstring: {err}")), + }; for &tl in &timelines { let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are work_senders.insert(tl, sender); - let client: Box = match connurl.scheme() { + let client: Box = match scheme.as_str() { "postgresql" | "postgres" => Box::new( LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?, ), diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 3a68a77279..a297819e9b 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -10,33 +10,31 @@ use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; use camino::Utf8PathBuf; +use futures::{Stream, StreamExt as _}; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest}; use pageserver_api::reltag::RelTag; use pageserver_api::shard::TenantShardId; -use pageserver_page_api::proto; +use pageserver_page_api as page_api; use rand::prelude::*; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::info; +use url::Url; use utils::id::TenantTimelineId; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; -#[derive(clap::ValueEnum, Clone, Debug)] -enum Protocol { - Libpq, - Grpc, -} - /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { #[clap(long, default_value = "http://localhost:9898")] mgmt_api_endpoint: String, + /// Pageserver connection string. Supports postgresql:// and grpc:// protocols. #[clap(long, default_value = "postgres://postgres@localhost:64000")] page_service_connstring: String, #[clap(long)] @@ -45,8 +43,9 @@ pub(crate) struct Args { num_clients: NonZeroUsize, #[clap(long)] runtime: Option, - #[clap(long, value_enum, default_value = "libpq")] - protocol: Protocol, + /// If true, enable compression (only for gRPC). + #[clap(long)] + compression: bool, /// Each client sends requests at the given rate. /// /// If a request takes too long and we should be issuing a new request already, @@ -325,18 +324,32 @@ async fn main_impl( .unwrap(); Box::pin(async move { - let client: Box = match args.protocol { - Protocol::Libpq => Box::new( - LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline) - .await - .unwrap(), + let scheme = match Url::parse(&args.page_service_connstring) { + Ok(url) => url.scheme().to_lowercase().to_string(), + Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(), + Err(err) => panic!("invalid connstring: {err}"), + }; + let client: Box = match scheme.as_str() { + "postgresql" | "postgres" => { + assert!(!args.compression, "libpq does not support compression"); + Box::new( + LibpqClient::new(&args.page_service_connstring, worker_id.timeline) + .await + .unwrap(), + ) + } + + "grpc" => Box::new( + GrpcClient::new( + &args.page_service_connstring, + worker_id.timeline, + args.compression, + ) + .await + .unwrap(), ), - Protocol::Grpc => Box::new( - GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline) - .await - .unwrap(), - ), + scheme => panic!("unsupported scheme {scheme}"), }; run_worker(args, client, ss, cancel, rps_period, ranges, weights).await }) @@ -543,8 +556,8 @@ struct LibpqClient { } impl LibpqClient { - async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { - let inner = pageserver_client::page_service::Client::new(connstring) + async fn new(connstring: &str, ttid: TenantTimelineId) -> anyhow::Result { + let inner = pageserver_client::page_service::Client::new(connstring.to_string()) .await? .pagestream(ttid.tenant_id, ttid.timeline_id) .await?; @@ -600,34 +613,36 @@ impl Client for LibpqClient { } } -/// A gRPC client using the raw, no-frills gRPC client. +/// A gRPC Pageserver client. struct GrpcClient { - req_tx: tokio::sync::mpsc::Sender, - resp_rx: tonic::Streaming, + req_tx: tokio::sync::mpsc::Sender, + resp_rx: Pin> + Send>>, } impl GrpcClient { - async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { - let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?; + async fn new( + connstring: &str, + ttid: TenantTimelineId, + compression: bool, + ) -> anyhow::Result { + let mut client = page_api::Client::new( + connstring.to_string(), + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + compression.then_some(tonic::codec::CompressionEncoding::Zstd), + ) + .await?; // The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the // benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are // buffered by Tonic and the OS too. let (req_tx, req_rx) = tokio::sync::mpsc::channel(1); let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); - let mut req = tonic::Request::new(req_stream); - let metadata = req.metadata_mut(); - metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?); - metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?); - metadata.insert("neon-shard-id", "0000".try_into()?); + let resp_rx = Box::pin(client.get_pages(req_stream).await?); - let resp = client.get_pages(req).await?; - let resp_stream = resp.into_inner(); - - Ok(Self { - req_tx, - resp_rx: resp_stream, - }) + Ok(Self { req_tx, resp_rx }) } } @@ -641,27 +656,27 @@ impl Client for GrpcClient { rel: RelTag, blks: Vec, ) -> anyhow::Result<()> { - let req = proto::GetPageRequest { + let req = page_api::GetPageRequest { request_id: req_id, - request_class: proto::GetPageClass::Normal as i32, - read_lsn: Some(proto::ReadLsn { - request_lsn: req_lsn.0, - not_modified_since_lsn: mod_lsn.0, - }), - rel: Some(rel.into()), - block_number: blks, + request_class: page_api::GetPageClass::Normal, + read_lsn: page_api::ReadLsn { + request_lsn: req_lsn, + not_modified_since_lsn: Some(mod_lsn), + }, + rel, + block_numbers: blks, }; self.req_tx.send(req).await?; Ok(()) } async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)> { - let resp = self.resp_rx.message().await?.unwrap(); + let resp = self.resp_rx.next().await.unwrap().unwrap(); anyhow::ensure!( - resp.status_code == proto::GetPageStatusCode::Ok as i32, + resp.status_code == page_api::GetPageStatusCode::Ok, "unexpected status code: {}", - resp.status_code + resp.status_code, ); - Ok((resp.request_id, resp.page_image)) + Ok((resp.request_id, resp.page_images)) } } From 8b197de7ffcb2b96cfb4b237e5f220f11e4770d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 20 Jun 2025 12:33:11 +0200 Subject: [PATCH 2/5] Increase upload timeout for test_tenant_s3_restore (#12297) Increase the upload timeout of the test to avoid hitting timeouts (which we sometimes do). Fixes https://github.com/neondatabase/neon/issues/12212 --- test_runner/regress/test_s3_restore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_runner/regress/test_s3_restore.py b/test_runner/regress/test_s3_restore.py index 082808f9ff..2d7be1f9d1 100644 --- a/test_runner/regress/test_s3_restore.py +++ b/test_runner/regress/test_s3_restore.py @@ -74,7 +74,7 @@ def test_tenant_s3_restore( last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) last_flush_lsns.append(last_flush_lsn) ps_http.timeline_checkpoint(tenant_id, timeline_id) - wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn) + wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn, timeout=60) log.info(f"{timeline} timeline {timeline_id} {last_flush_lsn=}") parent = timeline From a298d2c29b4b68e07cdfb50faa1a74b6df599d45 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 20 Jun 2025 12:48:01 +0100 Subject: [PATCH 3/5] [proxy] replace the batch cancellation queue, shorten the TTL for cancel keys (#11943) See #11942 Idea: * if connections are short lived, they can get enqueued and then also remove themselves later if they never made it to redis. This reduces the load on the queue. * short lived connections (<10m, most?) will only issue 1 command, we remove the delete command and rely on ttl. * we can enqueue as many commands as we want, as we can always cancel the enqueue, thanks to the ~~intrusive linked lists~~ `BTreeMap`. --- .../proxy/tokio-postgres2/src/cancel_query.rs | 14 +- .../proxy/tokio-postgres2/src/cancel_token.rs | 21 +- libs/proxy/tokio-postgres2/src/client.rs | 11 +- libs/proxy/tokio-postgres2/src/lib.rs | 2 +- proxy/src/batch.rs | 146 +++++++ proxy/src/binary/local_proxy.rs | 2 +- proxy/src/binary/proxy.rs | 29 +- proxy/src/cancellation.rs | 405 ++++++------------ proxy/src/compute/mod.rs | 12 +- proxy/src/console_redirect_proxy.rs | 31 +- proxy/src/lib.rs | 1 + proxy/src/pglb/passthrough.rs | 46 +- proxy/src/proxy/mod.rs | 30 +- proxy/src/redis/keys.rs | 49 +-- proxy/src/redis/kv_ops.rs | 36 +- proxy/src/serverless/websocket.rs | 2 +- 16 files changed, 410 insertions(+), 427 deletions(-) create mode 100644 proxy/src/batch.rs diff --git a/libs/proxy/tokio-postgres2/src/cancel_query.rs b/libs/proxy/tokio-postgres2/src/cancel_query.rs index 4c2a5ef50f..94fbf333ed 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_query.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_query.rs @@ -1,5 +1,3 @@ -use std::io; - use tokio::net::TcpStream; use crate::client::SocketConfig; @@ -8,7 +6,7 @@ use crate::tls::MakeTlsConnect; use crate::{Error, cancel_query_raw, connect_socket}; pub(crate) async fn cancel_query( - config: Option, + config: SocketConfig, ssl_mode: SslMode, tls: T, process_id: i32, @@ -17,16 +15,6 @@ pub(crate) async fn cancel_query( where T: MakeTlsConnect, { - let config = match config { - Some(config) => config, - None => { - return Err(Error::connect(io::Error::new( - io::ErrorKind::InvalidInput, - "unknown host", - ))); - } - }; - let hostname = match &config.host { Host::Tcp(host) => &**host, }; diff --git a/libs/proxy/tokio-postgres2/src/cancel_token.rs b/libs/proxy/tokio-postgres2/src/cancel_token.rs index f6526395ee..c5566b4ad9 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_token.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_token.rs @@ -7,11 +7,16 @@ use crate::config::SslMode; use crate::tls::{MakeTlsConnect, TlsConnect}; use crate::{Error, cancel_query, cancel_query_raw}; -/// The capability to request cancellation of in-progress queries on a -/// connection. -#[derive(Clone, Serialize, Deserialize)] +/// A cancellation token that allows easy cancellation of a query. +#[derive(Clone)] pub struct CancelToken { - pub socket_config: Option, + pub socket_config: SocketConfig, + pub raw: RawCancelToken, +} + +/// A raw cancellation token that allows cancellation of a query, given a fresh connection to postgres. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RawCancelToken { pub ssl_mode: SslMode, pub process_id: i32, pub secret_key: i32, @@ -36,14 +41,16 @@ impl CancelToken { { cancel_query::cancel_query( self.socket_config.clone(), - self.ssl_mode, + self.raw.ssl_mode, tls, - self.process_id, - self.secret_key, + self.raw.process_id, + self.raw.secret_key, ) .await } +} +impl RawCancelToken { /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new /// connection itself. pub async fn cancel_query_raw(&self, stream: S, tls: T) -> Result<(), Error> diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index a7edfc076a..41b22e35b6 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -12,6 +12,7 @@ use postgres_protocol2::message::frontend; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; +use crate::cancel_token::RawCancelToken; use crate::codec::{BackendMessages, FrontendMessage}; use crate::config::{Host, SslMode}; use crate::query::RowStream; @@ -331,10 +332,12 @@ impl Client { /// connection associated with this client. pub fn cancel_token(&self) -> CancelToken { CancelToken { - socket_config: Some(self.socket_config.clone()), - ssl_mode: self.ssl_mode, - process_id: self.process_id, - secret_key: self.secret_key, + socket_config: self.socket_config.clone(), + raw: RawCancelToken { + ssl_mode: self.ssl_mode, + process_id: self.process_id, + secret_key: self.secret_key, + }, } } diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs index 9556070ed5..791c93b972 100644 --- a/libs/proxy/tokio-postgres2/src/lib.rs +++ b/libs/proxy/tokio-postgres2/src/lib.rs @@ -3,7 +3,7 @@ use postgres_protocol2::message::backend::ReadyForQueryBody; -pub use crate::cancel_token::CancelToken; +pub use crate::cancel_token::{CancelToken, RawCancelToken}; pub use crate::client::{Client, SocketConfig}; pub use crate::config::Config; pub use crate::connect_raw::RawConnection; diff --git a/proxy/src/batch.rs b/proxy/src/batch.rs new file mode 100644 index 0000000000..61bdf2b747 --- /dev/null +++ b/proxy/src/batch.rs @@ -0,0 +1,146 @@ +//! Batch processing system based on intrusive linked lists. +//! +//! Enqueuing a batch job requires no allocations, with +//! direct support for cancelling jobs early. +use std::collections::BTreeMap; +use std::pin::pin; +use std::sync::Mutex; + +use futures::future::Either; +use scopeguard::ScopeGuard; +use tokio::sync::oneshot::error::TryRecvError; + +use crate::ext::LockExt; + +pub trait QueueProcessing: Send + 'static { + type Req: Send + 'static; + type Res: Send; + + /// Get the desired batch size. + fn batch_size(&self, queue_size: usize) -> usize; + + /// This applies a full batch of events. + /// Must respond with a full batch of replies. + /// + /// If this apply can error, it's expected that errors be forwarded to each Self::Res. + /// + /// Batching does not need to happen atomically. + fn apply(&mut self, req: Vec) -> impl Future> + Send; +} + +pub struct BatchQueue { + processor: tokio::sync::Mutex

, + inner: Mutex>, +} + +struct BatchJob { + req: P::Req, + res: tokio::sync::oneshot::Sender, +} + +impl BatchQueue

{ + pub fn new(p: P) -> Self { + Self { + processor: tokio::sync::Mutex::new(p), + inner: Mutex::new(BatchQueueInner { + version: 0, + queue: BTreeMap::new(), + }), + } + } + + pub async fn call(&self, req: P::Req) -> P::Res { + let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req); + let guard = scopeguard::guard(id, move |id| { + let mut inner = self.inner.lock_propagate_poison(); + if inner.queue.remove(&id).is_some() { + tracing::debug!("batched task cancelled before completion"); + } + }); + + let resp = loop { + // try become the leader, or try wait for success. + let mut processor = match futures::future::select(rx, pin!(self.processor.lock())).await + { + // we got the resp. + Either::Left((resp, _)) => break resp.ok(), + // we are the leader. + Either::Right((p, rx_)) => { + rx = rx_; + p + } + }; + + let (reqs, resps) = self.inner.lock_propagate_poison().get_batch(&processor); + + // apply a batch. + let values = processor.apply(reqs).await; + + // send response values. + for (tx, value) in std::iter::zip(resps, values) { + // sender hung up but that's fine. + drop(tx.send(value)); + } + + match rx.try_recv() { + Ok(resp) => break Some(resp), + Err(TryRecvError::Closed) => break None, + // edge case - there was a race condition where + // we became the leader but were not in the batch. + // + // Example: + // thread 1: register job id=1 + // thread 2: register job id=2 + // thread 2: processor.lock().await + // thread 1: processor.lock().await + // thread 2: becomes leader, batch_size=1, jobs=[1]. + Err(TryRecvError::Empty) => {} + } + }; + + // already removed. + ScopeGuard::into_inner(guard); + + resp.expect("no response found. batch processer should not panic") + } +} + +struct BatchQueueInner { + version: u64, + queue: BTreeMap>, +} + +impl BatchQueueInner

{ + fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver) { + let (tx, rx) = tokio::sync::oneshot::channel(); + + let id = self.version; + + // Overflow concern: + // This is a u64, and we might enqueue 2^16 tasks per second. + // This gives us 2^48 seconds (9 million years). + // Even if this does overflow, it will not break, but some + // jobs with the higher version might never get prioritised. + self.version += 1; + + self.queue.insert(id, BatchJob { req, res: tx }); + + (id, rx) + } + + fn get_batch(&mut self, p: &P) -> (Vec, Vec>) { + let batch_size = p.batch_size(self.queue.len()); + let mut reqs = Vec::with_capacity(batch_size); + let mut resps = Vec::with_capacity(batch_size); + + while reqs.len() < batch_size { + let Some((_, job)) = self.queue.pop_first() else { + break; + }; + reqs.push(job.req); + resps.push(job.res); + } + + (reqs, resps) + } +} diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index ba10fce7b4..e3be454713 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -201,7 +201,7 @@ pub async fn run() -> anyhow::Result<()> { auth_backend, http_listener, shutdown.clone(), - Arc::new(CancellationHandler::new(&config.connect_to_compute, None)), + Arc::new(CancellationHandler::new(&config.connect_to_compute)), endpoint_rate_limiter, ); diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 6ab6df5610..9215dbf73f 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -23,7 +23,8 @@ use utils::{project_build_tag, project_git_version}; use crate::auth::backend::jwt::JwkCache; use crate::auth::backend::{ConsoleRedirectBackend, MaybeOwned}; -use crate::cancellation::{CancellationHandler, handle_cancel_messages}; +use crate::batch::BatchQueue; +use crate::cancellation::{CancellationHandler, CancellationProcessor}; use crate::config::{ self, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, remote_storage_from_toml, @@ -392,13 +393,7 @@ pub async fn run() -> anyhow::Result<()> { .as_ref() .map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit)); - // channel size should be higher than redis client limit to avoid blocking - let cancel_ch_size = args.cancellation_ch_size; - let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size); - let cancellation_handler = Arc::new(CancellationHandler::new( - &config.connect_to_compute, - Some(tx_cancel), - )); + let cancellation_handler = Arc::new(CancellationHandler::new(&config.connect_to_compute)); let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards( RateBucketInfo::to_leaky_bucket(&args.endpoint_rps_limit) @@ -530,21 +525,11 @@ pub async fn run() -> anyhow::Result<()> { match redis_kv_client.try_connect().await { Ok(()) => { info!("Connected to Redis KV client"); - maintenance_tasks.spawn(async move { - handle_cancel_messages( - &mut redis_kv_client, - rx_cancel, - args.cancellation_batch_size, - ) - .await?; + cancellation_handler.init_tx(BatchQueue::new(CancellationProcessor { + client: redis_kv_client, + batch_size: args.cancellation_batch_size, + })); - drop(redis_kv_client); - - // `handle_cancel_messages` was terminated due to the tx_cancel - // being dropped. this is not worthy of an error, and this task can only return `Err`, - // so let's wait forever instead. - std::future::pending().await - }); break; } Err(e) => { diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index cce4c1d3a0..036f36c7f6 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,19 +1,23 @@ +use std::convert::Infallible; use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; +use std::time::Duration; -use anyhow::{Context, anyhow}; +use anyhow::anyhow; +use futures::FutureExt; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use postgres_client::CancelToken; +use postgres_client::RawCancelToken; use postgres_client::tls::MakeTlsConnect; use redis::{Cmd, FromRedisValue, Value}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::net::TcpStream; -use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, info, warn}; +use tokio::time::timeout; +use tracing::{debug, error, info}; use crate::auth::AuthError; use crate::auth::backend::ComputeUserInfo; +use crate::batch::{BatchQueue, QueueProcessing}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::ControlPlaneApi; @@ -27,46 +31,36 @@ use crate::redis::kv_ops::RedisKVClient; type IpSubnetKey = IpNet; -const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time +const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600); +const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570); // Message types for sending through mpsc channel pub enum CancelKeyOp { StoreCancelKey { - key: String, - field: String, - value: String, - resp_tx: Option>>, - _guard: CancelChannelSizeGuard<'static>, - expire: i64, // TTL for key + key: CancelKeyData, + value: Box, + expire: std::time::Duration, }, GetCancelData { - key: String, - resp_tx: oneshot::Sender>>, - _guard: CancelChannelSizeGuard<'static>, - }, - RemoveCancelKey { - key: String, - field: String, - resp_tx: Option>>, - _guard: CancelChannelSizeGuard<'static>, + key: CancelKeyData, }, } pub struct Pipeline { inner: redis::Pipeline, - replies: Vec, + replies: usize, } impl Pipeline { fn with_capacity(n: usize) -> Self { Self { inner: redis::Pipeline::with_capacity(n), - replies: Vec::with_capacity(n), + replies: 0, } } - async fn execute(&mut self, client: &mut RedisKVClient) { - let responses = self.replies.len(); + async fn execute(self, client: &mut RedisKVClient) -> Vec> { + let responses = self.replies; let batch_size = self.inner.len(); match client.query(&self.inner).await { @@ -76,176 +70,73 @@ impl Pipeline { batch_size, responses, "successfully completed cancellation jobs", ); - for (value, reply) in std::iter::zip(values, self.replies.drain(..)) { - reply.send_value(value); - } + values.into_iter().map(Ok).collect() } Ok(value) => { error!(batch_size, ?value, "unexpected redis return value"); - for reply in self.replies.drain(..) { - reply.send_err(anyhow!("incorrect response type from redis")); - } + std::iter::repeat_with(|| Err(anyhow!("incorrect response type from redis"))) + .take(responses) + .collect() } Err(err) => { - for reply in self.replies.drain(..) { - reply.send_err(anyhow!("could not send cmd to redis: {err}")); - } + std::iter::repeat_with(|| Err(anyhow!("could not send cmd to redis: {err}"))) + .take(responses) + .collect() } } - - self.inner.clear(); - self.replies.clear(); } - fn add_command_with_reply(&mut self, cmd: Cmd, reply: CancelReplyOp) { + fn add_command_with_reply(&mut self, cmd: Cmd) { self.inner.add_command(cmd); - self.replies.push(reply); + self.replies += 1; } fn add_command_no_reply(&mut self, cmd: Cmd) { self.inner.add_command(cmd).ignore(); } - - fn add_command(&mut self, cmd: Cmd, reply: Option) { - match reply { - Some(reply) => self.add_command_with_reply(cmd, reply), - None => self.add_command_no_reply(cmd), - } - } } impl CancelKeyOp { - fn register(self, pipe: &mut Pipeline) { + fn register(&self, pipe: &mut Pipeline) { #[allow(clippy::used_underscore_binding)] match self { - CancelKeyOp::StoreCancelKey { - key, - field, - value, - resp_tx, - _guard, - expire, - } => { - let reply = - resp_tx.map(|resp_tx| CancelReplyOp::StoreCancelKey { resp_tx, _guard }); - pipe.add_command(Cmd::hset(&key, field, value), reply); - pipe.add_command_no_reply(Cmd::expire(key, expire)); + CancelKeyOp::StoreCancelKey { key, value, expire } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value)); + pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64)); } - CancelKeyOp::GetCancelData { - key, - resp_tx, - _guard, - } => { - let reply = CancelReplyOp::GetCancelData { resp_tx, _guard }; - pipe.add_command_with_reply(Cmd::hgetall(key), reply); - } - CancelKeyOp::RemoveCancelKey { - key, - field, - resp_tx, - _guard, - } => { - let reply = - resp_tx.map(|resp_tx| CancelReplyOp::RemoveCancelKey { resp_tx, _guard }); - pipe.add_command(Cmd::hdel(key, field), reply); + CancelKeyOp::GetCancelData { key } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command_with_reply(Cmd::hget(key, "data")); } } } } -// Message types for sending through mpsc channel -pub enum CancelReplyOp { - StoreCancelKey { - resp_tx: oneshot::Sender>, - _guard: CancelChannelSizeGuard<'static>, - }, - GetCancelData { - resp_tx: oneshot::Sender>>, - _guard: CancelChannelSizeGuard<'static>, - }, - RemoveCancelKey { - resp_tx: oneshot::Sender>, - _guard: CancelChannelSizeGuard<'static>, - }, +pub struct CancellationProcessor { + pub client: RedisKVClient, + pub batch_size: usize, } -impl CancelReplyOp { - fn send_err(self, e: anyhow::Error) { - match self { - CancelReplyOp::StoreCancelKey { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::GetCancelData { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - } +impl QueueProcessing for CancellationProcessor { + type Req = (CancelChannelSizeGuard<'static>, CancelKeyOp); + type Res = anyhow::Result; + + fn batch_size(&self, _queue_size: usize) -> usize { + self.batch_size } - fn send_value(self, v: redis::Value) { - match self { - CancelReplyOp::StoreCancelKey { resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::GetCancelData { resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - } - } -} - -// Running as a separate task to accept messages through the rx channel -pub async fn handle_cancel_messages( - client: &mut RedisKVClient, - mut rx: mpsc::Receiver, - batch_size: usize, -) -> anyhow::Result<()> { - let mut batch = Vec::with_capacity(batch_size); - let mut pipeline = Pipeline::with_capacity(batch_size); - - loop { - if rx.recv_many(&mut batch, batch_size).await == 0 { - warn!("shutting down cancellation queue"); - break Ok(()); - } + async fn apply(&mut self, batch: Vec) -> Vec { + let mut pipeline = Pipeline::with_capacity(batch.len()); let batch_size = batch.len(); debug!(batch_size, "running cancellation jobs"); - for msg in batch.drain(..) { - msg.register(&mut pipeline); + for (_, op) in &batch { + op.register(&mut pipeline); } - pipeline.execute(client).await; + pipeline.execute(&mut self.client).await } } @@ -256,7 +147,7 @@ pub struct CancellationHandler { compute_config: &'static ComputeConfig, // rate limiter of cancellation requests limiter: Arc>>, - tx: Option>, // send messages to the redis KV client task + tx: OnceLock>, // send messages to the redis KV client task } #[derive(Debug, Error)] @@ -296,13 +187,10 @@ impl ReportableError for CancelError { } impl CancellationHandler { - pub fn new( - compute_config: &'static ComputeConfig, - tx: Option>, - ) -> Self { + pub fn new(compute_config: &'static ComputeConfig) -> Self { Self { compute_config, - tx, + tx: OnceLock::new(), limiter: Arc::new(std::sync::Mutex::new( LeakyBucketRateLimiter::::new_with_shards( LeakyBucketRateLimiter::::DEFAULT, @@ -312,7 +200,14 @@ impl CancellationHandler { } } - pub(crate) fn get_key(self: &Arc) -> Session { + pub fn init_tx(&self, queue: BatchQueue) { + self.tx + .set(queue) + .map_err(|_| {}) + .expect("cancellation queue should be registered once"); + } + + pub(crate) fn get_key(self: Arc) -> Session { // we intentionally generate a random "backend pid" and "secret key" here. // we use the corresponding u64 as an identifier for the // actual endpoint+pid+secret for postgres/pgbouncer. @@ -322,14 +217,10 @@ impl CancellationHandler { let key: CancelKeyData = rand::random(); - let prefix_key: KeyPrefix = KeyPrefix::Cancel(key); - let redis_key = prefix_key.build_redis_key(); - debug!("registered new query cancellation key {key}"); Session { key, - redis_key, - cancellation_handler: Arc::clone(self), + cancellation_handler: self, } } @@ -337,62 +228,43 @@ impl CancellationHandler { &self, key: CancelKeyData, ) -> Result, CancelError> { - let prefix_key: KeyPrefix = KeyPrefix::Cancel(key); - let redis_key = prefix_key.build_redis_key(); + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::HGet); + let op = CancelKeyOp::GetCancelData { key }; - let (resp_tx, resp_rx) = tokio::sync::oneshot::channel(); - let op = CancelKeyOp::GetCancelData { - key: redis_key, - resp_tx, - _guard: Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::HGetAll), - }; - - let Some(tx) = &self.tx else { + let Some(tx) = self.tx.get() else { tracing::warn!("cancellation handler is not available"); return Err(CancelError::InternalError); }; - tx.try_send(op) + const TIMEOUT: Duration = Duration::from_secs(5); + let result = timeout(TIMEOUT, tx.call((guard, op))) + .await + .map_err(|_| { + tracing::warn!("timed out waiting to receive GetCancelData response"); + CancelError::RateLimit + })? .map_err(|e| { - tracing::warn!("failed to send GetCancelData for {key}: {e}"); - }) - .map_err(|()| CancelError::InternalError)?; + tracing::warn!("failed to receive GetCancelData response: {e}"); + CancelError::InternalError + })?; - let result = resp_rx.await.map_err(|e| { + let cancel_state_str = String::from_owned_redis_value(result).map_err(|e| { tracing::warn!("failed to receive GetCancelData response: {e}"); CancelError::InternalError })?; - let cancel_state_str: Option = match result { - Ok(mut state) => { - if state.len() == 1 { - Some(state.remove(0).1) - } else { - tracing::warn!("unexpected number of entries in cancel state: {state:?}"); - return Err(CancelError::InternalError); - } - } - Err(e) => { - tracing::warn!("failed to receive cancel state from redis: {e}"); - return Err(CancelError::InternalError); - } - }; + let cancel_closure: CancelClosure = + serde_json::from_str(&cancel_state_str).map_err(|e| { + tracing::warn!("failed to deserialize cancel state: {e}"); + CancelError::InternalError + })?; - let cancel_state: Option = match cancel_state_str { - Some(state) => { - let cancel_closure: CancelClosure = serde_json::from_str(&state).map_err(|e| { - tracing::warn!("failed to deserialize cancel state: {e}"); - CancelError::InternalError - })?; - Some(cancel_closure) - } - None => None, - }; - Ok(cancel_state) + Ok(Some(cancel_closure)) } + /// Try to cancel a running query for the corresponding connection. /// If the cancellation key is not found, it will be published to Redis. /// check_allowed - if true, check if the IP is allowed to cancel the query. @@ -467,10 +339,10 @@ impl CancellationHandler { /// This should've been a [`std::future::Future`], but /// it's impossible to name a type of an unboxed future /// (we'd need something like `#![feature(type_alias_impl_trait)]`). -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CancelClosure { socket_addr: SocketAddr, - cancel_token: CancelToken, + cancel_token: RawCancelToken, hostname: String, // for pg_sni router user_info: ComputeUserInfo, } @@ -478,7 +350,7 @@ pub struct CancelClosure { impl CancelClosure { pub(crate) fn new( socket_addr: SocketAddr, - cancel_token: CancelToken, + cancel_token: RawCancelToken, hostname: String, user_info: ComputeUserInfo, ) -> Self { @@ -491,7 +363,7 @@ impl CancelClosure { } /// Cancels the query running on user's compute node. pub(crate) async fn try_cancel_query( - self, + &self, compute_config: &ComputeConfig, ) -> Result<(), CancelError> { let socket = TcpStream::connect(self.socket_addr).await?; @@ -512,7 +384,6 @@ impl CancelClosure { pub(crate) struct Session { /// The user-facing key identifying this session. key: CancelKeyData, - redis_key: String, cancellation_handler: Arc, } @@ -521,60 +392,66 @@ impl Session { &self.key } - // Send the store key op to the cancellation handler and set TTL for the key - pub(crate) fn write_cancel_key( + /// Ensure the cancel key is continously refreshed, + /// but stop when the channel is dropped. + pub(crate) async fn maintain_cancel_key( &self, - cancel_closure: CancelClosure, - ) -> Result<(), CancelError> { - let Some(tx) = &self.cancellation_handler.tx else { - tracing::warn!("cancellation handler is not available"); - return Err(CancelError::InternalError); - }; + session_id: uuid::Uuid, + cancel: tokio::sync::oneshot::Receiver, + cancel_closure: &CancelClosure, + compute_config: &ComputeConfig, + ) { + futures::future::select( + std::pin::pin!(self.maintain_redis_cancel_key(cancel_closure)), + cancel, + ) + .await; - let closure_json = serde_json::to_string(&cancel_closure).map_err(|e| { - tracing::warn!("failed to serialize cancel closure: {e}"); - CancelError::InternalError - })?; - - let op = CancelKeyOp::StoreCancelKey { - key: self.redis_key.clone(), - field: "data".to_string(), - value: closure_json, - resp_tx: None, - _guard: Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::HSet), - expire: CANCEL_KEY_TTL, - }; - - let _ = tx.try_send(op).map_err(|e| { - let key = self.key; - tracing::warn!("failed to send StoreCancelKey for {key}: {e}"); - }); - Ok(()) + if let Err(err) = cancel_closure + .try_cancel_query(compute_config) + .boxed() + .await + { + tracing::warn!( + ?session_id, + ?err, + "could not cancel the query in the database" + ); + } } - pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> { - let Some(tx) = &self.cancellation_handler.tx else { + // Ensure the cancel key is continously refreshed. + async fn maintain_redis_cancel_key(&self, cancel_closure: &CancelClosure) -> ! { + let Some(tx) = self.cancellation_handler.tx.get() else { tracing::warn!("cancellation handler is not available"); - return Err(CancelError::InternalError); + // don't exit, as we only want to exit if cancelled externally. + std::future::pending().await }; - let op = CancelKeyOp::RemoveCancelKey { - key: self.redis_key.clone(), - field: "data".to_string(), - resp_tx: None, - _guard: Metrics::get() + let closure_json = serde_json::to_string(&cancel_closure) + .expect("serialising to json string should not fail") + .into_boxed_str(); + + loop { + let guard = Metrics::get() .proxy .cancel_channel_size - .guard(RedisMsgKind::HDel), - }; + .guard(RedisMsgKind::HSet); + let op = CancelKeyOp::StoreCancelKey { + key: self.key, + value: closure_json.clone(), + expire: CANCEL_KEY_TTL, + }; - let _ = tx.try_send(op).map_err(|e| { - let key = self.key; - tracing::warn!("failed to send RemoveCancelKey for {key}: {e}"); - }); - Ok(()) + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "registering cancellation key" + ); + + if tx.call((guard, op)).await.is_ok() { + tokio::time::sleep(CANCEL_KEY_REFRESH).await; + } + } } } diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs index aae1fea07d..5dd264b35e 100644 --- a/proxy/src/compute/mod.rs +++ b/proxy/src/compute/mod.rs @@ -9,7 +9,7 @@ use itertools::Itertools; use postgres_client::config::{AuthKeys, SslMode}; use postgres_client::maybe_tls_stream::MaybeTlsStream; use postgres_client::tls::MakeTlsConnect; -use postgres_client::{CancelToken, NoTls, RawConnection}; +use postgres_client::{NoTls, RawCancelToken, RawConnection}; use postgres_protocol::message::backend::NoticeResponseBody; use thiserror::Error; use tokio::net::{TcpStream, lookup_host}; @@ -265,7 +265,8 @@ impl ConnectInfo { } } -type RustlsStream = >::Stream; +pub type RustlsStream = >::Stream; +pub type MaybeRustlsStream = MaybeTlsStream; pub(crate) struct PostgresConnection { /// Socket connected to a compute node. @@ -279,7 +280,7 @@ pub(crate) struct PostgresConnection { /// Notices received from compute after authenticating pub(crate) delayed_notice: Vec, - _guage: NumDbConnectionsGuard<'static>, + pub(crate) guage: NumDbConnectionsGuard<'static>, } impl ConnectInfo { @@ -327,8 +328,7 @@ impl ConnectInfo { // Yet another reason to rework the connection establishing code. let cancel_closure = CancelClosure::new( socket_addr, - CancelToken { - socket_config: None, + RawCancelToken { ssl_mode: self.ssl_mode, process_id, secret_key, @@ -343,7 +343,7 @@ impl ConnectInfo { delayed_notice, cancel_closure, aux, - _guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()), + guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()), }; Ok(connection) diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 5331ea41fd..89adfc9049 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -120,7 +120,7 @@ pub async fn task_main( Ok(Some(p)) => { ctx.set_success(); let _disconnect = ctx.log_connect(); - match p.proxy_pass(&config.connect_to_compute).await { + match p.proxy_pass().await { Ok(()) => {} Err(ErrorSource::Client(e)) => { error!( @@ -232,22 +232,35 @@ pub(crate) async fn handle_client( .or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) }) .await?; - let cancellation_handler_clone = Arc::clone(&cancellation_handler); - let session = cancellation_handler_clone.get_key(); - - session.write_cancel_key(node.cancel_closure.clone())?; + let session = cancellation_handler.get_key(); prepare_client_connection(&node, *session.key(), &mut stream); let stream = stream.flush_and_into_inner().await?; + let session_id = ctx.session_id(); + let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + session + .maintain_cancel_key( + session_id, + cancel, + &node.cancel_closure, + &config.connect_to_compute, + ) + .await; + }); + Ok(Some(ProxyPassthrough { client: stream, - aux: node.aux.clone(), + compute: node.stream, + + aux: node.aux, private_link_id: None, - compute: node, - session_id: ctx.session_id(), - cancel: session, + + _cancel_on_shutdown: cancel_on_shutdown, + _req: request_gauge, _conn: conn_gauge, + _db_conn: node.guage, })) } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 026c6aeba9..d96f582fad 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -75,6 +75,7 @@ pub mod binary; mod auth; +mod batch; mod cache; mod cancellation; mod compute; diff --git a/proxy/src/pglb/passthrough.rs b/proxy/src/pglb/passthrough.rs index 6f651d383d..d4c029f6d9 100644 --- a/proxy/src/pglb/passthrough.rs +++ b/proxy/src/pglb/passthrough.rs @@ -1,15 +1,17 @@ -use futures::FutureExt; +use std::convert::Infallible; + use smol_str::SmolStr; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::debug; use utils::measured_stream::MeasuredStream; use super::copy_bidirectional::ErrorSource; -use crate::cancellation; -use crate::compute::PostgresConnection; -use crate::config::ComputeConfig; +use crate::compute::MaybeRustlsStream; use crate::control_plane::messages::MetricsAuxInfo; -use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard}; +use crate::metrics::{ + Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard, + NumDbConnectionsGuard, +}; use crate::stream::Stream; use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS}; @@ -64,40 +66,20 @@ pub(crate) async fn proxy_pass( pub(crate) struct ProxyPassthrough { pub(crate) client: Stream, - pub(crate) compute: PostgresConnection, + pub(crate) compute: MaybeRustlsStream, + pub(crate) aux: MetricsAuxInfo, - pub(crate) session_id: uuid::Uuid, pub(crate) private_link_id: Option, - pub(crate) cancel: cancellation::Session, + + pub(crate) _cancel_on_shutdown: tokio::sync::oneshot::Sender, pub(crate) _req: NumConnectionRequestsGuard<'static>, pub(crate) _conn: NumClientConnectionsGuard<'static>, + pub(crate) _db_conn: NumDbConnectionsGuard<'static>, } impl ProxyPassthrough { - pub(crate) async fn proxy_pass( - self, - compute_config: &ComputeConfig, - ) -> Result<(), ErrorSource> { - let res = proxy_pass( - self.client, - self.compute.stream, - self.aux, - self.private_link_id, - ) - .await; - if let Err(err) = self - .compute - .cancel_closure - .try_cancel_query(compute_config) - .boxed() - .await - { - tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database"); - } - - drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error - - res + pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> { + proxy_pass(self.client, self.compute, self.aux, self.private_link_id).await } } diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 4211406f6c..7da1b8d8fa 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -155,7 +155,7 @@ pub async fn task_main( Ok(Some(p)) => { ctx.set_success(); let _disconnect = ctx.log_connect(); - match p.proxy_pass(&config.connect_to_compute).await { + match p.proxy_pass().await { Ok(()) => {} Err(ErrorSource::Client(e)) => { warn!( @@ -372,13 +372,24 @@ pub(crate) async fn handle_client( Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?, }; - let cancellation_handler_clone = Arc::clone(&cancellation_handler); - let session = cancellation_handler_clone.get_key(); + let session = cancellation_handler.get_key(); - session.write_cancel_key(node.cancel_closure.clone())?; prepare_client_connection(&node, *session.key(), &mut stream); let stream = stream.flush_and_into_inner().await?; + let session_id = ctx.session_id(); + let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + session + .maintain_cancel_key( + session_id, + cancel, + &node.cancel_closure, + &config.connect_to_compute, + ) + .await; + }); + let private_link_id = match ctx.extra() { Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()), Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()), @@ -387,13 +398,16 @@ pub(crate) async fn handle_client( Ok(Some(ProxyPassthrough { client: stream, - aux: node.aux.clone(), + compute: node.stream, + + aux: node.aux, private_link_id, - compute: node, - session_id: ctx.session_id(), - cancel: session, + + _cancel_on_shutdown: cancel_on_shutdown, + _req: request_gauge, _conn: conn_gauge, + _db_conn: node.guage, })) } diff --git a/proxy/src/redis/keys.rs b/proxy/src/redis/keys.rs index 3113bad949..b453e6851c 100644 --- a/proxy/src/redis/keys.rs +++ b/proxy/src/redis/keys.rs @@ -1,8 +1,4 @@ -use std::io::ErrorKind; - -use anyhow::Ok; - -use crate::pqproto::{CancelKeyData, id_to_cancel_key}; +use crate::pqproto::CancelKeyData; pub mod keyspace { pub const CANCEL_PREFIX: &str = "cancel"; @@ -23,39 +19,12 @@ impl KeyPrefix { } } } - - #[allow(dead_code)] - pub(crate) fn as_str(&self) -> &'static str { - match self { - KeyPrefix::Cancel(_) => keyspace::CANCEL_PREFIX, - } - } -} - -#[allow(dead_code)] -pub(crate) fn parse_redis_key(key: &str) -> anyhow::Result { - let (prefix, key_str) = key.split_once(':').ok_or_else(|| { - anyhow::anyhow!(std::io::Error::new( - ErrorKind::InvalidData, - "missing prefix" - )) - })?; - - match prefix { - keyspace::CANCEL_PREFIX => { - let id = u64::from_str_radix(key_str, 16)?; - - Ok(KeyPrefix::Cancel(id_to_cancel_key(id))) - } - _ => Err(anyhow::anyhow!(std::io::Error::new( - ErrorKind::InvalidData, - "unknown prefix" - ))), - } } #[cfg(test)] mod tests { + use crate::pqproto::id_to_cancel_key; + use super::*; #[test] @@ -65,16 +34,4 @@ mod tests { let redis_key = cancel_key.build_redis_key(); assert_eq!(redis_key, "cancel:30390000d431"); } - - #[test] - fn test_parse_redis_key() { - let redis_key = "cancel:30390000d431"; - let key: KeyPrefix = parse_redis_key(redis_key).expect("Failed to parse key"); - - let ref_key = id_to_cancel_key(12345 << 32 | 54321); - - assert_eq!(key.as_str(), KeyPrefix::Cancel(ref_key).as_str()); - let KeyPrefix::Cancel(cancel_key) = key; - assert_eq!(ref_key, cancel_key); - } } diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs index f71730c533..f8d3b5cc66 100644 --- a/proxy/src/redis/kv_ops.rs +++ b/proxy/src/redis/kv_ops.rs @@ -1,3 +1,6 @@ +use std::time::Duration; + +use futures::FutureExt; use redis::aio::ConnectionLike; use redis::{Cmd, FromRedisValue, Pipeline, RedisResult}; @@ -35,14 +38,11 @@ impl RedisKVClient { } pub async fn try_connect(&mut self) -> anyhow::Result<()> { - match self.client.connect().await { - Ok(()) => {} - Err(e) => { - tracing::error!("failed to connect to redis: {e}"); - return Err(e); - } - } - Ok(()) + self.client + .connect() + .boxed() + .await + .inspect_err(|e| tracing::error!("failed to connect to redis: {e}")) } pub(crate) async fn query( @@ -54,15 +54,25 @@ impl RedisKVClient { return Err(anyhow::anyhow!("Rate limit exceeded")); } - match q.query(&mut self.client).await { + let e = match q.query(&mut self.client).await { Ok(t) => return Ok(t), - Err(e) => { - tracing::error!("failed to run query: {e}"); + Err(e) => e, + }; + + tracing::error!("failed to run query: {e}"); + match e.retry_method() { + redis::RetryMethod::Reconnect => { + tracing::info!("Redis client is disconnected. Reconnecting..."); + self.try_connect().await?; } + redis::RetryMethod::RetryImmediately => {} + redis::RetryMethod::WaitAndRetry => { + // somewhat arbitrary. + tokio::time::sleep(Duration::from_millis(100)).await; + } + _ => Err(e)?, } - tracing::info!("Redis client is disconnected. Reconnecting..."); - self.try_connect().await?; Ok(q.query(&mut self.client).await?) } } diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 8648a94869..0d374e6df2 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -167,7 +167,7 @@ pub(crate) async fn serve_websocket( Ok(Some(p)) => { ctx.set_success(); ctx.log_connect(); - match p.proxy_pass(&config.connect_to_compute).await { + match p.proxy_pass().await { Ok(()) => Ok(()), Err(ErrorSource::Client(err)) => Err(err).context("client"), Err(ErrorSource::Compute(err)) => Err(err).context("compute"), From 6508f4e5c1505b2d5a18b60a036c831da0fea705 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 20 Jun 2025 14:57:30 +0300 Subject: [PATCH 4/5] pageserver: revise gc layer map lock handling (#12290) ## Problem Timeline GC is very aggressive with regards to layer map locking. We've seen timelines with loads of layers in production that hold the write lock for the layer map for 30 minutes at a time. This blocks reads and the write path to some extent. ## Summary of changes Determining the set of layers to GC is done under the read lock. Applying the updates is done under the write lock. Previously, everything was done under write lock. --- pageserver/src/tenant/timeline.rs | 190 +++++++++++++++--------------- 1 file changed, 98 insertions(+), 92 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a0e9d8f06a..c8a41f7875 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -6543,7 +6543,7 @@ impl Timeline { debug!("retain_lsns: {:?}", retain_lsns); - let mut layers_to_remove = Vec::new(); + let max_retain_lsn = retain_lsns.iter().max(); // Scan all layers in the timeline (remote or on-disk). // @@ -6553,108 +6553,110 @@ impl Timeline { // 3. it doesn't need to be retained for 'retain_lsns'; // 4. it does not need to be kept for LSNs holding valid leases. // 5. newer on-disk image layers cover the layer's whole key range - // - // TODO holding a write lock is too agressive and avoidable - let mut guard = self - .layers - .write(LayerManagerLockHolder::GarbageCollection) - .await; - let layers = guard.layer_map()?; - 'outer: for l in layers.iter_historic_layers() { - result.layers_total += 1; + let layers_to_remove = { + let mut layers_to_remove = Vec::new(); - // 1. Is it newer than GC horizon cutoff point? - if l.get_lsn_range().end > space_cutoff { - info!( - "keeping {} because it's newer than space_cutoff {}", - l.layer_name(), - space_cutoff, - ); - result.layers_needed_by_cutoff += 1; - continue 'outer; - } + let guard = self + .layers + .read(LayerManagerLockHolder::GarbageCollection) + .await; + let layers = guard.layer_map()?; + 'outer: for l in layers.iter_historic_layers() { + result.layers_total += 1; - // 2. It is newer than PiTR cutoff point? - if l.get_lsn_range().end > time_cutoff { - info!( - "keeping {} because it's newer than time_cutoff {}", - l.layer_name(), - time_cutoff, - ); - result.layers_needed_by_pitr += 1; - continue 'outer; - } - - // 3. Is it needed by a child branch? - // NOTE With that we would keep data that - // might be referenced by child branches forever. - // We can track this in child timeline GC and delete parent layers when - // they are no longer needed. This might be complicated with long inheritance chains. - // - // TODO Vec is not a great choice for `retain_lsns` - for retain_lsn in &retain_lsns { - // start_lsn is inclusive - if &l.get_lsn_range().start <= retain_lsn { - info!( - "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", + // 1. Is it newer than GC horizon cutoff point? + if l.get_lsn_range().end > space_cutoff { + debug!( + "keeping {} because it's newer than space_cutoff {}", l.layer_name(), - retain_lsn, - l.is_incremental(), + space_cutoff, ); - result.layers_needed_by_branches += 1; + result.layers_needed_by_cutoff += 1; continue 'outer; } - } - // 4. Is there a valid lease that requires us to keep this layer? - if let Some(lsn) = &max_lsn_with_valid_lease { - // keep if layer start <= any of the lease - if &l.get_lsn_range().start <= lsn { - info!( - "keeping {} because there is a valid lease preventing GC at {}", + // 2. It is newer than PiTR cutoff point? + if l.get_lsn_range().end > time_cutoff { + debug!( + "keeping {} because it's newer than time_cutoff {}", l.layer_name(), - lsn, + time_cutoff, ); - result.layers_needed_by_leases += 1; + result.layers_needed_by_pitr += 1; continue 'outer; } + + // 3. Is it needed by a child branch? + // NOTE With that we would keep data that + // might be referenced by child branches forever. + // We can track this in child timeline GC and delete parent layers when + // they are no longer needed. This might be complicated with long inheritance chains. + if let Some(retain_lsn) = max_retain_lsn { + // start_lsn is inclusive + if &l.get_lsn_range().start <= retain_lsn { + debug!( + "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", + l.layer_name(), + retain_lsn, + l.is_incremental(), + ); + result.layers_needed_by_branches += 1; + continue 'outer; + } + } + + // 4. Is there a valid lease that requires us to keep this layer? + if let Some(lsn) = &max_lsn_with_valid_lease { + // keep if layer start <= any of the lease + if &l.get_lsn_range().start <= lsn { + debug!( + "keeping {} because there is a valid lease preventing GC at {}", + l.layer_name(), + lsn, + ); + result.layers_needed_by_leases += 1; + continue 'outer; + } + } + + // 5. Is there a later on-disk layer for this relation? + // + // The end-LSN is exclusive, while disk_consistent_lsn is + // inclusive. For example, if disk_consistent_lsn is 100, it is + // OK for a delta layer to have end LSN 101, but if the end LSN + // is 102, then it might not have been fully flushed to disk + // before crash. + // + // For example, imagine that the following layers exist: + // + // 1000 - image (A) + // 1000-2000 - delta (B) + // 2000 - image (C) + // 2000-3000 - delta (D) + // 3000 - image (E) + // + // If GC horizon is at 2500, we can remove layers A and B, but + // we cannot remove C, even though it's older than 2500, because + // the delta layer 2000-3000 depends on it. + if !layers + .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff)) + { + debug!("keeping {} because it is the latest layer", l.layer_name()); + result.layers_not_updated += 1; + continue 'outer; + } + + // We didn't find any reason to keep this file, so remove it. + info!( + "garbage collecting {} is_dropped: xx is_incremental: {}", + l.layer_name(), + l.is_incremental(), + ); + layers_to_remove.push(l); } - // 5. Is there a later on-disk layer for this relation? - // - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - // - // For example, imagine that the following layers exist: - // - // 1000 - image (A) - // 1000-2000 - delta (B) - // 2000 - image (C) - // 2000-3000 - delta (D) - // 3000 - image (E) - // - // If GC horizon is at 2500, we can remove layers A and B, but - // we cannot remove C, even though it's older than 2500, because - // the delta layer 2000-3000 depends on it. - if !layers - .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff)) - { - info!("keeping {} because it is the latest layer", l.layer_name()); - result.layers_not_updated += 1; - continue 'outer; - } - - // We didn't find any reason to keep this file, so remove it. - info!( - "garbage collecting {} is_dropped: xx is_incremental: {}", - l.layer_name(), - l.is_incremental(), - ); - layers_to_remove.push(l); - } + layers_to_remove + }; if !layers_to_remove.is_empty() { // Persist the new GC cutoff value before we actually remove anything. @@ -6670,15 +6672,19 @@ impl Timeline { } })?; + let mut guard = self + .layers + .write(LayerManagerLockHolder::GarbageCollection) + .await; + let gc_layers = layers_to_remove .iter() - .map(|x| guard.get_from_desc(x)) + .flat_map(|desc| guard.try_get_from_key(&desc.key()).cloned()) .collect::>(); result.layers_removed = gc_layers.len() as u64; self.remote_client.schedule_gc_update(&gc_layers)?; - guard.open_mut()?.finish_gc_timeline(&gc_layers); #[cfg(feature = "testing")] From eaf1ab21c43e3a9a072649931c69a2c885d8f87c Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 20 Jun 2025 17:50:03 +0300 Subject: [PATCH 5/5] Store intermediate build files in `build/` rather than `pg_install/build/` (#12295) This way, `pg_install` contains only the final build artifacts, not intermediate files like *.o files. Seems cleaner. --- .github/workflows/_build-and-test-locally.yml | 8 +- .github/workflows/build-macos.yml | 14 +-- .gitignore | 1 + Dockerfile | 1 - Makefile | 89 ++++++++++--------- libs/walproposer/build.rs | 8 +- test_runner/regress/test_pg_regress.py | 19 +++- 7 files changed, 78 insertions(+), 62 deletions(-) diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index 3f66f41ef2..f9b96271a4 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -313,10 +313,10 @@ jobs: # Use tar to copy files matching the pattern, preserving the paths in the destionation tar c \ pg_install/v* \ - pg_install/build/*/src/test/regress/*.so \ - pg_install/build/*/src/test/regress/pg_regress \ - pg_install/build/*/src/test/isolation/isolationtester \ - pg_install/build/*/src/test/isolation/pg_isolation_regress \ + build/*/src/test/regress/*.so \ + build/*/src/test/regress/pg_regress \ + build/*/src/test/isolation/isolationtester \ + build/*/src/test/isolation/pg_isolation_regress \ | tar x -C /tmp/neon - name: Upload Neon artifact diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 0f7fa3e813..226369de52 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -144,7 +144,7 @@ jobs: id: cache_walproposer_lib uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: - path: pg_install/build/walproposer-lib + path: build/walproposer-lib key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }} - name: Checkout submodule vendor/postgres-v17 @@ -169,11 +169,11 @@ jobs: run: make walproposer-lib -j$(sysctl -n hw.ncpu) - - name: Upload "pg_install/build/walproposer-lib" artifact + - name: Upload "build/walproposer-lib" artifact uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 with: - name: pg_install--build--walproposer-lib - path: pg_install/build/walproposer-lib + name: build--walproposer-lib + path: build/walproposer-lib # The artifact is supposed to be used by the next job in the same workflow, # so there’s no need to store it for too long. retention-days: 1 @@ -226,11 +226,11 @@ jobs: name: pg_install--v17 path: pg_install/v17 - - name: Download "pg_install/build/walproposer-lib" artifact + - name: Download "build/walproposer-lib" artifact uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0 with: - name: pg_install--build--walproposer-lib - path: pg_install/build/walproposer-lib + name: build--walproposer-lib + path: build/walproposer-lib # `actions/download-artifact` doesn't preserve permissions: # https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss diff --git a/.gitignore b/.gitignore index 45eb4dbf0e..70c7e96303 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /artifact_cache +/build /pg_install /target /tmp_check diff --git a/Dockerfile b/Dockerfile index f72d7d9bbc..69657067de 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,7 +45,6 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh ENV BUILD_TYPE=release RUN set -e \ && mold -run make -j $(nproc) -s neon-pg-ext \ - && rm -rf pg_install/build \ && tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz . # Prepare cargo-chef recipe diff --git a/Makefile b/Makefile index 5130e17e59..dee50a51c1 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,12 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) -# Where to install Postgres, default is ./pg_install, maybe useful for package managers +# Where to install Postgres, default is ./pg_install, maybe useful for package +# managers. POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/ +# All intermediate build artifacts are stored here. +BUILD_DIR := build + ICU_PREFIX_DIR := /usr/local/icu # @@ -104,21 +108,20 @@ cargo-target-dir: # Some rules are duplicated for Postgres v14 and 15. We may want to refactor # to avoid the duplication in the future, but it's tolerable for now. # -$(POSTGRES_INSTALL_DIR)/build/%/config.status: - - mkdir -p $(POSTGRES_INSTALL_DIR) - test -e $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG +$(BUILD_DIR)/%/config.status: + mkdir -p $(BUILD_DIR) + test -e $(BUILD_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(BUILD_DIR)/CACHEDIR.TAG +@echo "Configuring Postgres $* build" @test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \ echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \ echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \ exit 1; } - mkdir -p $(POSTGRES_INSTALL_DIR)/build/$* + mkdir -p $(BUILD_DIR)/$* VERSION=$*; \ EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \ - (cd $(POSTGRES_INSTALL_DIR)/build/$$VERSION && \ + (cd $(BUILD_DIR)/$$VERSION && \ env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \ CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \ $(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \ @@ -130,73 +133,73 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status: # the "build-all-versions" entry points) where direct mention of PostgreSQL # versions is used. .PHONY: postgres-configure-v17 -postgres-configure-v17: $(POSTGRES_INSTALL_DIR)/build/v17/config.status +postgres-configure-v17: $(BUILD_DIR)/v17/config.status .PHONY: postgres-configure-v16 -postgres-configure-v16: $(POSTGRES_INSTALL_DIR)/build/v16/config.status +postgres-configure-v16: $(BUILD_DIR)/v16/config.status .PHONY: postgres-configure-v15 -postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status +postgres-configure-v15: $(BUILD_DIR)/v15/config.status .PHONY: postgres-configure-v14 -postgres-configure-v14: $(POSTGRES_INSTALL_DIR)/build/v14/config.status +postgres-configure-v14: $(BUILD_DIR)/v14/config.status # Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)//include .PHONY: postgres-headers-% postgres-headers-%: postgres-configure-% +@echo "Installing PostgreSQL $* headers" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/include MAKELEVEL=0 install + $(MAKE) -C $(BUILD_DIR)/$*/src/include MAKELEVEL=0 install # Compile and install PostgreSQL .PHONY: postgres-% postgres-%: postgres-configure-% \ postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers` +@echo "Compiling PostgreSQL $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 install + $(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install +@echo "Compiling libpq $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq install + $(MAKE) -C $(BUILD_DIR)/$*/src/interfaces/libpq install +@echo "Compiling pg_prewarm $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_prewarm install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install +@echo "Compiling pg_buffercache $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_buffercache install +@echo "Compiling pg_visibility $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_visibility install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_visibility install +@echo "Compiling pageinspect $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pageinspect install +@echo "Compiling pg_trgm $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_trgm install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_trgm install +@echo "Compiling amcheck $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/amcheck install +@echo "Compiling test_decoding $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/test_decoding install .PHONY: postgres-check-% postgres-check-%: postgres-% - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check + $(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 check .PHONY: neon-pg-ext-% neon-pg-ext-%: postgres-% +@echo "Compiling neon $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$* + mkdir -p $(BUILD_DIR)/neon-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-$* \ + -C $(BUILD_DIR)/neon-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install +@echo "Compiling neon_walredo $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* + mkdir -p $(BUILD_DIR)/neon-walredo-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \ + -C $(BUILD_DIR)/neon-walredo-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install +@echo "Compiling neon_rmgr $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* + mkdir -p $(BUILD_DIR)/neon-rmgr-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \ + -C $(BUILD_DIR)/neon-rmgr-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install +@echo "Compiling neon_test_utils $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* + mkdir -p $(BUILD_DIR)/neon-test-utils-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \ + -C $(BUILD_DIR)/neon-test-utils-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install +@echo "Compiling neon_utils $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* + mkdir -p $(BUILD_DIR)/neon-utils-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ + -C $(BUILD_DIR)/neon-utils-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install # Build walproposer as a static library. walproposer source code is located @@ -211,15 +214,15 @@ neon-pg-ext-%: postgres-% .PHONY: walproposer-lib walproposer-lib: neon-pg-ext-v17 +@echo "Compiling walproposer-lib" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/walproposer-lib + mkdir -p $(BUILD_DIR)/walproposer-lib $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \ + -C $(BUILD_DIR)/walproposer-lib \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib - cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib - cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib - $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \ + cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(BUILD_DIR)/walproposer-lib + cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(BUILD_DIR)/walproposer-lib + $(AR) d $(BUILD_DIR)/walproposer-lib/libpgport.a \ pg_strong_random.o - $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \ + $(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \ checksum_helper.o \ cryptohash_openssl.o \ hmac_openssl.o \ @@ -227,7 +230,7 @@ walproposer-lib: neon-pg-ext-v17 parse_manifest.o \ scram-common.o ifeq ($(UNAME_S),Linux) - $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \ + $(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \ pg_crc32c.o endif @@ -272,7 +275,7 @@ fmt: postgres-%-pg-bsd-indent: postgres-% +@echo "Compiling pg_bsd_indent" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/ + $(MAKE) -C $(BUILD_DIR)/$*/src/tools/pg_bsd_indent/ # Create typedef list for the core. Note that generally it should be combined with # buildfarm one to cover platform specific stuff. @@ -291,7 +294,7 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list cat $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/typedefs.list |\ cat - postgres-$*-typedefs.list | sort | uniq > postgres-$*-typedefs-full.list +@echo note: you might want to run it on selected files/dirs instead. - INDENT=$(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/pg_bsd_indent \ + INDENT=$(BUILD_DIR)/$*/src/tools/pg_bsd_indent/pg_bsd_indent \ $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/pgindent --typedefs postgres-$*-typedefs-full.list \ $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/ \ --excludes $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/exclude_file_patterns @@ -302,9 +305,9 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17 $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \ FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/find_typedef \ - INDENT=$(POSTGRES_INSTALL_DIR)/build/v17/src/tools/pg_bsd_indent/pg_bsd_indent \ + INDENT=$(BUILD_DIR)/v17/src/tools/pg_bsd_indent/pg_bsd_indent \ PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/pgindent/pgindent \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-v17 \ + -C $(BUILD_DIR)/neon-v17 \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index 530ceb1327..b13c8b32b4 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -13,22 +13,24 @@ fn main() -> anyhow::Result<()> { // Tell cargo to invalidate the built crate whenever the wrapper changes println!("cargo:rerun-if-changed=bindgen_deps.h"); + let root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../.."); + // Finding the location of built libraries and Postgres C headers: // - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `/pg_install` // - if there's a `bin/pg_config` file use it for getting include server, otherwise use `/pg_install/{PG_MAJORVERSION}/include/postgresql/server` let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") { postgres_install_dir.into() } else { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pg_install") + root_path.join("pg_install") }; let pg_install_abs = std::fs::canonicalize(pg_install_dir)?; - let walproposer_lib_dir = pg_install_abs.join("build/walproposer-lib"); + let walproposer_lib_dir = root_path.join("build/walproposer-lib"); let walproposer_lib_search_str = walproposer_lib_dir .to_str() .ok_or(anyhow!("Bad non-UTF path"))?; - let pgxn_neon = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pgxn/neon"); + let pgxn_neon = root_path.join("pgxn/neon"); let pgxn_neon = std::fs::canonicalize(pgxn_neon)?; let pgxn_neon = pgxn_neon.to_str().ok_or(anyhow!("Bad non-UTF path"))?; diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py index 3695ece66b..728241b465 100644 --- a/test_runner/regress/test_pg_regress.py +++ b/test_runner/regress/test_pg_regress.py @@ -173,7 +173,11 @@ def test_pg_regress( (runpath / "testtablespace").mkdir(parents=True) # Compute all the file locations that pg_regress will need. - build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/regress" + # + # XXX: We assume that the `build` directory is a sibling of the + # pg_distrib_dir. That is the default when you check out the + # repository; `build` and `pg_install` are created side by side. + build_path = pg_distrib_dir / f"../build/{env.pg_version.v_prefixed}/src/test/regress" src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/regress" bindir = pg_distrib_dir / f"v{env.pg_version}/bin" schedule = src_path / "parallel_schedule" @@ -250,7 +254,11 @@ def test_isolation( (runpath / "testtablespace").mkdir(parents=True) # Compute all the file locations that pg_isolation_regress will need. - build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/isolation" + # + # XXX: We assume that the `build` directory is a sibling of the + # pg_distrib_dir. That is the default when you check out the + # repository; `build` and `pg_install` are created side by side. + build_path = pg_distrib_dir / f"../build/{env.pg_version.v_prefixed}/src/test/isolation" src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/isolation" bindir = pg_distrib_dir / f"v{env.pg_version}/bin" schedule = src_path / "isolation_schedule" @@ -314,8 +322,11 @@ def test_sql_regress( (runpath / "testtablespace").mkdir(parents=True) # Compute all the file locations that pg_regress will need. - # This test runs neon specific tests - build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress" + # + # XXX: We assume that the `build` directory is a sibling of the + # pg_distrib_dir. That is the default when you check out the + # repository; `build` and `pg_install` are created side by side. + build_path = pg_distrib_dir / f"../build/{env.pg_version.v_prefixed}/src/test/regress" src_path = base_dir / "test_runner/sql_regress" bindir = pg_distrib_dir / f"v{env.pg_version}/bin" schedule = src_path / "parallel_schedule"