From 2359f69278a5c0021b965704f467be5ea314f2ff Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 21 Nov 2022 16:27:20 +0200 Subject: [PATCH] WIP: Process received GetPage requests in parallel --- pageserver/src/page_service.rs | 163 +++++++++++++++++++++------------ 1 file changed, 103 insertions(+), 60 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index eb9416a482..1ae5d592d8 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result}; use bytes::Buf; use bytes::Bytes; use futures::{Stream, StreamExt}; +use futures::stream::FuturesOrdered; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, @@ -25,6 +26,7 @@ use std::net::TcpListener; use std::str; use std::str::FromStr; use std::sync::Arc; +use std::time::Instant; use tokio::pin; use tokio_util::io::StreamReader; use tokio_util::io::SyncIoBridge; @@ -54,6 +56,9 @@ use crate::CheckpointConfig; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; +/// Number of requests to process in parallel, from a single connection +const MAX_INFLIGHT_REQUESTS: usize = 4; + fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream> + '_ { async_stream::try_stream! { loop { @@ -224,6 +229,13 @@ struct PageRequestMetrics { get_db_size: metrics::Histogram, } +pub enum RequestType { + Exists, + Nblocks, + GetPage, + DbSize, +} + impl PageRequestMetrics { fn new(tenant_id: &TenantId, timeline_id: &TimelineId) -> Self { let tenant_id = tenant_id.to_string(); @@ -298,66 +310,101 @@ impl PageServerHandler { let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id); + // + // Main loop to handle the stream of requests + // + // We process multiple requests in parallel, by spawning a new Task for each + // incoming request. + let mut inprogress_requests = FuturesOrdered::new(); loop { - let msg = tokio::select! { + tokio::select! { biased; + // If we were requested to shut down, stop _ = task_mgr::shutdown_watcher() => { - // We were requested to shut down. info!("shutdown request received in page handler"); break; } - msg = pgb.read_message() => { msg } - }; + // When a task completes, send the response to the client + completed_task = inprogress_requests.next(), if !inprogress_requests.is_empty() => { + let response: Bytes; + let request_type: RequestType; + let elapsed_sec: f64; + (response, request_type, elapsed_sec) = completed_task.unwrap()?; - let copy_data_bytes = match msg? { - Some(FeMessage::CopyData(bytes)) => bytes, - Some(m) => { - bail!("unexpected message: {m:?} during COPY"); + pgb.write_message(&BeMessage::CopyData(&response))?; + pgb.flush().await?; + + match request_type { + RequestType::Exists => metrics.get_rel_exists.observe(elapsed_sec), + RequestType::Nblocks => metrics.get_rel_size.observe(elapsed_sec), + RequestType::GetPage => metrics.get_page_at_lsn.observe(elapsed_sec), + RequestType::DbSize => metrics.get_db_size.observe(elapsed_sec), + } + + continue; } - None => break, // client disconnected - }; - trace!("query: {copy_data_bytes:?}"); + // When a new request arrives, spawn a task to process it. + // If we already have MAX_INFLIGHT_REQUESTS requests in-progress, however, + // don't start new ones. + msg = pgb.read_message(), if inprogress_requests.len() < MAX_INFLIGHT_REQUESTS => { + let copy_data_bytes = match msg? { + Some(FeMessage::CopyData(bytes)) => bytes, + Some(m) => { + bail!("unexpected message: {m:?} during COPY"); + } + None => break, // client disconnected + }; - // Trace request if needed - if let Some(t) = tracer.as_mut() { - t.trace(©_data_bytes) - } + trace!("query: {copy_data_bytes:?}"); - let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + // Trace request if needed + if let Some(t) = tracer.as_mut() { + t.trace(©_data_bytes) + } - let response = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => { - let _timer = metrics.get_rel_exists.start_timer(); - self.handle_get_rel_exists_request(&timeline, &req).await - } - PagestreamFeMessage::Nblocks(req) => { - let _timer = metrics.get_rel_size.start_timer(); - self.handle_get_nblocks_request(&timeline, &req).await - } - PagestreamFeMessage::GetPage(req) => { - let _timer = metrics.get_page_at_lsn.start_timer(); - self.handle_get_page_at_lsn_request(&timeline, &req).await - } - PagestreamFeMessage::DbSize(req) => { - let _timer = metrics.get_db_size.start_timer(); - self.handle_db_size_request(&timeline, &req).await + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + + let timeline = Arc::clone(&timeline); + let conf = self.conf; + let task = async move { + let start_time = Instant::now(); + let (response, request_type) = match neon_fe_msg { + PagestreamFeMessage::Exists(req) => { + (Self::handle_get_rel_exists_request(&timeline, &req).await, + RequestType::Exists) + } + PagestreamFeMessage::Nblocks(req) => { + (Self::handle_get_nblocks_request(&timeline, &req).await, + RequestType::Nblocks) + } + PagestreamFeMessage::GetPage(req) => { + (Self::handle_get_page_at_lsn_request(conf, &timeline, &req).await, + RequestType::GetPage) + } + PagestreamFeMessage::DbSize(req) => { + (Self::handle_db_size_request(&timeline, &req).await, + RequestType::DbSize) + } + }; + + let response = response.unwrap_or_else(|e| { + // print the all details to the log with {:#}, but for the client the + // error message is enough + error!("error reading relation or page version: {:?}", e); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); + let response: Bytes = response.serialize(); + (response, request_type, start_time.elapsed().as_secs_f64()) + }; + inprogress_requests.push_back(tokio::spawn(task)); + continue; } }; - - let response = response.unwrap_or_else(|e| { - // print the all details to the log with {:#}, but for the client the - // error message is enough - error!("error reading relation or page version: {:?}", e); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) - }); - - pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; - pgb.flush().await?; } Ok(()) } @@ -529,9 +576,8 @@ impl PageServerHandler { Ok(lsn) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] async fn handle_get_rel_exists_request( - &self, timeline: &Timeline, req: &PagestreamExistsRequest, ) -> Result { @@ -546,9 +592,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] async fn handle_get_nblocks_request( - &self, timeline: &Timeline, req: &PagestreamNblocksRequest, ) -> Result { @@ -563,9 +608,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))] async fn handle_db_size_request( - &self, timeline: &Timeline, req: &PagestreamDbSizeRequest, ) -> Result { @@ -583,11 +627,11 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))] - async fn handle_get_page_at_lsn_request( - &self, - timeline: &Timeline, - req: &PagestreamGetPageRequest, + #[instrument(skip(conf, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))] + async fn handle_get_page_at_lsn_request<'a>( + conf: &'static PageServerConf, + timeline: &'a Timeline, + req: &'a PagestreamGetPageRequest, ) -> Result { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn) @@ -604,7 +648,7 @@ impl PageServerHandler { // FIXME: this profiling now happens at different place than it used to. The // current profiling is based on a thread-local variable, so it doesn't work // across awaits - let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests); + let _profiling_guard = profpoint_start(conf, ProfilingConfig::PageRequests); let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { @@ -612,9 +656,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, pgb))] + #[instrument(skip(pgb))] async fn handle_basebackup_request( - &self, pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, @@ -747,7 +790,7 @@ impl postgres_backend_async::Handler for PageServerHandler { }; // Check that the timeline exists - self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false) + Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false) .await?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } @@ -807,7 +850,7 @@ impl postgres_backend_async::Handler for PageServerHandler { self.check_permission(Some(tenant_id))?; // Check that the timeline exists - self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true) + Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true) .await?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("import basebackup ") {