From 8d7ed2a4ee1e2753d8a3ac17c6bc43ccabc2ed2e Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 2 Jun 2025 13:46:50 +0200 Subject: [PATCH 1/6] pageserver: add gRPC observability middleware (#12093) ## Problem The page service logic asserts that a tracing span is present with tenant/timeline/shard IDs. An initial gRPC page service implementation thus requires a tracing span. Touches https://github.com/neondatabase/neon/issues/11728. ## Summary of changes Adds an `ObservabilityLayer` middleware that generates a tracing span and decorates it with IDs from the gRPC metadata. This is a minimal implementation to address the tracing span assertion. It will be extended with additional observability in later PRs. --- Cargo.lock | 2 + libs/utils/src/lib.rs | 1 + libs/utils/src/span.rs | 19 +++++ pageserver/Cargo.toml | 2 + pageserver/src/page_service.rs | 137 ++++++++++++++++++++++++++------- 5 files changed, 134 insertions(+), 27 deletions(-) create mode 100644 libs/utils/src/span.rs diff --git a/Cargo.lock b/Cargo.lock index 89351432c1..4f7378e95d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4305,6 +4305,7 @@ dependencies = [ "hashlink", "hex", "hex-literal", + "http 1.1.0", "http-utils", "humantime", "humantime-serde", @@ -4367,6 +4368,7 @@ dependencies = [ "toml_edit", "tonic 0.13.1", "tonic-reflection", + "tower 0.5.2", "tracing", "tracing-utils", "twox-hash", diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 206b8bbd8f..11f787562c 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -73,6 +73,7 @@ pub mod error; /// async timeout helper pub mod timeout; +pub mod span; pub mod sync; pub mod failpoint_support; diff --git a/libs/utils/src/span.rs b/libs/utils/src/span.rs new file mode 100644 index 0000000000..4dbc99044b --- /dev/null +++ b/libs/utils/src/span.rs @@ -0,0 +1,19 @@ +//! Tracing span helpers. + +/// Records the given fields in the current span, as a single call. 
The fields must already have +/// been declared for the span (typically with empty values). +#[macro_export] +macro_rules! span_record { + ($($tokens:tt)*) => {$crate::span_record_in!(::tracing::Span::current(), $($tokens)*)}; +} + +/// Records the given fields in the given span, as a single call. The fields must already have been +/// declared for the span (typically with empty values). +#[macro_export] +macro_rules! span_record_in { + ($span:expr, $($tokens:tt)*) => { + if let Some(meta) = $span.metadata() { + $span.record_all(&tracing::valueset!(meta.fields(), $($tokens)*)); + } + }; +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index c4d6d58945..9591c729e8 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -34,6 +34,7 @@ fail.workspace = true futures.workspace = true hashlink.workspace = true hex.workspace = true +http.workspace = true http-utils.workspace = true humantime-serde.workspace = true humantime.workspace = true @@ -93,6 +94,7 @@ tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } tonic.workspace = true tonic-reflection.workspace = true +tower.workspace = true tracing.workspace = true tracing-utils.workspace = true url.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e96787e027..f011ed49d0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,12 +7,14 @@ use std::os::fd::AsRawFd; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; use std::{io, str}; -use anyhow::{Context, bail}; +use anyhow::{Context as _, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; +use futures::future::BoxFuture; use futures::{FutureExt, Stream}; use itertools::Itertools; use jsonwebtoken::TokenData; @@ -46,7 +48,6 @@ use tokio_util::sync::CancellationToken; use tonic::service::Interceptor as _; use tracing::*; use 
utils::auth::{Claims, Scope, SwappableJwtAuth}; -use utils::failpoint_support; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use utils::logging::log_slow; use utils::lsn::Lsn; @@ -54,6 +55,7 @@ use utils::shard::ShardIndex; use utils::simple_rcu::RcuReadGuard; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; +use utils::{failpoint_support, span_record}; use crate::auth::check_permission; use crate::basebackup::{self, BasebackupError}; @@ -195,13 +197,17 @@ pub fn spawn_grpc( // Set up the gRPC server. // // TODO: consider tuning window sizes. - // TODO: wire up tracing. let mut server = tonic::transport::Server::builder() .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL)) .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT)) .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS)); - // Main page service. + // Main page service stack. Uses a mix of Tonic interceptors and Tower layers: + // + // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service. + // + // * Layers: allow async code, can run code after the service response. However, only has access + // to the raw HTTP request/response, not the gRPC types. let page_service_handler = PageServerHandler::new( tenant_manager, auth.clone(), @@ -214,16 +220,22 @@ pub fn spawn_grpc( gate.enter().expect("just created"), ); + let observability_layer = ObservabilityLayer; let mut tenant_interceptor = TenantMetadataInterceptor; let mut auth_interceptor = TenantAuthInterceptor::new(auth); - let interceptors = move |mut req: tonic::Request<()>| { - req = tenant_interceptor.call(req)?; - req = auth_interceptor.call(req)?; - Ok(req) - }; - let page_service = - proto::PageServiceServer::with_interceptor(page_service_handler, interceptors); + let page_service = tower::ServiceBuilder::new() + // Create tracing span. + .layer(observability_layer) + // Intercept gRPC requests. 
+ .layer(tonic::service::InterceptorLayer::new(move |mut req| { + // Extract tenant metadata. + req = tenant_interceptor.call(req)?; + // Authenticate tenant JWT token. + req = auth_interceptor.call(req)?; + Ok(req) + })) + .service(proto::PageServiceServer::new(page_service_handler)); let server = server.add_service(page_service); // Reflection service for use with e.g. grpcurl. @@ -3311,6 +3323,7 @@ impl proto::PageService for PageServerHandler { type GetPagesStream = Pin> + Send>>; + #[instrument(skip_all)] async fn check_rel_exists( &self, _: tonic::Request, @@ -3318,6 +3331,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_base_backup( &self, _: tonic::Request, @@ -3325,6 +3339,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_db_size( &self, _: tonic::Request, @@ -3332,6 +3347,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + // NB: don't instrument this, instrument each streamed request. async fn get_pages( &self, _: tonic::Request>, @@ -3339,6 +3355,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_rel_size( &self, _: tonic::Request, @@ -3346,6 +3363,7 @@ impl proto::PageService for PageServerHandler { Err(tonic::Status::unimplemented("not implemented")) } + #[instrument(skip_all)] async fn get_slru_segment( &self, _: tonic::Request, @@ -3354,19 +3372,65 @@ impl proto::PageService for PageServerHandler { } } -impl From for QueryError { - fn from(e: GetActiveTenantError) -> Self { - match e { - GetActiveTenantError::WaitForActiveTimeout { .. 
} => QueryError::Disconnected( - ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), - ), - GetActiveTenantError::Cancelled - | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. }) => { - QueryError::Shutdown - } - e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()), - e => QueryError::Other(anyhow::anyhow!(e)), - } +/// gRPC middleware layer that handles observability concerns: +/// +/// * Creates and enters a tracing span. +/// +/// TODO: add perf tracing. +/// TODO: add timing and metrics. +/// TODO: add logging. +#[derive(Clone)] +struct ObservabilityLayer; + +impl tower::Layer for ObservabilityLayer { + type Service = ObservabilityLayerService; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service { inner } + } +} + +#[derive(Clone)] +struct ObservabilityLayerService { + inner: S, +} + +impl tonic::server::NamedService for ObservabilityLayerService { + const NAME: &'static str = S::NAME; // propagate inner service name +} + +impl tower::Service> for ObservabilityLayerService +where + S: tower::Service>, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, Result>; + + fn call(&mut self, req: http::Request) -> Self::Future { + // Create a basic tracing span. Enter the span for the current thread (to use it for inner + // sync code like interceptors), and instrument the future (to use it for inner async code + // like the page service itself). + // + // The instrument() call below is not sufficient. It only affects the returned future, and + // only takes effect when the caller polls it. Any sync code executed when we call + // self.inner.call() below (such as interceptors) runs outside of the returned future, and + // is not affected by it. We therefore have to enter the span on the current thread too. + let span = info_span!( + "grpc:pageservice", + // Set by TenantMetadataInterceptor. 
+ tenant_id = field::Empty, + timeline_id = field::Empty, + shard_id = field::Empty, + ); + let _guard = span.enter(); + + Box::pin(self.inner.call(req).instrument(span.clone())) + } + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } } @@ -3400,19 +3464,22 @@ impl tonic::service::Interceptor for TenantMetadataInterceptor { .map_err(|_| tonic::Status::invalid_argument("invalid neon-timeline-id"))?; // Decode the shard ID. - let shard_index = req + let shard_id = req .metadata() .get("neon-shard-id") .ok_or_else(|| tonic::Status::invalid_argument("missing neon-shard-id"))? .to_str() .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?; - let shard_index = ShardIndex::from_str(shard_index) + let shard_id = ShardIndex::from_str(shard_id) .map_err(|_| tonic::Status::invalid_argument("invalid neon-shard-id"))?; // Stash them in the request. let extensions = req.extensions_mut(); extensions.insert(TenantTimelineId::new(tenant_id, timeline_id)); - extensions.insert(shard_index); + extensions.insert(shard_id); + + // Decorate the tracing span. + span_record!(%tenant_id, %timeline_id, %shard_id); Ok(req) } @@ -3486,6 +3553,22 @@ impl From for QueryError { } } +impl From for QueryError { + fn from(e: GetActiveTenantError) -> Self { + match e { + GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected( + ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())), + ), + GetActiveTenantError::Cancelled + | GetActiveTenantError::WillNotBecomeActive(TenantState::Stopping { .. 
}) => { + QueryError::Shutdown + } + e @ GetActiveTenantError::NotFound(_) => QueryError::NotFound(format!("{e}").into()), + e => QueryError::Other(anyhow::anyhow!(e)), + } + } +} + impl From for QueryError { fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self { match e { From a21c1174edefdfb59fbdce9ae5696c446a3cfe0a Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 2 Jun 2025 16:50:49 +0200 Subject: [PATCH 2/6] pagebench: add gRPC support for `get-page-latest-lsn` (#12077) ## Problem We need gRPC support in Pagebench to benchmark the new gRPC Pageserver implementation. Touches #11728. ## Summary of changes Adds a `Client` trait to make the client transport swappable, and a gRPC client via a `--protocol grpc` parameter. This must also specify the connstring with the gRPC port: ``` pagebench get-page-latest-lsn --protocol grpc --page-service-connstring grpc://localhost:51051 ``` The client is implemented using the raw Tonic-generated gRPC client, to minimize client overhead. 
--- Cargo.lock | 4 + libs/pageserver_api/src/models.rs | 4 +- libs/pageserver_api/src/reltag.rs | 2 +- pageserver/pagebench/Cargo.toml | 6 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 146 ++++++++++++++++-- 5 files changed, 144 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f7378e95d..9fc233e5ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4236,6 +4236,7 @@ name = "pagebench" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "clap", "futures", @@ -4244,12 +4245,15 @@ dependencies = [ "humantime-serde", "pageserver_api", "pageserver_client", + "pageserver_page_api", "rand 0.8.5", "reqwest", "serde", "serde_json", "tokio", + "tokio-stream", "tokio-util", + "tonic 0.13.1", "tracing", "utils", "workspace_hack", diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index e7d612bb7a..01487c0f57 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -2045,7 +2045,7 @@ pub enum PagestreamProtocolVersion { pub type RequestId = u64; -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct PagestreamRequest { pub reqid: RequestId, pub request_lsn: Lsn, @@ -2064,7 +2064,7 @@ pub struct PagestreamNblocksRequest { pub rel: RelTag, } -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct PagestreamGetPageRequest { pub hdr: PagestreamRequest, pub rel: RelTag, diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 473a44dbf9..4509cab2e0 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; // FIXME: should move 'forknum' as last field to keep this consistent with Postgres. // Then we could replace the custom Ord and PartialOrd implementations below with // deriving them. This will require changes in walredoproc.c. 
-#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] pub struct RelTag { pub forknum: u8, pub spcnode: Oid, diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 5b5ed09a2b..ceb1278eab 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true camino.workspace = true clap.workspace = true futures.workspace = true @@ -15,14 +16,17 @@ hdrhistogram.workspace = true humantime.workspace = true humantime-serde.workspace = true rand.workspace = true -reqwest.workspace=true +reqwest.workspace = true serde.workspace = true serde_json.workspace = true tracing.workspace = true tokio.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true +tonic.workspace = true pageserver_client.workspace = true pageserver_api.workspace = true +pageserver_page_api.workspace = true utils = { path = "../../libs/utils/" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 50419ec338..395e9cac41 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -7,11 +7,15 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use anyhow::Context; +use async_trait::async_trait; use camino::Utf8PathBuf; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; -use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; +use pageserver_api::models::{ + PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest, +}; use pageserver_api::shard::TenantShardId; +use pageserver_page_api::proto; use rand::prelude::*; use tokio::task::JoinSet; use 
tokio_util::sync::CancellationToken; @@ -22,6 +26,12 @@ use utils::lsn::Lsn; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; +#[derive(clap::ValueEnum, Clone, Debug)] +enum Protocol { + Libpq, + Grpc, +} + /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { @@ -35,6 +45,8 @@ pub(crate) struct Args { num_clients: NonZeroUsize, #[clap(long)] runtime: Option, + #[clap(long, value_enum, default_value = "libpq")] + protocol: Protocol, /// Each client sends requests at the given rate. /// /// If a request takes too long and we should be issuing a new request already, @@ -303,7 +315,20 @@ async fn main_impl( .unwrap(); Box::pin(async move { - client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await + let client: Box = match args.protocol { + Protocol::Libpq => Box::new( + LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline) + .await + .unwrap(), + ), + + Protocol::Grpc => Box::new( + GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline) + .await + .unwrap(), + ), + }; + run_worker(args, client, ss, cancel, rps_period, ranges, weights).await }) }; @@ -355,23 +380,15 @@ async fn main_impl( anyhow::Ok(()) } -async fn client_libpq( +async fn run_worker( args: &Args, - worker_id: WorkerId, + mut client: Box, shared_state: Arc, cancel: CancellationToken, rps_period: Option, ranges: Vec, weights: rand::distributions::weighted::WeightedIndex, ) { - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - let mut client = client - .pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id) - .await - .unwrap(); - shared_state.start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0; @@ -415,12 +432,12 @@ async fn client_libpq( blkno: block_no, } }; - 
client.getpage_send(req).await.unwrap(); + client.send_get_page(req).await.unwrap(); inflight.push_back(start); } let start = inflight.pop_front().unwrap(); - client.getpage_recv().await.unwrap(); + client.recv_get_page().await.unwrap(); let end = Instant::now(); shared_state.live_stats.request_done(); ticks_processed += 1; @@ -442,3 +459,104 @@ async fn client_libpq( } } } + +/// A benchmark client, to allow switching out the transport protocol. +/// +/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could +/// return a future that resolves when the response is received, but we don't really need it. +#[async_trait] +trait Client: Send { + /// Sends an asynchronous GetPage request to the pageserver. + async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>; + + /// Receives the next GetPage response from the pageserver. + async fn recv_get_page(&mut self) -> anyhow::Result; +} + +/// A libpq-based Pageserver client. +struct LibpqClient { + inner: pageserver_client::page_service::PagestreamClient, +} + +impl LibpqClient { + async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { + let inner = pageserver_client::page_service::Client::new(connstring) + .await? + .pagestream(ttid.tenant_id, ttid.timeline_id) + .await?; + Ok(Self { inner }) + } +} + +#[async_trait] +impl Client for LibpqClient { + async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> { + self.inner.getpage_send(req).await + } + + async fn recv_get_page(&mut self) -> anyhow::Result { + self.inner.getpage_recv().await + } +} + +/// A gRPC client using the raw, no-frills gRPC client. 
+struct GrpcClient { + req_tx: tokio::sync::mpsc::Sender, + resp_rx: tonic::Streaming, +} + +impl GrpcClient { + async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { + let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?; + + // The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the + // benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are + // buffered by Tonic and the OS too. + let (req_tx, req_rx) = tokio::sync::mpsc::channel(1); + let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); + let mut req = tonic::Request::new(req_stream); + let metadata = req.metadata_mut(); + metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?); + metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?); + metadata.insert("neon-shard-id", "0000".try_into()?); + + let resp = client.get_pages(req).await?; + let resp_stream = resp.into_inner(); + + Ok(Self { + req_tx, + resp_rx: resp_stream, + }) + } +} + +#[async_trait] +impl Client for GrpcClient { + async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> { + let req = proto::GetPageRequest { + request_id: 0, + request_class: proto::GetPageClass::Normal as i32, + read_lsn: Some(proto::ReadLsn { + request_lsn: req.hdr.request_lsn.0, + not_modified_since_lsn: req.hdr.not_modified_since.0, + }), + rel: Some(req.rel.into()), + block_number: vec![req.blkno], + }; + self.req_tx.send(req).await?; + Ok(()) + } + + async fn recv_get_page(&mut self) -> anyhow::Result { + let resp = self.resp_rx.message().await?.unwrap(); + anyhow::ensure!( + resp.status_code == proto::GetPageStatusCode::Ok as i32, + "unexpected status code: {}", + resp.status_code + ); + Ok(PagestreamGetPageResponse { + page: resp.page_image[0].clone(), + req: PagestreamGetPageRequest::default(), // dummy + }) + } +} From 
781bf4945d9cb3902de829a187ee0e7ebc71e432 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 2 Jun 2025 17:13:30 +0100 Subject: [PATCH 3/6] proxy: optimise future layout allocations (#12104) A smaller version of #12066 that is somewhat easier to review. Now that I've been using https://crates.io/crates/top-type-sizes I've found a lot more of the low-hanging fruit that can be tweaked to reduce the memory usage. Some context for the optimisations: Rust's stack allocation in futures is quite naive. Stack variables, even if moved, often still end up taking space in the future. Rearranging the order in which variables are defined, and properly scoping them can go a long way. `async fn` and `async move {}` have a consequence that they always duplicate the "upvars" (aka captures). All captures are permanently allocated in the future, even if moved. We can be mindful when writing futures to only capture as little as possible. TlsStream is massive. Needs boxing so it doesn't contribute to the above issue. 
## Measurements from `top-type-sizes`: ### Before ``` 10328 {async block@proxy::proxy::task_main::{closure#0}::{closure#0}} align=8 6120 {async fn body of proxy::proxy::handle_client>()} align=8 ``` ### After ``` 4040 {async block@proxy::proxy::task_main::{closure#0}::{closure#0}} 4704 {async fn body of proxy::proxy::handle_client>()} align=8 ``` --- proxy/src/auth/backend/classic.rs | 16 ++-- proxy/src/console_redirect_proxy.rs | 2 +- .../control_plane/client/cplane_proxy_v1.rs | 75 +++++++++++-------- proxy/src/http/mod.rs | 27 +++++-- proxy/src/pqproto.rs | 6 +- proxy/src/proxy/handshake.rs | 9 ++- proxy/src/proxy/mod.rs | 2 +- proxy/src/proxy/passthrough.rs | 2 + proxy/src/sasl/stream.rs | 49 ++++++------ proxy/src/stream.rs | 4 +- proxy/src/tls/postgres_rustls.rs | 6 +- 11 files changed, 115 insertions(+), 83 deletions(-) diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs index dcc500f2c8..8445368740 100644 --- a/proxy/src/auth/backend/classic.rs +++ b/proxy/src/auth/backend/classic.rs @@ -25,19 +25,15 @@ pub(super) async fn authenticate( } AuthSecret::Scram(secret) => { debug!("auth endpoint chooses SCRAM"); - let scram = auth::Scram(&secret, ctx); - let auth_outcome = tokio::time::timeout(config.scram_protocol_timeout, async { - AuthFlow::new(client, scram) - .authenticate() - .await - .inspect_err(|error| { - warn!(?error, "error processing scram messages"); - }) - }) + let auth_outcome = tokio::time::timeout( + config.scram_protocol_timeout, + AuthFlow::new(client, auth::Scram(&secret, ctx)).authenticate(), + ) .await .inspect_err(|_| warn!("error processing scram messages error = authentication timed out, execution time exceeded {} seconds", config.scram_protocol_timeout.as_secs())) - .map_err(auth::AuthError::user_timeout)??; + .map_err(auth::AuthError::user_timeout)? 
+ .inspect_err(|error| warn!(?error, "error processing scram messages"))?; let client_key = match auth_outcome { sasl::Outcome::Success(key) => key, diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 9499aba61b..7fb84b5ee5 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -159,7 +159,7 @@ pub async fn task_main( } #[allow(clippy::too_many_arguments)] -pub(crate) async fn handle_client( +pub(crate) async fn handle_client( config: &'static ProxyConfig, backend: &'static ConsoleRedirectBackend, ctx: &RequestContext, diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index 93f4ea6cf7..da548d6b2c 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -7,7 +7,9 @@ use std::time::Duration; use ::http::HeaderName; use ::http::header::AUTHORIZATION; +use bytes::Bytes; use futures::TryFutureExt; +use hyper::StatusCode; use postgres_client::config::SslMode; use tokio::time::Instant; use tracing::{Instrument, debug, info, info_span, warn}; @@ -72,28 +74,34 @@ impl NeonControlPlaneClient { role: &RoleName, ) -> Result { async { - let request = self - .endpoint - .get_path("get_endpoint_access_control") - .header(X_REQUEST_ID, ctx.session_id().to_string()) - .header(AUTHORIZATION, format!("Bearer {}", &self.jwt)) - .query(&[("session_id", ctx.session_id())]) - .query(&[ - ("application_name", ctx.console_application_name().as_str()), - ("endpointish", endpoint.as_str()), - ("role", role.as_str()), - ]) - .build()?; - - debug!(url = request.url().as_str(), "sending http request"); - let start = Instant::now(); let response = { - let _pause = ctx.latency_timer_pause_at(start, crate::metrics::Waiting::Cplane); - self.endpoint.execute(request).await? 
- }; - info!(duration = ?start.elapsed(), "received http response"); + let request = self + .endpoint + .get_path("get_endpoint_access_control") + .header(X_REQUEST_ID, ctx.session_id().to_string()) + .header(AUTHORIZATION, format!("Bearer {}", &self.jwt)) + .query(&[("session_id", ctx.session_id())]) + .query(&[ + ("application_name", ctx.console_application_name().as_str()), + ("endpointish", endpoint.as_str()), + ("role", role.as_str()), + ]) + .build()?; - let body = match parse_body::(response).await { + debug!(url = request.url().as_str(), "sending http request"); + let start = Instant::now(); + let _pause = ctx.latency_timer_pause_at(start, crate::metrics::Waiting::Cplane); + let response = self.endpoint.execute(request).await?; + + info!(duration = ?start.elapsed(), "received http response"); + + response + }; + + let body = match parse_body::( + response.status(), + response.bytes().await?, + ) { Ok(body) => body, // Error 404 is special: it's ok not to have a secret. // TODO(anna): retry @@ -184,7 +192,10 @@ impl NeonControlPlaneClient { drop(pause); info!(duration = ?start.elapsed(), "received http response"); - let body = parse_body::(response).await?; + let body = parse_body::( + response.status(), + response.bytes().await.map_err(ControlPlaneError::from)?, + )?; let rules = body .jwks @@ -236,7 +247,7 @@ impl NeonControlPlaneClient { let response = self.endpoint.execute(request).await?; drop(pause); info!(duration = ?start.elapsed(), "received http response"); - let body = parse_body::(response).await?; + let body = parse_body::(response.status(), response.bytes().await?)?; // Unfortunately, ownership won't let us use `Option::ok_or` here. let (host, port) = match parse_host_port(&body.address) { @@ -487,33 +498,33 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { } /// Parse http response body, taking status code into account. 
-async fn parse_body serde::Deserialize<'a>>( - response: http::Response, +fn parse_body serde::Deserialize<'a>>( + status: StatusCode, + body: Bytes, ) -> Result { - let status = response.status(); if status.is_success() { // We shouldn't log raw body because it may contain secrets. info!("request succeeded, processing the body"); - return Ok(response.json().await?); + return Ok(serde_json::from_slice(&body).map_err(std::io::Error::other)?); } - let s = response.bytes().await?; + // Log plaintext to be able to detect, whether there are some cases not covered by the error struct. - info!("response_error plaintext: {:?}", s); + info!("response_error plaintext: {:?}", body); // Don't throw an error here because it's not as important // as the fact that the request itself has failed. - let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| { + let mut body = serde_json::from_slice(&body).unwrap_or_else(|e| { warn!("failed to parse error body: {e}"); - ControlPlaneErrorMessage { + Box::new(ControlPlaneErrorMessage { error: "reason unclear (malformed error message)".into(), http_status_code: status, status: None, - } + }) }); body.http_status_code = status; warn!("console responded with an error ({status}): {body:?}"); - Err(ControlPlaneError::Message(Box::new(body))) + Err(ControlPlaneError::Message(body)) } fn parse_host_port(input: &str) -> Option<(&str, u16)> { diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index 96f600d836..36607e7861 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -4,9 +4,10 @@ pub mod health_server; -use std::time::Duration; +use std::time::{Duration, Instant}; use bytes::Bytes; +use futures::FutureExt; use http::Method; use http_body_util::BodyExt; use hyper::body::Body; @@ -109,15 +110,31 @@ impl Endpoint { } /// Execute a [request](reqwest::Request). 
- pub(crate) async fn execute(&self, request: Request) -> Result { - let _timer = Metrics::get() + pub(crate) fn execute( + &self, + request: Request, + ) -> impl Future> { + let metric = Metrics::get() .proxy .console_request_latency - .start_timer(ConsoleRequest { + .with_labels(ConsoleRequest { request: request.url().path(), }); - self.client.execute(request).await + let req = self.client.execute(request).boxed(); + + async move { + let start = Instant::now(); + scopeguard::defer!({ + Metrics::get() + .proxy + .console_request_latency + .get_metric(metric) + .observe_duration_since(start); + }); + + req.await + } } } diff --git a/proxy/src/pqproto.rs b/proxy/src/pqproto.rs index d68d9f9474..43074bf208 100644 --- a/proxy/src/pqproto.rs +++ b/proxy/src/pqproto.rs @@ -186,7 +186,7 @@ where pub async fn read_message<'a, S>( stream: &mut S, buf: &'a mut Vec, - max: usize, + max: u32, ) -> io::Result<(u8, &'a mut [u8])> where S: AsyncRead + Unpin, @@ -206,7 +206,7 @@ where let header = read!(stream => Header); // as described above, the length must be at least 4. - let Some(len) = (header.len.get() as usize).checked_sub(4) else { + let Some(len) = header.len.get().checked_sub(4) else { return Err(io::Error::other(format!( "invalid startup message length {}, must be at least 4.", header.len, @@ -222,7 +222,7 @@ where } // read in our entire message. 
- buf.resize(len, 0); + buf.resize(len as usize, 0); stream.read_exact(buf).await?; Ok((header.tag, buf)) diff --git a/proxy/src/proxy/handshake.rs b/proxy/src/proxy/handshake.rs index 13ee8c7dd2..6970ab8714 100644 --- a/proxy/src/proxy/handshake.rs +++ b/proxy/src/proxy/handshake.rs @@ -1,3 +1,4 @@ +use futures::{FutureExt, TryFutureExt}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, info, warn}; @@ -57,7 +58,7 @@ pub(crate) enum HandshakeData { /// It's easier to work with owned `stream` here as we need to upgrade it to TLS; /// we also take an extra care of propagating only the select handshake errors to client. #[tracing::instrument(skip_all)] -pub(crate) async fn handshake( +pub(crate) async fn handshake( ctx: &RequestContext, stream: S, mut tls: Option<&TlsConfig>, @@ -108,7 +109,9 @@ pub(crate) async fn handshake( } } } - }); + }) + .map_ok(Box::new) + .boxed(); res?; @@ -146,7 +149,7 @@ pub(crate) async fn handshake( tls.cert_resolver.resolve(conn_info.server_name()); let tls = Stream::Tls { - tls: Box::new(tls_stream), + tls: tls_stream, tls_server_end_point, }; (stream, msg) = PqStream::parse_startup(tls).await?; diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index ac0aca1176..0ffc54aa88 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -270,7 +270,7 @@ impl ReportableError for ClientRequestError { } #[allow(clippy::too_many_arguments)] -pub(crate) async fn handle_client( +pub(crate) async fn handle_client( config: &'static ProxyConfig, auth_backend: &'static auth::Backend<'static, ()>, ctx: &RequestContext, diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index 8f9bd2de2d..55ab5f4dba 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -1,3 +1,4 @@ +use futures::FutureExt; use smol_str::SmolStr; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::debug; @@ -89,6 +90,7 @@ impl ProxyPassthrough { .compute .cancel_closure 
.try_cancel_query(compute_config) + .boxed() .await { tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database"); diff --git a/proxy/src/sasl/stream.rs b/proxy/src/sasl/stream.rs index cb15132673..52ccca58d5 100644 --- a/proxy/src/sasl/stream.rs +++ b/proxy/src/sasl/stream.rs @@ -30,52 +30,53 @@ where F: FnOnce(&str) -> super::Result, M: Mechanism, { - let sasl = { + let (mut mechanism, mut input) = { // pause the timer while we communicate with the client let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client); // Initial client message contains the chosen auth method's name. let msg = stream.read_password_message().await?; - super::FirstMessage::parse(msg).ok_or(super::Error::BadClientMessage("bad sasl message"))? + + let sasl = super::FirstMessage::parse(msg) + .ok_or(super::Error::BadClientMessage("bad sasl message"))?; + + (mechanism(sasl.method)?, sasl.message) }; - let mut mechanism = mechanism(sasl.method)?; - let mut input = sasl.message; loop { - let step = mechanism - .exchange(input) - .inspect_err(|error| tracing::info!(?error, "error during SASL exchange"))?; - - match step { - Step::Continue(moved_mechanism, reply) => { + match mechanism.exchange(input) { + Ok(Step::Continue(moved_mechanism, reply)) => { mechanism = moved_mechanism; - // pause the timer while we communicate with the client - let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client); - // write reply let sasl_msg = BeAuthenticationSaslMessage::Continue(reply.as_bytes()); stream.write_message(BeMessage::AuthenticationSasl(sasl_msg)); - - // get next input - stream.flush().await?; - let msg = stream.read_password_message().await?; - input = std::str::from_utf8(msg) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "bad encoding"))?; + drop(reply); } - Step::Success(result, reply) => { - // pause the timer while we communicate with the client - let _paused = 
ctx.latency_timer_pause(crate::metrics::Waiting::Client); - + Ok(Step::Success(result, reply)) => { // write reply let sasl_msg = BeAuthenticationSaslMessage::Final(reply.as_bytes()); stream.write_message(BeMessage::AuthenticationSasl(sasl_msg)); stream.write_message(BeMessage::AuthenticationOk); + // exit with success break Ok(Outcome::Success(result)); } // exit with failure - Step::Failure(reason) => break Ok(Outcome::Failure(reason)), + Ok(Step::Failure(reason)) => break Ok(Outcome::Failure(reason)), + Err(error) => { + tracing::info!(?error, "error during SASL exchange"); + return Err(error); + } } + + // pause the timer while we communicate with the client + let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client); + + // get next input + stream.flush().await?; + let msg = stream.read_password_message().await?; + input = std::str::from_utf8(msg) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "bad encoding"))?; } } diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index 7126430a85..c49a431c95 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -72,7 +72,7 @@ impl PqStream { impl PqStream { /// Read a raw postgres packet, which will respect the max length requested. /// This is not cancel safe. - async fn read_raw_expect(&mut self, tag: u8, max: usize) -> io::Result<&mut [u8]> { + async fn read_raw_expect(&mut self, tag: u8, max: u32) -> io::Result<&mut [u8]> { let (actual_tag, msg) = read_message(&mut self.stream, &mut self.read, max).await?; if actual_tag != tag { return Err(io::Error::other(format!( @@ -89,7 +89,7 @@ impl PqStream { // passwords are usually pretty short // and SASL SCRAM messages are no longer than 256 bytes in my testing // (a few hashes and random bytes, encoded into base64). 
- const MAX_PASSWORD_LENGTH: usize = 512; + const MAX_PASSWORD_LENGTH: u32 = 512; self.read_raw_expect(FE_PASSWORD_MESSAGE, MAX_PASSWORD_LENGTH) .await } diff --git a/proxy/src/tls/postgres_rustls.rs b/proxy/src/tls/postgres_rustls.rs index f09e916a1d..013b307f0b 100644 --- a/proxy/src/tls/postgres_rustls.rs +++ b/proxy/src/tls/postgres_rustls.rs @@ -31,7 +31,9 @@ mod private { type Output = io::Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.inner).poll(cx).map_ok(RustlsStream) + Pin::new(&mut self.inner) + .poll(cx) + .map_ok(|s| RustlsStream(Box::new(s))) } } @@ -57,7 +59,7 @@ mod private { } } - pub struct RustlsStream(TlsStream); + pub struct RustlsStream(Box>); impl postgres_client::tls::TlsStream for RustlsStream where From fc3994eb71826de6fbec023b74558aa72a7c888b Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 2 Jun 2025 19:15:18 +0200 Subject: [PATCH 4/6] pageserver: initial gRPC page service implementation (#12094) ## Problem We should expose the page service over gRPC. Requires #12093. Touches #11728. ## Summary of changes This patch adds an initial page service implementation over gRPC. It ties in with the existing `PageServerHandler` request logic, to avoid the implementations drifting apart for the core read path. This is just a bare-bones functional implementation. Several important aspects have been omitted, and will be addressed in follow-up PRs: * Limited observability: minimal tracing, no logging, limited metrics and timing, etc. * Rate limiting will currently block. * No performance optimization. * No cancellation handling. * No tests. I've only done rudimentary testing of this, but Pagebench passes at least. 
--- libs/pageserver_api/src/models.rs | 2 +- libs/pageserver_api/src/reltag.rs | 10 +- pageserver/page_api/src/model.rs | 17 +- pageserver/src/basebackup.rs | 26 +- pageserver/src/bin/pageserver.rs | 4 +- pageserver/src/page_service.rs | 822 ++++++++++++++++++++++++------ pageserver/src/tenant/timeline.rs | 12 + 7 files changed, 723 insertions(+), 170 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 01487c0f57..28ced4a368 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1934,7 +1934,7 @@ pub enum PagestreamFeMessage { } // Wrapped in libpq CopyData -#[derive(strum_macros::EnumProperty)] +#[derive(Debug, strum_macros::EnumProperty)] pub enum PagestreamBeMessage { Exists(PagestreamExistsResponse), Nblocks(PagestreamNblocksResponse), diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 4509cab2e0..e0dd4fdfe8 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -184,12 +184,12 @@ pub enum SlruKind { MultiXactOffsets, } -impl SlruKind { - pub fn to_str(&self) -> &'static str { +impl fmt::Display for SlruKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Clog => "pg_xact", - Self::MultiXactMembers => "pg_multixact/members", - Self::MultiXactOffsets => "pg_multixact/offsets", + Self::Clog => write!(f, "pg_xact"), + Self::MultiXactMembers => write!(f, "pg_multixact/members"), + Self::MultiXactOffsets => write!(f, "pg_multixact/offsets"), } } } diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 7ab97a994e..0268ab920b 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -10,6 +10,8 @@ //! //! - Validate protocol invariants, via try_from() and try_into(). 
+use std::fmt::Display; + use bytes::Bytes; use postgres_ffi::Oid; use smallvec::SmallVec; @@ -48,7 +50,8 @@ pub struct ReadLsn { pub request_lsn: Lsn, /// If given, the caller guarantees that the page has not been modified since this LSN. Must be /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page - /// without waiting for the request LSN to arrive. Valid for all request types. + /// without waiting for the request LSN to arrive. If not given, the request will read at the + /// request_lsn and wait for it to arrive if necessary. Valid for all request types. /// /// It is undefined behaviour to make a request such that the page was, in fact, modified /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an @@ -58,6 +61,17 @@ pub struct ReadLsn { pub not_modified_since_lsn: Option, } +impl Display for ReadLsn { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let req_lsn = self.request_lsn; + if let Some(mod_lsn) = self.not_modified_since_lsn { + write!(f, "{req_lsn}>={mod_lsn}") + } else { + req_lsn.fmt(f) + } + } +} + impl ReadLsn { /// Validates the ReadLsn. pub fn validate(&self) -> Result<(), ProtocolError> { @@ -584,6 +598,7 @@ impl TryFrom for proto::GetSlruSegmentResponse { type Error = ProtocolError; fn try_from(segment: GetSlruSegmentResponse) -> Result { + // TODO: can a segment legitimately be empty? 
if segment.is_empty() { return Err(ProtocolError::Missing("segment")); } diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index e89baa0bce..4dba9d267c 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -65,6 +65,30 @@ impl From for BasebackupError { } } +impl From for postgres_backend::QueryError { + fn from(err: BasebackupError) -> Self { + use postgres_backend::QueryError; + use pq_proto::framed::ConnectionError; + match err { + BasebackupError::Client(err, _) => QueryError::Disconnected(ConnectionError::Io(err)), + BasebackupError::Server(err) => QueryError::Other(err), + BasebackupError::Shutdown => QueryError::Shutdown, + } + } +} + +impl From for tonic::Status { + fn from(err: BasebackupError) -> Self { + use tonic::Code; + let code = match &err { + BasebackupError::Client(_, _) => Code::Cancelled, + BasebackupError::Server(_) => Code::Internal, + BasebackupError::Shutdown => Code::Unavailable, + }; + tonic::Status::new(code, err.to_string()) + } +} + /// Create basebackup with non-rel data in it. /// Only include relational data if 'full_backup' is true. /// @@ -248,7 +272,7 @@ where async fn flush(&mut self) -> Result<(), BasebackupError> { let nblocks = self.buf.len() / BLCKSZ as usize; let (kind, segno) = self.current_segment.take().unwrap(); - let segname = format!("{}/{:>04X}", kind.to_str(), segno); + let segname = format!("{kind}/{segno:>04X}"); let header = new_tar_header(&segname, self.buf.len() as u64)?; self.ar .append(&header, self.buf.as_slice()) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index df3c045145..337aa135dc 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -804,7 +804,7 @@ fn start_pageserver( } else { None }, - basebackup_cache.clone(), + basebackup_cache, ); // Spawn a Pageserver gRPC server task. 
It will spawn separate tasks for @@ -816,12 +816,10 @@ fn start_pageserver( let mut page_service_grpc = None; if let Some(grpc_listener) = grpc_listener { page_service_grpc = Some(page_service::spawn_grpc( - conf, tenant_manager.clone(), grpc_auth, otel_guard.as_ref().map(|g| g.dispatch.clone()), grpc_listener, - basebackup_cache, )?); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f011ed49d0..b9ba4a3555 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1,6 +1,7 @@ //! The Page Service listens for client connections and serves their GetPage@LSN //! requests. +use std::any::Any; use std::borrow::Cow; use std::num::NonZeroUsize; use std::os::fd::AsRawFd; @@ -11,9 +12,9 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; use std::{io, str}; -use anyhow::{Context as _, bail}; +use anyhow::{Context as _, anyhow, bail}; use async_compression::tokio::write::GzipEncoder; -use bytes::Buf; +use bytes::{Buf, BytesMut}; use futures::future::BoxFuture; use futures::{FutureExt, Stream}; use itertools::Itertools; @@ -33,6 +34,7 @@ use pageserver_api::models::{ }; use pageserver_api::reltag::SlruKind; use pageserver_api::shard::TenantShardId; +use pageserver_page_api as page_api; use pageserver_page_api::proto; use postgres_backend::{ AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, @@ -41,8 +43,9 @@ use postgres_ffi::BLCKSZ; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use pq_proto::framed::ConnectionError; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor}; +use smallvec::{SmallVec, smallvec}; use strum_macros::IntoStaticStr; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter}; +use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, BufWriter}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tonic::service::Interceptor as _; @@ -78,7 +81,8 @@ use 
crate::tenant::mgr::{ GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager, }; use crate::tenant::storage_layer::IoConcurrency; -use crate::tenant::timeline::{self, WaitLsnError}; +use crate::tenant::timeline::handle::{Handle, HandleUpgradeError, WeakHandle}; +use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter}; use crate::tenant::{GetTimelineError, PageReconstructError, Timeline}; use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation}; @@ -167,15 +171,14 @@ pub fn spawn( /// Spawns a gRPC server for the page service. /// +/// TODO: move this onto GrpcPageServiceHandler::spawn(). /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we /// need to reimplement the TCP+TLS accept loop ourselves. pub fn spawn_grpc( - conf: &'static PageServerConf, tenant_manager: Arc, auth: Option>, perf_trace_dispatch: Option, listener: std::net::TcpListener, - basebackup_cache: Arc, ) -> anyhow::Result { let cancel = CancellationToken::new(); let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler) @@ -208,24 +211,17 @@ pub fn spawn_grpc( // // * Layers: allow async code, can run code after the service response. However, only has access // to the raw HTTP request/response, not the gRPC types. - let page_service_handler = PageServerHandler::new( + let page_service_handler = GrpcPageServiceHandler { tenant_manager, - auth.clone(), - PageServicePipeliningConfig::Serial, // TODO: unused with gRPC - conf.get_vectored_concurrent_io, - ConnectionPerfSpanFields::default(), - basebackup_cache, ctx, - cancel.clone(), - gate.enter().expect("just created"), - ); + }; let observability_layer = ObservabilityLayer; let mut tenant_interceptor = TenantMetadataInterceptor; let mut auth_interceptor = TenantAuthInterceptor::new(auth); let page_service = tower::ServiceBuilder::new() - // Create tracing span. + // Create tracing span and record request start time. 
.layer(observability_layer) // Intercept gRPC requests. .layer(tonic::service::InterceptorLayer::new(move |mut req| { @@ -554,7 +550,7 @@ impl TimelineHandles { tenant_id: TenantId, timeline_id: TimelineId, shard_selector: ShardSelector, - ) -> Result, GetActiveTimelineError> { + ) -> Result, GetActiveTimelineError> { if *self.wrapper.tenant_id.get_or_init(|| tenant_id) != tenant_id { return Err(GetActiveTimelineError::Tenant( GetActiveTenantError::SwitchedTenant, @@ -721,6 +717,82 @@ enum PageStreamError { BadRequest(Cow<'static, str>), } +impl PageStreamError { + /// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status + /// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a + /// convenience method for use from a get_pages gRPC stream. + #[allow(clippy::result_large_err)] + fn into_get_page_response( + self, + request_id: page_api::RequestID, + ) -> Result { + use page_api::GetPageStatusCode; + use tonic::Code; + + // We dispatch to Into first, and then map it to a GetPageResponse. + let status: tonic::Status = self.into(); + let status_code = match status.code() { + // We shouldn't see an OK status here, because we're emitting an error. + Code::Ok => { + debug_assert_ne!(status.code(), Code::Ok); + return Err(tonic::Status::internal(format!( + "unexpected OK status: {status:?}", + ))); + } + + // These are per-request errors, returned as GetPageResponses. + Code::AlreadyExists => GetPageStatusCode::InvalidRequest, + Code::DataLoss => GetPageStatusCode::InternalError, + Code::FailedPrecondition => GetPageStatusCode::InvalidRequest, + Code::InvalidArgument => GetPageStatusCode::InvalidRequest, + Code::Internal => GetPageStatusCode::InternalError, + Code::NotFound => GetPageStatusCode::NotFound, + Code::OutOfRange => GetPageStatusCode::InvalidRequest, + Code::ResourceExhausted => GetPageStatusCode::SlowDown, + + // These should terminate the stream. 
+ Code::Aborted => return Err(status), + Code::Cancelled => return Err(status), + Code::DeadlineExceeded => return Err(status), + Code::PermissionDenied => return Err(status), + Code::Unauthenticated => return Err(status), + Code::Unavailable => return Err(status), + Code::Unimplemented => return Err(status), + Code::Unknown => return Err(status), + }; + + Ok(page_api::GetPageResponse { + request_id, + status_code, + reason: Some(status.message().to_string()), + page_images: SmallVec::new(), + } + .into()) + } +} + +impl From for tonic::Status { + fn from(err: PageStreamError) -> Self { + use tonic::Code; + let message = err.to_string(); + let code = match err { + PageStreamError::Reconnect(_) => Code::Unavailable, + PageStreamError::Shutdown => Code::Unavailable, + PageStreamError::Read(err) => match err { + PageReconstructError::Cancelled => Code::Unavailable, + PageReconstructError::MissingKey(_) => Code::NotFound, + PageReconstructError::AncestorLsnTimeout(err) => tonic::Status::from(err).code(), + PageReconstructError::Other(_) => Code::Internal, + PageReconstructError::WalRedo(_) => Code::Internal, + }, + PageStreamError::LsnTimeout(err) => tonic::Status::from(err).code(), + PageStreamError::NotFound(_) => Code::NotFound, + PageStreamError::BadRequest(_) => Code::InvalidArgument, + }; + tonic::Status::new(code, message) + } +} + impl From for PageStreamError { fn from(value: PageReconstructError) -> Self { match value { @@ -801,37 +873,37 @@ enum BatchedFeMessage { Exists { span: Span, timer: SmgrOpTimer, - shard: timeline::handle::WeakHandle, + shard: WeakHandle, req: models::PagestreamExistsRequest, }, Nblocks { span: Span, timer: SmgrOpTimer, - shard: timeline::handle::WeakHandle, + shard: WeakHandle, req: models::PagestreamNblocksRequest, }, GetPage { span: Span, - shard: timeline::handle::WeakHandle, - pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, + shard: WeakHandle, + pages: SmallVec<[BatchedGetPageRequest; 1]>, batch_break_reason: 
GetPageBatchBreakReason, }, DbSize { span: Span, timer: SmgrOpTimer, - shard: timeline::handle::WeakHandle, + shard: WeakHandle, req: models::PagestreamDbSizeRequest, }, GetSlruSegment { span: Span, timer: SmgrOpTimer, - shard: timeline::handle::WeakHandle, + shard: WeakHandle, req: models::PagestreamGetSlruSegmentRequest, }, #[cfg(feature = "testing")] Test { span: Span, - shard: timeline::handle::WeakHandle, + shard: WeakHandle, requests: Vec, }, RespondError { @@ -1080,26 +1152,6 @@ impl PageServerHandler { let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?; - // TODO: turn in to async closure once available to avoid repeating received_at - async fn record_op_start_and_throttle( - shard: &timeline::handle::Handle, - op: metrics::SmgrQueryType, - received_at: Instant, - ) -> Result { - // It's important to start the smgr op metric recorder as early as possible - // so that the _started counters are incremented before we do - // any serious waiting, e.g., for throttle, batching, or actual request handling. - let mut timer = shard.query_metrics.start_smgr_op(op, received_at); - let now = Instant::now(); - timer.observe_throttle_start(now); - let throttled = tokio::select! 
{ - res = shard.pagestream_throttle.throttle(1, now) => res, - _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown), - }; - timer.observe_throttle_done(throttled); - Ok(timer) - } - let batched_msg = match neon_fe_msg { PagestreamFeMessage::Exists(req) => { let shard = timeline_handles @@ -1107,7 +1159,7 @@ impl PageServerHandler { .await?; debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug()); - let timer = record_op_start_and_throttle( + let timer = Self::record_op_start_and_throttle( &shard, metrics::SmgrQueryType::GetRelExists, received_at, @@ -1125,7 +1177,7 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .await?; let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug()); - let timer = record_op_start_and_throttle( + let timer = Self::record_op_start_and_throttle( &shard, metrics::SmgrQueryType::GetRelSize, received_at, @@ -1143,7 +1195,7 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .await?; let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug()); - let timer = record_op_start_and_throttle( + let timer = Self::record_op_start_and_throttle( &shard, metrics::SmgrQueryType::GetDbSize, received_at, @@ -1161,7 +1213,7 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .await?; let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug()); - let timer = record_op_start_and_throttle( + let 
timer = Self::record_op_start_and_throttle( &shard, metrics::SmgrQueryType::GetSlruSegment, received_at, @@ -1286,7 +1338,7 @@ impl PageServerHandler { // request handler log messages contain the request-specific fields. let span = mkspan!(shard.tenant_shard_id.shard_slug()); - let timer = record_op_start_and_throttle( + let timer = Self::record_op_start_and_throttle( &shard, metrics::SmgrQueryType::GetPageAtLsn, received_at, @@ -1333,7 +1385,7 @@ impl PageServerHandler { BatchedFeMessage::GetPage { span, shard: shard.downgrade(), - pages: smallvec::smallvec![BatchedGetPageRequest { + pages: smallvec![BatchedGetPageRequest { req, timer, lsn_range: LsnRange { @@ -1355,9 +1407,12 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .await?; let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug()); - let timer = - record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at) - .await?; + let timer = Self::record_op_start_and_throttle( + &shard, + metrics::SmgrQueryType::Test, + received_at, + ) + .await?; BatchedFeMessage::Test { span, shard: shard.downgrade(), @@ -1368,6 +1423,26 @@ impl PageServerHandler { Ok(Some(batched_msg)) } + /// Starts a SmgrOpTimer at received_at and throttles the request. + async fn record_op_start_and_throttle( + shard: &Handle, + op: metrics::SmgrQueryType, + received_at: Instant, + ) -> Result { + // It's important to start the smgr op metric recorder as early as possible + // so that the _started counters are incremented before we do + // any serious waiting, e.g., for throttle, batching, or actual request handling. + let mut timer = shard.query_metrics.start_smgr_op(op, received_at); + let now = Instant::now(); + timer.observe_throttle_start(now); + let throttled = tokio::select! 
{ + res = shard.pagestream_throttle.throttle(1, now) => res, + _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown), + }; + timer.observe_throttle_done(throttled); + Ok(timer) + } + /// Post-condition: `batch` is Some() #[instrument(skip_all, level = tracing::Level::TRACE)] #[allow(clippy::boxed_local)] @@ -1465,8 +1540,11 @@ impl PageServerHandler { let (mut handler_results, span) = { // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and // won't fit on the stack. - let mut boxpinned = - Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx)); + let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message( + batch, + io_concurrency, + ctx, + )); log_slow( log_slow_name, LOG_SLOW_GETPAGE_THRESHOLD, @@ -1622,7 +1700,6 @@ impl PageServerHandler { /// Helper which dispatches a batched message to the appropriate handler. /// Returns a vec of results, along with the extracted trace span. async fn pagestream_dispatch_batched_message( - &mut self, batch: BatchedFeMessage, io_concurrency: IoConcurrency, ctx: &RequestContext, @@ -1652,10 +1729,10 @@ impl PageServerHandler { let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_get_rel_exists_request(&shard, &req, &ctx) + Self::handle_get_rel_exists_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer, ctx)) + .map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1671,10 +1748,10 @@ impl PageServerHandler { let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_get_nblocks_request(&shard, &req, &ctx) + Self::handle_get_nblocks_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer, ctx)) + .map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1692,16 +1769,15 @@ 
impl PageServerHandler { { let npages = pages.len(); trace!(npages, "handling getpage request"); - let res = self - .handle_get_page_at_lsn_request_batched( - &shard, - pages, - io_concurrency, - batch_break_reason, - &ctx, - ) - .instrument(span.clone()) - .await; + let res = Self::handle_get_page_at_lsn_request_batched( + &shard, + pages, + io_concurrency, + batch_break_reason, + &ctx, + ) + .instrument(span.clone()) + .await; assert_eq!(res.len(), npages); res }, @@ -1718,10 +1794,10 @@ impl PageServerHandler { let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_db_size_request(&shard, &req, &ctx) + Self::handle_db_size_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer, ctx)) + .map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1737,10 +1813,10 @@ impl PageServerHandler { let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_get_slru_segment_request(&shard, &req, &ctx) + Self::handle_get_slru_segment_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer, ctx)) + .map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1758,8 +1834,7 @@ impl PageServerHandler { { let npages = requests.len(); trace!(npages, "handling getpage request"); - let res = self - .handle_test_request_batch(&shard, requests, &ctx) + let res = Self::handle_test_request_batch(&shard, requests, &ctx) .instrument(span.clone()) .await; assert_eq!(res.len(), npages); @@ -2313,11 +2388,10 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_rel_exists_request( - &mut self, timeline: &Timeline, req: &PagestreamExistsRequest, ctx: &RequestContext, - ) -> Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); let lsn = 
Self::wait_or_get_last_lsn( timeline, @@ -2339,19 +2413,15 @@ impl PageServerHandler { ) .await?; - Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { - req: *req, - exists, - })) + Ok(PagestreamExistsResponse { req: *req, exists }) } #[instrument(skip_all, fields(shard_id))] async fn handle_get_nblocks_request( - &mut self, timeline: &Timeline, req: &PagestreamNblocksRequest, ctx: &RequestContext, - ) -> Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -2373,19 +2443,18 @@ impl PageServerHandler { ) .await?; - Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { + Ok(PagestreamNblocksResponse { req: *req, n_blocks, - })) + }) } #[instrument(skip_all, fields(shard_id))] async fn handle_db_size_request( - &mut self, timeline: &Timeline, req: &PagestreamDbSizeRequest, ctx: &RequestContext, - ) -> Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -2409,17 +2478,13 @@ impl PageServerHandler { .await?; let db_size = total_blocks as i64 * BLCKSZ as i64; - Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { - req: *req, - db_size, - })) + Ok(PagestreamDbSizeResponse { req: *req, db_size }) } #[instrument(skip_all)] async fn handle_get_page_at_lsn_request_batched( - &mut self, timeline: &Timeline, - requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, + requests: SmallVec<[BatchedGetPageRequest; 1]>, io_concurrency: IoConcurrency, batch_break_reason: GetPageBatchBreakReason, ctx: &RequestContext, @@ -2544,11 +2609,10 @@ impl PageServerHandler { #[instrument(skip_all, fields(shard_id))] async fn handle_get_slru_segment_request( - &mut self, timeline: &Timeline, req: &PagestreamGetSlruSegmentRequest, ctx: &RequestContext, - ) -> Result { + ) -> Result { let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ 
-2563,16 +2627,13 @@ impl PageServerHandler { .ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?; let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?; - Ok(PagestreamBeMessage::GetSlruSegment( - PagestreamGetSlruSegmentResponse { req: *req, segment }, - )) + Ok(PagestreamGetSlruSegmentResponse { req: *req, segment }) } // NB: this impl mimics what we do for batched getpage requests. #[cfg(feature = "testing")] #[instrument(skip_all, fields(shard_id))] async fn handle_test_request_batch( - &mut self, timeline: &Timeline, requests: Vec, _ctx: &RequestContext, @@ -2648,15 +2709,6 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { - fn map_basebackup_error(err: BasebackupError) -> QueryError { - match err { - // TODO: passthrough the error site to the final error message? - BasebackupError::Client(e, _) => QueryError::Disconnected(ConnectionError::Io(e)), - BasebackupError::Server(e) => QueryError::Other(e), - BasebackupError::Shutdown => QueryError::Shutdown, - } - } - let started = std::time::Instant::now(); let timeline = self @@ -2714,8 +2766,7 @@ impl PageServerHandler { replica, &ctx, ) - .await - .map_err(map_basebackup_error)?; + .await?; } else { let mut writer = BufWriter::new(pgb.copyout_writer()); @@ -2738,11 +2789,8 @@ impl PageServerHandler { from_cache = true; tokio::io::copy(&mut cached, &mut writer) .await - .map_err(|e| { - map_basebackup_error(BasebackupError::Client( - e, - "handle_basebackup_request,cached,copy", - )) + .map_err(|err| { + BasebackupError::Client(err, "handle_basebackup_request,cached,copy") })?; } else if gzip { let mut encoder = GzipEncoder::with_quality( @@ -2763,8 +2811,7 @@ impl PageServerHandler { replica, &ctx, ) - .await - .map_err(map_basebackup_error)?; + .await?; // shutdown the encoder to ensure the gzip footer is written encoder .shutdown() @@ -2780,15 +2827,12 @@ impl PageServerHandler { replica, &ctx, ) - .await - .map_err(map_basebackup_error)?; + 
.await?; } - writer.flush().await.map_err(|e| { - map_basebackup_error(BasebackupError::Client( - e, - "handle_basebackup_request,flush", - )) - })?; + writer + .flush() + .await + .map_err(|err| BasebackupError::Client(err, "handle_basebackup_request,flush"))?; } pgb.write_message_noflush(&BeMessage::CopyDone) @@ -3312,69 +3356,464 @@ where } } -/// Implements the page service over gRPC. +/// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing. /// -/// TODO: not yet implemented, all methods return unimplemented. +/// TODO: rename to PageServiceHandler when libpq impl is removed. +pub struct GrpcPageServiceHandler { + tenant_manager: Arc, + ctx: RequestContext, +} + +impl GrpcPageServiceHandler { + /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of + /// relations and their sizes, as well as SLRU segments and similar data. + #[allow(clippy::result_large_err)] + fn ensure_shard_zero(timeline: &Handle) -> Result<(), tonic::Status> { + match timeline.get_shard_index().shard_number.0 { + 0 => Ok(()), + shard => Err(tonic::Status::invalid_argument(format!( + "request must execute on shard zero (is shard {shard})", + ))), + } + } + + /// Generates a PagestreamRequest header from a ReadLsn and request ID. + fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest { + PagestreamRequest { + reqid: req_id, + request_lsn: read_lsn.request_lsn, + not_modified_since: read_lsn + .not_modified_since_lsn + .unwrap_or(read_lsn.request_lsn), + } + } + + /// Acquires a timeline handle for the given request. + /// + /// TODO: during shard splits, the compute may still be sending requests to the parent shard + /// until the entire split is committed and the compute is notified. Consider installing a + /// temporary shard router from the parent to the children while the split is in progress. + /// + /// TODO: consider moving this to a middleware layer; all requests need it. 
Needs to manage + /// the TimelineHandles lifecycle. + /// + /// TODO: untangle acquisition from TenantManagerWrapper::resolve() and Cache::get(), to avoid + /// the unnecessary overhead. + async fn get_request_timeline( + &self, + req: &tonic::Request, + ) -> Result, GetActiveTimelineError> { + let ttid = *extract::(req); + let shard_index = *extract::(req); + let shard_selector = ShardSelector::Known(shard_index); + + TimelineHandles::new(self.tenant_manager.clone()) + .get(ttid.tenant_id, ttid.timeline_id, shard_selector) + .await + } + + /// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start. + /// Only errors if the timeline is shutting down. + /// + /// TODO: move timer construction to ObservabilityLayer (see TODO there). + /// TODO: decouple rate limiting (middleware?), and return SlowDown errors instead. + async fn record_op_start_and_throttle( + timeline: &Handle, + op: metrics::SmgrQueryType, + received_at: Instant, + ) -> Result { + let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at) + .await + .map_err(|err| match err { + // record_op_start_and_throttle() only returns Shutdown. + QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")), + err => tonic::Status::internal(format!("unexpected error: {err}")), + })?; + timer.observe_execution_start(Instant::now()); + Ok(timer) + } + + /// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC. + /// + /// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse + /// with an appropriate status code instead. + /// + /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send + /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or + /// split them up in the client or server. 
+ #[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))] + async fn get_page( + ctx: &RequestContext, + timeline: &WeakHandle, + req: proto::GetPageRequest, + io_concurrency: IoConcurrency, + ) -> Result { + let received_at = Instant::now(); + let timeline = timeline.upgrade()?; + let ctx = ctx.with_scope_page_service_pagestream(&timeline); + + // Validate the request, decorate the span, and convert it to a Pagestream request. + let req: page_api::GetPageRequest = req.try_into()?; + + span_record!( + req_id = %req.request_id, + rel = %req.rel, + blkno = %req.block_numbers[0], + blks = %req.block_numbers.len(), + lsn = %req.read_lsn, + ); + + let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard + let effective_lsn = match PageServerHandler::effective_request_lsn( + &timeline, + timeline.get_last_record_lsn(), + req.read_lsn.request_lsn, + req.read_lsn + .not_modified_since_lsn + .unwrap_or(req.read_lsn.request_lsn), + &latest_gc_cutoff_lsn, + ) { + Ok(lsn) => lsn, + Err(err) => return err.into_get_page_response(req.request_id), + }; + + let mut batch = SmallVec::with_capacity(req.block_numbers.len()); + for blkno in req.block_numbers { + // TODO: this creates one timer per page and throttles it. We should have a timer for + // the entire batch, and throttle only the batch, but this is equivalent to what + // PageServerHandler does already so we keep it for now. + let timer = Self::record_op_start_and_throttle( + &timeline, + metrics::SmgrQueryType::GetPageAtLsn, + received_at, + ) + .await?; + + batch.push(BatchedGetPageRequest { + req: PagestreamGetPageRequest { + hdr: Self::make_hdr(req.read_lsn, req.request_id), + rel: req.rel, + blkno, + }, + lsn_range: LsnRange { + effective_lsn, + request_lsn: req.read_lsn.request_lsn, + }, + timer, + ctx: ctx.attached_child(), + batch_wait_ctx: None, // TODO: add tracing + }); + } + + // TODO: this does a relation size query for every page in the batch. 
Since this batch is + // all for one relation, we could do this only once. However, this is not the case for the + // libpq implementation. + let results = PageServerHandler::handle_get_page_at_lsn_request_batched( + &timeline, + batch, + io_concurrency, + GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches + &ctx, + ) + .await; + + let mut resp = page_api::GetPageResponse { + request_id: req.request_id, + status_code: page_api::GetPageStatusCode::Ok, + reason: None, + page_images: SmallVec::with_capacity(results.len()), + }; + + for result in results { + match result { + Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page), + Ok((resp, _, _)) => { + return Err(tonic::Status::internal(format!( + "unexpected response: {resp:?}" + ))); + } + Err(err) => return err.err.into_get_page_response(req.request_id), + }; + } + + Ok(resp.into()) + } +} + +/// Implements the gRPC page service. +/// +/// TODO: cancellation. +/// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code. #[tonic::async_trait] -impl proto::PageService for PageServerHandler { +impl proto::PageService for GrpcPageServiceHandler { type GetBaseBackupStream = Pin< Box> + Send>, >; + type GetPagesStream = Pin> + Send>>; - #[instrument(skip_all)] + #[instrument(skip_all, fields(rel, lsn))] async fn check_rel_exists( &self, - _: tonic::Request, + req: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("not implemented")) + let received_at = extract::(&req).0; + let timeline = self.get_request_timeline(&req).await?; + let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); + + // Validate the request, decorate the span, and convert it to a Pagestream request. 
+ Self::ensure_shard_zero(&timeline)?; + let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?; + + span_record!(rel=%req.rel, lsn=%req.read_lsn); + + let req = PagestreamExistsRequest { + hdr: Self::make_hdr(req.read_lsn, 0), + rel: req.rel, + }; + + // Execute the request and convert the response. + let _timer = Self::record_op_start_and_throttle( + &timeline, + metrics::SmgrQueryType::GetRelExists, + received_at, + ) + .await?; + + let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?; + let resp: page_api::CheckRelExistsResponse = resp.exists; + Ok(tonic::Response::new(resp.into())) } - #[instrument(skip_all)] + // TODO: ensure clients use gzip compression for the stream. + #[instrument(skip_all, fields(lsn))] async fn get_base_backup( &self, - _: tonic::Request, + req: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("not implemented")) + // Send 64 KB chunks to avoid large memory allocations. + const CHUNK_SIZE: usize = 64 * 1024; + + let timeline = self.get_request_timeline(&req).await?; + let ctx = self.ctx.with_scope_timeline(&timeline); + + // Validate the request, decorate the span, and wait for the LSN to arrive. + // + // TODO: this requires a read LSN, is that ok? + Self::ensure_shard_zero(&timeline)?; + if timeline.is_archived() == Some(true) { + return Err(tonic::Status::failed_precondition("timeline is archived")); + } + let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?; + + span_record!(lsn=%req.read_lsn); + + let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); + timeline + .wait_lsn( + req.read_lsn.request_lsn, + WaitLsnWaiter::PageService, + WaitLsnTimeout::Default, + &ctx, + ) + .await?; + timeline + .check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn) + .map_err(|err| { + tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}")) + })?; + + // Spawn a task to run the basebackup. 
+ // + // TODO: do we need to support full base backups, for debugging? + let span = Span::current(); + let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE); + let jh = tokio::spawn(async move { + let result = basebackup::send_basebackup_tarball( + &mut simplex_write, + &timeline, + Some(req.read_lsn.request_lsn), + None, + false, + req.replica, + &ctx, + ) + .instrument(span) // propagate request span + .await; + simplex_write.shutdown().await.map_err(|err| { + BasebackupError::Server(anyhow!("simplex shutdown failed: {err}")) + })?; + result + }); + + // Emit chunks of size CHUNK_SIZE. + let chunks = async_stream::try_stream! { + let mut chunk = BytesMut::with_capacity(CHUNK_SIZE); + loop { + let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| { + tonic::Status::internal(format!("failed to read basebackup chunk: {err}")) + })?; + + // If we read 0 bytes, either the chunk is full or the stream is closed. + if n == 0 { + if chunk.is_empty() { + break; + } + yield proto::GetBaseBackupResponseChunk::try_from(chunk.clone().freeze())?; + chunk.clear(); + } + } + // Wait for the basebackup task to exit and check for errors. + jh.await.map_err(|err| { + tonic::Status::internal(format!("basebackup failed: {err}")) + })??; + }; + + Ok(tonic::Response::new(Box::pin(chunks))) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(db_oid, lsn))] async fn get_db_size( &self, - _: tonic::Request, + req: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("not implemented")) + let received_at = extract::(&req).0; + let timeline = self.get_request_timeline(&req).await?; + let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); + + // Validate the request, decorate the span, and convert it to a Pagestream request. 
+ Self::ensure_shard_zero(&timeline)?; + let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?; + + span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn); + + let req = PagestreamDbSizeRequest { + hdr: Self::make_hdr(req.read_lsn, 0), + dbnode: req.db_oid, + }; + + // Execute the request and convert the response. + let _timer = Self::record_op_start_and_throttle( + &timeline, + metrics::SmgrQueryType::GetDbSize, + received_at, + ) + .await?; + + let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?; + let resp = resp.db_size as page_api::GetDbSizeResponse; + Ok(tonic::Response::new(resp.into())) } // NB: don't instrument this, instrument each streamed request. async fn get_pages( &self, - _: tonic::Request>, + req: tonic::Request>, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("not implemented")) + // Extract the timeline from the request and check that it exists. + let ttid = *extract::(&req); + let shard_index = *extract::(&req); + let shard_selector = ShardSelector::Known(shard_index); + + let mut handles = TimelineHandles::new(self.tenant_manager.clone()); + handles + .get(ttid.tenant_id, ttid.timeline_id, shard_selector) + .await?; + + let span = Span::current(); + let ctx = self.ctx.attached_child(); + let mut reqs = req.into_inner(); + + let resps = async_stream::try_stream! { + let timeline = handles + .get(ttid.tenant_id, ttid.timeline_id, shard_selector) + .await? + .downgrade(); + while let Some(req) = reqs.message().await? { + // TODO: implement IoConcurrency sidecar. + yield Self::get_page(&ctx, &timeline, req, IoConcurrency::Sequential) + .instrument(span.clone()) // propagate request span + .await? 
+ } + }; + + Ok(tonic::Response::new(Box::pin(resps))) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(rel, lsn))] async fn get_rel_size( &self, - _: tonic::Request, + req: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("not implemented")) + let received_at = extract::(&req).0; + let timeline = self.get_request_timeline(&req).await?; + let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); + + // Validate the request, decorate the span, and convert it to a Pagestream request. + Self::ensure_shard_zero(&timeline)?; + let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?; + + span_record!(rel=%req.rel, lsn=%req.read_lsn); + + let req = PagestreamNblocksRequest { + hdr: Self::make_hdr(req.read_lsn, 0), + rel: req.rel, + }; + + // Execute the request and convert the response. + let _timer = Self::record_op_start_and_throttle( + &timeline, + metrics::SmgrQueryType::GetRelSize, + received_at, + ) + .await?; + + let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?; + let resp: page_api::GetRelSizeResponse = resp.n_blocks; + Ok(tonic::Response::new(resp.into())) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(kind, segno, lsn))] async fn get_slru_segment( &self, - _: tonic::Request, + req: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("not implemented")) + let received_at = extract::(&req).0; + let timeline = self.get_request_timeline(&req).await?; + let ctx = self.ctx.with_scope_page_service_pagestream(&timeline); + + // Validate the request, decorate the span, and convert it to a Pagestream request. 
+ Self::ensure_shard_zero(&timeline)?; + let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?; + + span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); + + let req = PagestreamGetSlruSegmentRequest { + hdr: Self::make_hdr(req.read_lsn, 0), + kind: req.kind as u8, + segno: req.segno, + }; + + // Execute the request and convert the response. + let _timer = Self::record_op_start_and_throttle( + &timeline, + metrics::SmgrQueryType::GetSlruSegment, + received_at, + ) + .await?; + + let resp = + PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?; + let resp: page_api::GetSlruSegmentResponse = resp.segment; + Ok(tonic::Response::new(resp.try_into()?)) } } /// gRPC middleware layer that handles observability concerns: /// /// * Creates and enters a tracing span. +/// * Records the request start time as a ReceivedAt request extension. /// /// TODO: add perf tracing. /// TODO: add timing and metrics. @@ -3395,6 +3834,9 @@ struct ObservabilityLayerService { inner: S, } +#[derive(Clone, Copy)] +struct ReceivedAt(Instant); + impl tonic::server::NamedService for ObservabilityLayerService { const NAME: &'static str = S::NAME; // propagate inner service name } @@ -3408,7 +3850,13 @@ where type Error = S::Error; type Future = BoxFuture<'static, Result>; - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { + // Record the request start time as a request extension. + // + // TODO: we should start a timer here instead, but it currently requires a timeline handle + // and SmgrQueryType, which we don't have yet. Refactor it to provide it later. + req.extensions_mut().insert(ReceivedAt(Instant::now())); + // Create a basic tracing span. Enter the span for the current thread (to use it for inner // sync code like interceptors), and instrument the future (to use it for inner async code // like the page service itself). 
@@ -3436,8 +3884,6 @@ where /// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type /// TenantTimelineId and ShardIndex. -/// -/// TODO: consider looking up the timeline handle here and storing it. #[derive(Clone)] struct TenantMetadataInterceptor; @@ -3485,7 +3931,7 @@ impl tonic::service::Interceptor for TenantMetadataInterceptor { } } -/// Authenticates gRPC page service requests. Must run after TenantMetadataInterceptor. +/// Authenticates gRPC page service requests. #[derive(Clone)] struct TenantAuthInterceptor { auth: Option>, @@ -3504,11 +3950,8 @@ impl tonic::service::Interceptor for TenantAuthInterceptor { return Ok(req); }; - // Fetch the tenant ID that's been set by TenantMetadataInterceptor. - let ttid = req - .extensions() - .get::() - .expect("TenantMetadataInterceptor must run before TenantAuthInterceptor"); + // Fetch the tenant ID from the request extensions (set by TenantMetadataInterceptor). + let TenantTimelineId { tenant_id, .. } = *extract::(&req); // Fetch and decode the JWT token. let jwt = req @@ -3526,7 +3969,7 @@ impl tonic::service::Interceptor for TenantAuthInterceptor { let claims = jwtdata.claims; // Check if the token is valid for this tenant. - check_permission(&claims, Some(ttid.tenant_id)) + check_permission(&claims, Some(tenant_id)) .map_err(|err| tonic::Status::permission_denied(err.to_string()))?; // TODO: consider stashing the claims in the request extensions, if needed. @@ -3535,6 +3978,21 @@ impl tonic::service::Interceptor for TenantAuthInterceptor { } } +/// Extracts the given type from the request extensions, or panics if it is missing. +fn extract(req: &tonic::Request) -> &T { + extract_from(req.extensions()) +} + +/// Extract the given type from the request extensions, or panics if it is missing. This variant +/// can extract both from a tonic::Request and http::Request. 
+fn extract_from(ext: &http::Extensions) -> &T { + let Some(value) = ext.get::() else { + let name = std::any::type_name::(); + panic!("extension {name} should be set by middleware"); + }; + value +} + #[derive(Debug, thiserror::Error)] pub(crate) enum GetActiveTimelineError { #[error(transparent)] @@ -3553,6 +4011,29 @@ impl From for QueryError { } } +impl From for tonic::Status { + fn from(err: GetActiveTimelineError) -> Self { + let message = err.to_string(); + let code = match err { + GetActiveTimelineError::Tenant(err) => tonic::Status::from(err).code(), + GetActiveTimelineError::Timeline(err) => tonic::Status::from(err).code(), + }; + tonic::Status::new(code, message) + } +} + +impl From for tonic::Status { + fn from(err: GetTimelineError) -> Self { + use tonic::Code; + let code = match &err { + GetTimelineError::NotFound { .. } => Code::NotFound, + GetTimelineError::NotActive { .. } => Code::Unavailable, + GetTimelineError::ShuttingDown => Code::Unavailable, + }; + tonic::Status::new(code, err.to_string()) + } +} + impl From for QueryError { fn from(e: GetActiveTenantError) -> Self { match e { @@ -3569,10 +4050,33 @@ impl From for QueryError { } } -impl From for QueryError { - fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self { +impl From for tonic::Status { + fn from(err: GetActiveTenantError) -> Self { + use tonic::Code; + let code = match &err { + GetActiveTenantError::Broken(_) => Code::Internal, + GetActiveTenantError::Cancelled => Code::Unavailable, + GetActiveTenantError::NotFound(_) => Code::NotFound, + GetActiveTenantError::SwitchedTenant => Code::Unavailable, + GetActiveTenantError::WaitForActiveTimeout { .. 
} => Code::Unavailable, + GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable, + }; + tonic::Status::new(code, err.to_string()) + } +} + +impl From for QueryError { + fn from(e: HandleUpgradeError) -> Self { match e { - crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown, + HandleUpgradeError::ShutDown => QueryError::Shutdown, + } + } +} + +impl From for tonic::Status { + fn from(err: HandleUpgradeError) -> Self { + match err { + HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"), } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 23c40a7629..9ddbe404d2 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -950,6 +950,18 @@ pub(crate) enum WaitLsnError { Timeout(String), } +impl From for tonic::Status { + fn from(err: WaitLsnError) -> Self { + use tonic::Code; + let code = match &err { + WaitLsnError::Timeout(_) => Code::Internal, + WaitLsnError::BadState(_) => Code::Internal, + WaitLsnError::Shutdown => Code::Unavailable, + }; + tonic::Status::new(code, err.to_string()) + } +} + // The impls below achieve cancellation mapping for errors. // Perhaps there's a way of achieving this with less cruft. From a650f7f5af4773bc6c7806a12b49e84234c7e6d6 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 3 Jun 2025 13:00:34 +0800 Subject: [PATCH 5/6] fix(pageserver): only deserialize reldir key once during get_db_size (#12102) ## Problem Fixes https://github.com/neondatabase/neon/issues/12101; this is a quick hack, and we need a better API in the future. In `get_db_size`, we call `get_rel_size` for every relation. However, each call deserializes the same reldir directory again. This creates huge CPU overhead. ## Summary of changes Get and deserialize the reldir v1 key once and use it across all get_rel_size requests.
--------- Signed-off-by: Alex Chi Z --- pageserver/src/pgdatadir_mapping.rs | 58 ++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index c6f3929257..b6f11b744b 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -471,8 +471,19 @@ impl Timeline { let rels = self.list_rels(spcnode, dbnode, version, ctx).await?; + if rels.is_empty() { + return Ok(0); + } + + // Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`. + let reldir_key = rel_dir_to_key(spcnode, dbnode); + let buf = version.get(self, reldir_key, ctx).await?; + let reldir = RelDirectory::des(&buf)?; + for rel in rels { - let n_blocks = self.get_rel_size(rel, version, ctx).await?; + let n_blocks = self + .get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), ctx) + .await?; total_blocks += n_blocks as usize; } Ok(total_blocks) @@ -487,6 +498,19 @@ impl Timeline { tag: RelTag, version: Version<'_>, ctx: &RequestContext, + ) -> Result { + self.get_rel_size_in_reldir(tag, version, None, ctx).await + } + + /// Get size of a relation file. The relation must exist, otherwise an error is returned. + /// + /// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`. + pub(crate) async fn get_rel_size_in_reldir( + &self, + tag: RelTag, + version: Version<'_>, + deserialized_reldir_v1: Option<(Key, &RelDirectory)>, + ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { return Err(PageReconstructError::Other( @@ -499,7 +523,9 @@ impl Timeline { } if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM) - && !self.get_rel_exists(tag, version, ctx).await? + && !self + .get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx) + .await? 
{ // FIXME: Postgres sometimes calls smgrcreate() to create // FSM, and smgrnblocks() on it immediately afterwards, @@ -521,11 +547,28 @@ impl Timeline { /// /// Only shard 0 has a full view of the relations. Other shards only know about relations that /// the shard stores pages for. + /// pub(crate) async fn get_rel_exists( &self, tag: RelTag, version: Version<'_>, ctx: &RequestContext, + ) -> Result { + self.get_rel_exists_in_reldir(tag, version, None, ctx).await + } + + /// Does the relation exist? With a cached deserialized `RelDirectory`. + /// + /// There are some cases where the caller loops across all relations. In that specific case, + /// the caller should obtain the deserialized `RelDirectory` first and then call this function + /// to avoid duplicated work of deserialization. This is a hack and should be removed by introducing + /// a new API (e.g., `get_rel_exists_batched`). + pub(crate) async fn get_rel_exists_in_reldir( + &self, + tag: RelTag, + version: Version<'_>, + deserialized_reldir_v1: Option<(Key, &RelDirectory)>, + ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { return Err(PageReconstructError::Other( @@ -568,6 +611,17 @@ impl Timeline { // fetch directory listing (old) let key = rel_dir_to_key(tag.spcnode, tag.dbnode); + + if let Some((cached_key, dir)) = deserialized_reldir_v1 { + if cached_key == key { + return Ok(dir.rels.contains(&(tag.relnode, tag.forknum))); + } else if cfg!(test) || cfg!(feature = "testing") { + panic!("cached reldir key mismatch: {cached_key} != {key}"); + } else { + warn!("cached reldir key mismatch: {cached_key} != {key}"); + } + // Fall back to reading the directory from the datadir.
+ } let buf = version.get(self, key, ctx).await?; let dir = RelDirectory::des(&buf)?; From 3e72edede524af50220d0d103df08a1f6e12e6a9 Mon Sep 17 00:00:00 2001 From: a-masterov <72613290+a-masterov@users.noreply.github.com> Date: Tue, 3 Jun 2025 09:23:17 +0200 Subject: [PATCH 6/6] Use full hostname for ONNX URL (#12064) ## Problem We should use the full host name for computes, according to https://github.com/neondatabase/cloud/issues/26005 , but now a truncated host name is used. ## Summary of changes The URL for REMOTE_ONNX is rewritten using the FQDN. --- compute/compute-node.Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index 3459983a34..2afdde0cfa 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -1180,14 +1180,14 @@ RUN cd exts/rag && \ RUN cd exts/rag_bge_small_en_v15 && \ sed -i 's/pgrx = "0.14.1"/pgrx = { version = "0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \ ORT_LIB_LOCATION=/ext-src/onnxruntime-src/build/Linux \ - REMOTE_ONNX_URL=http://pg-ext-s3-gateway/pgrag-data/bge_small_en_v15.onnx \ + REMOTE_ONNX_URL=http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local/pgrag-data/bge_small_en_v15.onnx \ cargo pgrx install --release --features remote_onnx && \ echo "trusted = true" >> /usr/local/pgsql/share/extension/rag_bge_small_en_v15.control RUN cd exts/rag_jina_reranker_v1_tiny_en && \ sed -i 's/pgrx = "0.14.1"/pgrx = { version = "0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \ ORT_LIB_LOCATION=/ext-src/onnxruntime-src/build/Linux \ - REMOTE_ONNX_URL=http://pg-ext-s3-gateway/pgrag-data/jina_reranker_v1_tiny_en.onnx \ + REMOTE_ONNX_URL=http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local/pgrag-data/jina_reranker_v1_tiny_en.onnx \ cargo pgrx install --release --features remote_onnx && \ echo "trusted = true" >> /usr/local/pgsql/share/extension/rag_jina_reranker_v1_tiny_en.control