WIP: profile walingest

This commit is contained in:
Bojan Serafimov
2023-11-21 10:24:05 -05:00
parent 779a5ab9ff
commit 65eb48aab7
8 changed files with 91 additions and 5 deletions

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 80 KiB

After

Width:  |  Height:  |  Size: 80 KiB

View File

@@ -255,7 +255,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) {
fn start_pageserver(
launch_ts: &'static LaunchTimestamp,
conf: &'static PageServerConf,
) -> anyhow::Result<()> {
) -> anyhow::Result<()> { // TODO this should be anyhow::Result<!>
// Monotonic time for later calculating startup duration
let started_startup_at = Instant::now();
@@ -273,6 +273,8 @@ fn start_pageserver(
set_launch_timestamp_metric(launch_ts);
pageserver::preinitialize_metrics();
let profiler_guard = pageserver::profiling::init_profiler();
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
let failpoints = fail::list();
@@ -674,10 +676,14 @@ fn start_pageserver(
"Got {}. Terminating in immediate shutdown mode",
signal.name()
);
#[cfg(feature = "profiling")]
pageserver::profiling::exit_profiler(&profiler_guard);
std::process::exit(111);
}
Signal::Interrupt | Signal::Terminate => {
Signal::Quit | Signal::Interrupt | Signal::Terminate => {
info!(
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
@@ -689,6 +695,10 @@ fn start_pageserver(
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
#[cfg(feature = "profiling")]
pageserver::profiling::exit_profiler(&profiler_guard);
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
bg_remote_storage.map(|_| bg_deletion_queue),
0,

View File

@@ -618,3 +618,65 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes>
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::harness::*;
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let pg_version = 15;
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
// We can't use create_test_timeline because it doesn't create a real
// checkpoint, and Walingest::new tries to parse the garbage data.
let tline = tenant
.bootstrap_timeline(TIMELINE_ID, pg_version, &ctx)
.await?;
// Steps to reconstruct this test data:
// 1. Run the pgbench python test
// 2. Take the first wal segment file from safekeeper
// 3. Grep sk logs for "restart decoder" to get startpoint
// 4. Run just the decoder from this test to get the endpoint.
// It's the last LSN the decoder will output.
let pg_version = 15;
let path = "test_data/sk_wal_segment_from_pgbench";
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let endpoint = Lsn::from_hex("1FFFF98").unwrap();
// We fully read this into memory before decoding to get a
// more accurate perf profile of the decoder.
let bytes = std::fs::read(path)?;
let profiler_guard = crate::profiling::init_profiler();
let prof_guard = crate::profiling::profpoint_start();
let started_at = std::time::Instant::now();
// Feed bytes to the decoder
let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE);
let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
decoder.feed_bytes(&bytes[xlogoff..]);
println!("decoding {} bytes", bytes.len() - xlogoff);
// Decode and ingest wal
let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx).await?;
let mut modification = tline.begin_modification(endpoint);
let mut decoded = DecodedWALRecord::default();
while let Some((lsn, recdata)) = decoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await?;
}
let duration = started_at.elapsed();
println!("done in {:?}", duration);
drop(prof_guard);
#[cfg(feature = "profiling")]
crate::profiling::exit_profiler(&profiler_guard);
Ok(())
}
}

View File

@@ -36,6 +36,8 @@ mod profiling_impl {
pub struct ProfilingGuard(PhantomUnsend);
unsafe impl Send for ProfilingGuard {}
impl Drop for ProfilingGuard {
fn drop(&mut self) {
pprof::stop_profiling();

View File

@@ -121,6 +121,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
// .worker_threads(1)
.thread_name("walreceiver worker")
.enable_all()
.build()

View File

@@ -2868,7 +2868,7 @@ impl Tenant {
/// - after initialization complete, remove the temp dir.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
pub async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,

View File

@@ -244,6 +244,9 @@ pub(super) async fn handle_walreceiver_connection(
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
// tokio::task::block_in_place(move || {
// tokio::runtime::Handle::current().block_on(async move {
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let copy_stream = replication_client.copy_both_simple(&query).await?;
@@ -293,6 +296,8 @@ pub(super) async fn handle_walreceiver_connection(
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
let prof_guard = crate::profiling::profpoint_start();
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
@@ -330,6 +335,7 @@ pub(super) async fn handle_walreceiver_connection(
caught_up = true;
}
drop(prof_guard);
Some(endlsn)
}
@@ -418,6 +424,11 @@ pub(super) async fn handle_walreceiver_connection(
}
}
// Ok(())
// })?;
// Ok::<(), WalReceiverError>(())
// })?;
Ok(())
}

Binary file not shown.