mirror of https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
WIP: Process received GetPage requests in parallel
@@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
 use bytes::Buf;
 use bytes::Bytes;
 use futures::{Stream, StreamExt};
+use futures::stream::FuturesOrdered;
 use pageserver_api::models::{
     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
     PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
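Note: FuturesOrdered (imported above) yields results in the order the futures were pushed, not the order they complete. That property is what lets this patch run requests concurrently while still writing responses back to the client in request order. A minimal standalone sketch of just that behavior, assuming a tokio runtime with the macros and time features enabled; the sleep durations and values are illustrative only, not part of this commit:

use futures::stream::{FuturesOrdered, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut queue = FuturesOrdered::new();
    // The first future pushed takes the longest to complete.
    for (i, ms) in [(1, 30u64), (2, 20), (3, 10)] {
        queue.push_back(async move {
            tokio::time::sleep(Duration::from_millis(ms)).await;
            i
        });
    }
    // Prints 1, 2, 3: push order, not completion order.
    while let Some(i) = queue.next().await {
        println!("{i}");
    }
}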
@@ -25,6 +26,7 @@ use std::net::TcpListener;
 use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::Instant;
 use tokio::pin;
 use tokio_util::io::StreamReader;
 use tokio_util::io::SyncIoBridge;
@@ -54,6 +56,9 @@ use crate::CheckpointConfig;
 use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
 use postgres_ffi::BLCKSZ;
 
+/// Number of requests to process in parallel, from a single connection
+const MAX_INFLIGHT_REQUESTS: usize = 4;
+
 fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ {
     async_stream::try_stream! {
         loop {
@@ -224,6 +229,13 @@ struct PageRequestMetrics {
     get_db_size: metrics::Histogram,
 }
 
+pub enum RequestType {
+    Exists,
+    Nblocks,
+    GetPage,
+    DbSize,
+}
+
 impl PageRequestMetrics {
     fn new(tenant_id: &TenantId, timeline_id: &TimelineId) -> Self {
         let tenant_id = tenant_id.to_string();
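Note: the old code timed each request with the histogram's start_timer() guard, which observes on drop in the same scope as the handler call. Once the work moves into a spawned task, the new code instead measures with Instant inside the task and returns a RequestType tag plus the elapsed seconds, so the connection loop, which owns PageRequestMetrics, records the sample. A sketch of that split, assuming metrics::Histogram behaves like a prometheus Histogram (the observe()/start_timer() calls in the diff suggest it does); the names here are stand-ins:

use prometheus::{Histogram, HistogramOpts};
use std::time::Instant;

#[allow(dead_code)]
enum RequestType { Exists, Nblocks, GetPage, DbSize }

fn main() {
    let get_page_at_lsn =
        Histogram::with_opts(HistogramOpts::new("get_page_at_lsn", "latency")).unwrap();

    // Old shape: an RAII guard that observes the histogram when `_timer`
    // drops at the end of the scope, in the same stack frame as the handler.
    {
        let _timer = get_page_at_lsn.start_timer();
        // ... handle the request inline ...
    }

    // New shape: the spawned task measures with Instant and hands back a
    // (RequestType, f64) pair; the connection loop owns the metrics and
    // records the sample.
    let start_time = Instant::now();
    // ... handle the request inside the task ...
    let (request_type, elapsed_sec) = (RequestType::GetPage, start_time.elapsed().as_secs_f64());
    match request_type {
        RequestType::GetPage => get_page_at_lsn.observe(elapsed_sec),
        _ => {} // the other variants would feed their own histograms
    }
}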
@@ -298,66 +310,101 @@ impl PageServerHandler {
 
         let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
 
+        //
+        // Main loop to handle the stream of requests
+        //
+        // We process multiple requests in parallel, by spawning a new Task for each
+        // incoming request.
+        let mut inprogress_requests = FuturesOrdered::new();
         loop {
-            let msg = tokio::select! {
+            tokio::select! {
                 biased;
 
+                // If we were requested to shut down, stop
                 _ = task_mgr::shutdown_watcher() => {
-                    // We were requested to shut down.
                     info!("shutdown request received in page handler");
                     break;
                 }
 
-                msg = pgb.read_message() => { msg }
-            };
+                // When a task completes, send the response to the client
+                completed_task = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
+                    let response: Bytes;
+                    let request_type: RequestType;
+                    let elapsed_sec: f64;
+                    (response, request_type, elapsed_sec) = completed_task.unwrap()?;
 
-            let copy_data_bytes = match msg? {
-                Some(FeMessage::CopyData(bytes)) => bytes,
-                Some(m) => {
-                    bail!("unexpected message: {m:?} during COPY");
+                    pgb.write_message(&BeMessage::CopyData(&response))?;
+                    pgb.flush().await?;
+
+                    match request_type {
+                        RequestType::Exists => metrics.get_rel_exists.observe(elapsed_sec),
+                        RequestType::Nblocks => metrics.get_rel_size.observe(elapsed_sec),
+                        RequestType::GetPage => metrics.get_page_at_lsn.observe(elapsed_sec),
+                        RequestType::DbSize => metrics.get_db_size.observe(elapsed_sec),
+                    }
+
+                    continue;
                 }
-                None => break, // client disconnected
-            };
 
-            trace!("query: {copy_data_bytes:?}");
+                // When a new request arrives, spawn a task to process it.
+                // If we already have MAX_INFLIGHT_REQUESTS requests in-progress, however,
+                // don't start new ones.
+                msg = pgb.read_message(), if inprogress_requests.len() < MAX_INFLIGHT_REQUESTS => {
+                    let copy_data_bytes = match msg? {
+                        Some(FeMessage::CopyData(bytes)) => bytes,
+                        Some(m) => {
+                            bail!("unexpected message: {m:?} during COPY");
+                        }
+                        None => break, // client disconnected
+                    };
 
-            // Trace request if needed
-            if let Some(t) = tracer.as_mut() {
-                t.trace(&copy_data_bytes)
-            }
+                    trace!("query: {copy_data_bytes:?}");
 
-            let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
+                    // Trace request if needed
+                    if let Some(t) = tracer.as_mut() {
+                        t.trace(&copy_data_bytes)
+                    }
 
-            let response = match neon_fe_msg {
-                PagestreamFeMessage::Exists(req) => {
-                    let _timer = metrics.get_rel_exists.start_timer();
-                    self.handle_get_rel_exists_request(&timeline, &req).await
-                }
-                PagestreamFeMessage::Nblocks(req) => {
-                    let _timer = metrics.get_rel_size.start_timer();
-                    self.handle_get_nblocks_request(&timeline, &req).await
-                }
-                PagestreamFeMessage::GetPage(req) => {
-                    let _timer = metrics.get_page_at_lsn.start_timer();
-                    self.handle_get_page_at_lsn_request(&timeline, &req).await
-                }
-                PagestreamFeMessage::DbSize(req) => {
-                    let _timer = metrics.get_db_size.start_timer();
-                    self.handle_db_size_request(&timeline, &req).await
-                }
-            };
+                    let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
+
+                    let timeline = Arc::clone(&timeline);
+                    let conf = self.conf;
+                    let task = async move {
+                        let start_time = Instant::now();
+                        let (response, request_type) = match neon_fe_msg {
+                            PagestreamFeMessage::Exists(req) => {
+                                (Self::handle_get_rel_exists_request(&timeline, &req).await,
+                                 RequestType::Exists)
+                            }
+                            PagestreamFeMessage::Nblocks(req) => {
+                                (Self::handle_get_nblocks_request(&timeline, &req).await,
+                                 RequestType::Nblocks)
+                            }
+                            PagestreamFeMessage::GetPage(req) => {
+                                (Self::handle_get_page_at_lsn_request(conf, &timeline, &req).await,
+                                 RequestType::GetPage)
+                            }
+                            PagestreamFeMessage::DbSize(req) => {
+                                (Self::handle_db_size_request(&timeline, &req).await,
+                                 RequestType::DbSize)
+                            }
+                        };
 
-            let response = response.unwrap_or_else(|e| {
-                // print the all details to the log with {:#}, but for the client the
-                // error message is enough
-                error!("error reading relation or page version: {:?}", e);
-                PagestreamBeMessage::Error(PagestreamErrorResponse {
-                    message: e.to_string(),
-                })
-            });
+                        let response = response.unwrap_or_else(|e| {
+                            // print the all details to the log with {:#}, but for the client the
+                            // error message is enough
+                            error!("error reading relation or page version: {:?}", e);
+                            PagestreamBeMessage::Error(PagestreamErrorResponse {
+                                message: e.to_string(),
+                            })
+                        });
+                        let response: Bytes = response.serialize();
+                        (response, request_type, start_time.elapsed().as_secs_f64())
+                    };
+                    inprogress_requests.push_back(tokio::spawn(task));
+                    continue;
+                }
+            };
-
-            pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
-            pgb.flush().await?;
         }
         Ok(())
     }
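Note: the hunk above is the core of the change: one biased select! over three event sources, where the `if` guards provide both backpressure (stop reading new requests once MAX_INFLIGHT_REQUESTS tasks are running) and correctness (never poll an empty FuturesOrdered). A self-contained toy version of the same structure, with the pageserver types replaced by hypothetical stand-ins (u64 requests and String responses over channels), not the real handler:

use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::{mpsc, watch};

const MAX_INFLIGHT_REQUESTS: usize = 4;

async fn serve(mut requests: mpsc::Receiver<u64>, mut shutdown: watch::Receiver<bool>) {
    let mut inprogress = FuturesOrdered::new();
    loop {
        tokio::select! {
            // `biased` polls the arms top-to-bottom, so shutdown is always
            // checked first, then completions, then new work.
            biased;

            _ = shutdown.changed() => break,

            // Drain the oldest completed task; the guard disables this arm
            // while the queue is empty (next() would otherwise yield None).
            Some(done) = inprogress.next(), if !inprogress.is_empty() => {
                let response: String = done.expect("task panicked");
                println!("{response}");
            }

            // Accept new work only below the in-flight cap: at the cap we
            // simply stop reading from the client, which is the backpressure.
            maybe_req = requests.recv(), if inprogress.len() < MAX_INFLIGHT_REQUESTS => {
                match maybe_req {
                    Some(req) => inprogress.push_back(tokio::spawn(async move {
                        format!("handled request {req}")
                    })),
                    None => break, // client disconnected
                }
            }
        }
    }
    // Unlike the patch above, drain whatever was still in flight so the
    // demo prints every response before exiting.
    while let Some(done) = inprogress.next().await {
        println!("{}", done.expect("task panicked"));
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    for i in 0..8 {
        tx.send(i).await.unwrap();
    }
    drop(tx); // close the "connection"
    serve(rx, shutdown_rx).await;
}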
@@ -529,9 +576,8 @@ impl PageServerHandler {
         Ok(lsn)
     }
 
-    #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
+    #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
     async fn handle_get_rel_exists_request(
-        &self,
         timeline: &Timeline,
         req: &PagestreamExistsRequest,
     ) -> Result<PagestreamBeMessage> {
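Note: each #[instrument] change in these hunks mirrors the signature change next to it: skip() may only name actual parameters, so once &self is removed from the function, `self` has to be dropped from the skip list too. A minimal sketch of the attribute shape on a handler written as a plain function, with stand-in types rather than the pageserver's:

use tracing::instrument;

struct Timeline;
struct PagestreamExistsRequest {
    lsn: u64,
}

// `skip(timeline, req)` keeps the non-Debug arguments out of the span;
// the interesting request fields are recorded explicitly instead.
#[instrument(skip(timeline, req), fields(req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(timeline: &Timeline, req: &PagestreamExistsRequest) -> bool {
    let _ = (timeline, req.lsn);
    true
}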
@@ -546,9 +592,8 @@ impl PageServerHandler {
         }))
     }
 
-    #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
+    #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
     async fn handle_get_nblocks_request(
-        &self,
         timeline: &Timeline,
         req: &PagestreamNblocksRequest,
     ) -> Result<PagestreamBeMessage> {
@@ -563,9 +608,8 @@ impl PageServerHandler {
         }))
     }
 
-    #[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
+    #[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
     async fn handle_db_size_request(
-        &self,
         timeline: &Timeline,
         req: &PagestreamDbSizeRequest,
     ) -> Result<PagestreamBeMessage> {
@@ -583,11 +627,11 @@ impl PageServerHandler {
         }))
     }
 
-    #[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
-    async fn handle_get_page_at_lsn_request(
-        &self,
-        timeline: &Timeline,
-        req: &PagestreamGetPageRequest,
+    #[instrument(skip(conf, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
+    async fn handle_get_page_at_lsn_request<'a>(
+        conf: &'static PageServerConf,
+        timeline: &'a Timeline,
+        req: &'a PagestreamGetPageRequest,
     ) -> Result<PagestreamBeMessage> {
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
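Note: the new signature here, and the &self removals throughout, fall out of tokio::spawn, which requires the spawned future to be Send + 'static: it may not borrow the connection handler or &self, so everything it uses is moved in, an Arc clone of the timeline and a &'static config. A sketch of the constraint with hypothetical stand-in types:

use std::sync::Arc;

struct PageServerConf;
struct Timeline;

// A handler written as a plain async fn: the future only captures what is
// moved into it (the Arc) or what lives forever (the &'static conf).
async fn handle_get_page(conf: &'static PageServerConf, timeline: &Timeline) -> String {
    let _ = (conf, timeline);
    "page".to_string()
}

#[tokio::main]
async fn main() {
    // One way to obtain a &'static conf in a sketch; the pageserver gets
    // its 'static conf elsewhere.
    let conf: &'static PageServerConf = Box::leak(Box::new(PageServerConf));
    let timeline = Arc::new(Timeline);

    // tokio::spawn needs `future: 'static`: a future borrowing a local
    // `&Timeline` would not compile, so clone the Arc and move it in.
    let timeline = Arc::clone(&timeline);
    let join = tokio::spawn(async move { handle_get_page(conf, &timeline).await });
    println!("{}", join.await.expect("task panicked"));
}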
@@ -604,7 +648,7 @@ impl PageServerHandler {
         // FIXME: this profiling now happens at different place than it used to. The
         // current profiling is based on a thread-local variable, so it doesn't work
         // across awaits
-        let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
+        let _profiling_guard = profpoint_start(conf, ProfilingConfig::PageRequests);
         let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
 
         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -612,9 +656,8 @@ impl PageServerHandler {
         }))
     }
 
-    #[instrument(skip(self, pgb))]
+    #[instrument(skip(pgb))]
     async fn handle_basebackup_request(
-        &self,
         pgb: &mut PostgresBackend,
         tenant_id: TenantId,
         timeline_id: TimelineId,
@@ -747,7 +790,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
             };
 
             // Check that the timeline exists
-            self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
+            Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
                 .await?;
             pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
         }
@@ -807,7 +850,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
             self.check_permission(Some(tenant_id))?;
 
             // Check that the timeline exists
-            self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
+            Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
                 .await?;
             pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else if query_string.starts_with("import basebackup ") {