WIP: Process received GetPage requests in parallel

This commit is contained in:
Heikki Linnakangas
2022-11-08 14:42:40 +02:00
parent c5c4e47edd
commit 5bd843551c

View File

@@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::stream::FuturesOrdered;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -39,10 +40,9 @@ use utils::{
};
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::config::PageServerConf;
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::profiling::profpoint_start;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -276,10 +276,10 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
let mut inprogress_requests = FuturesOrdered::new();
loop {
let msg = tokio::select! {
tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
@@ -288,51 +288,58 @@ impl PageServerHandler {
break;
}
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
response = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
let response: Bytes = response.unwrap()?;
pgb.write_message(&BeMessage::CopyData(&response))?;
pgb.flush().await?;
continue;
}
None => break, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
msg = pgb.read_message() => {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
trace!("query: {copy_data_bytes:?}");
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer();
self.handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
let _timer = metrics.get_rel_size.start_timer();
self.handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
let _timer = metrics.get_page_at_lsn.start_timer();
self.handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
let _timer = metrics.get_db_size.start_timer();
self.handle_db_size_request(&timeline, &req).await
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let timeline = Arc::clone(&timeline);
let task = async move {
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
Self::handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
Self::handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
Self::handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
Self::handle_db_size_request(&timeline, &req).await
}
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response: Bytes = response.serialize();
response
};
inprogress_requests.push_back(tokio::spawn(task));
continue;
}
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
Ok(())
}
@@ -504,9 +511,8 @@ impl PageServerHandler {
Ok(lsn)
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
@@ -521,9 +527,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
@@ -538,9 +543,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
@@ -558,9 +562,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
@@ -579,7 +582,7 @@ impl PageServerHandler {
// FIXME: this profiling now happens at different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
//let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -587,9 +590,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, pgb))]
#[instrument(skip(pgb))]
async fn handle_basebackup_request(
&self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -722,7 +724,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
};
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
@@ -782,7 +784,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {