From 5bd843551c3ad1aa46aae653a82172f1ce8c85af Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 8 Nov 2022 14:42:40 +0200 Subject: [PATCH] WIP: Process received GetPage requests in parallel --- pageserver/src/page_service.rs | 114 +++++++++++++++++---------------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 9c297bc835..4632255f52 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result}; use bytes::Buf; use bytes::Bytes; use futures::{Stream, StreamExt}; +use futures::stream::FuturesOrdered; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, @@ -39,10 +40,9 @@ use utils::{ }; use crate::basebackup; -use crate::config::{PageServerConf, ProfilingConfig}; +use crate::config::PageServerConf; use crate::import_datadir::import_wal_from_tar; use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; -use crate::profiling::profpoint_start; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant::Timeline; @@ -276,10 +276,10 @@ impl PageServerHandler { pgb.write_message(&BeMessage::CopyBothResponse)?; pgb.flush().await?; - let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id); + let mut inprogress_requests = FuturesOrdered::new(); loop { - let msg = tokio::select! { + tokio::select! { biased; _ = task_mgr::shutdown_watcher() => { @@ -288,51 +288,58 @@ impl PageServerHandler { break; } - msg = pgb.read_message() => { msg } - }; - - let copy_data_bytes = match msg? { - Some(FeMessage::CopyData(bytes)) => bytes, - Some(m) => { - bail!("unexpected message: {m:?} during COPY"); + response = inprogress_requests.next(), if !inprogress_requests.is_empty() => { + let response: Bytes = response.unwrap()?; + pgb.write_message(&BeMessage::CopyData(&response))?; + pgb.flush().await?; + continue; } - None => break, // client disconnected - }; - trace!("query: {copy_data_bytes:?}"); + msg = pgb.read_message() => { + let copy_data_bytes = match msg? { + Some(FeMessage::CopyData(bytes)) => bytes, + Some(m) => { + bail!("unexpected message: {m:?} during COPY"); + } + None => break, // client disconnected + }; - let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + trace!("query: {copy_data_bytes:?}"); - let response = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => { - let _timer = metrics.get_rel_exists.start_timer(); - self.handle_get_rel_exists_request(&timeline, &req).await - } - PagestreamFeMessage::Nblocks(req) => { - let _timer = metrics.get_rel_size.start_timer(); - self.handle_get_nblocks_request(&timeline, &req).await - } - PagestreamFeMessage::GetPage(req) => { - let _timer = metrics.get_page_at_lsn.start_timer(); - self.handle_get_page_at_lsn_request(&timeline, &req).await - } - PagestreamFeMessage::DbSize(req) => { - let _timer = metrics.get_db_size.start_timer(); - self.handle_db_size_request(&timeline, &req).await + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + + let timeline = Arc::clone(&timeline); + let task = async move { + let response = match neon_fe_msg { + PagestreamFeMessage::Exists(req) => { + Self::handle_get_rel_exists_request(&timeline, &req).await + } + PagestreamFeMessage::Nblocks(req) => { + Self::handle_get_nblocks_request(&timeline, &req).await + } + PagestreamFeMessage::GetPage(req) => { + Self::handle_get_page_at_lsn_request(&timeline, &req).await + } + PagestreamFeMessage::DbSize(req) => { + Self::handle_db_size_request(&timeline, &req).await + } + }; + + let response = response.unwrap_or_else(|e| { + // print the all details to the log with {:#}, but for the client the + // error message is enough + error!("error reading relation or page version: {:?}", e); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); + let response: Bytes = response.serialize(); + response + }; + inprogress_requests.push_back(tokio::spawn(task)); + continue; } }; - - let response = response.unwrap_or_else(|e| { - // print the all details to the log with {:#}, but for the client the - // error message is enough - error!("error reading relation or page version: {:?}", e); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) - }); - - pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; - pgb.flush().await?; } Ok(()) } @@ -504,9 +511,8 @@ impl PageServerHandler { Ok(lsn) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] async fn handle_get_rel_exists_request( - &self, timeline: &Timeline, req: &PagestreamExistsRequest, ) -> Result { @@ -521,9 +527,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] async fn handle_get_nblocks_request( - &self, timeline: &Timeline, req: &PagestreamNblocksRequest, ) -> Result { @@ -538,9 +543,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))] async fn handle_db_size_request( - &self, timeline: &Timeline, req: &PagestreamDbSizeRequest, ) -> Result { @@ -558,9 +562,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))] async fn handle_get_page_at_lsn_request( - &self, timeline: &Timeline, req: &PagestreamGetPageRequest, ) -> Result { @@ -579,7 +582,7 @@ impl PageServerHandler { // FIXME: this profiling now happens at different place than it used to. The // current profiling is based on a thread-local variable, so it doesn't work // across awaits - let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests); + //let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests); let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { @@ -587,9 +590,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, pgb))] + #[instrument(skip(pgb))] async fn handle_basebackup_request( - &self, pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, @@ -722,7 +724,7 @@ impl postgres_backend_async::Handler for PageServerHandler { }; // Check that the timeline exists - self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false) + Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false) .await?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } @@ -782,7 +784,7 @@ impl postgres_backend_async::Handler for PageServerHandler { self.check_permission(Some(tenant_id))?; // Check that the timeline exists - self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true) + Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true) .await?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("import basebackup ") {