diff --git a/pageserver/flamegraph.svg b/pageserver/flamegraph.svg index 2fc62747c2..869a445d4c 100644 --- a/pageserver/flamegraph.svg +++ b/pageserver/flamegraph.svg @@ -1,4 +1,4 @@ - \ No newline at end of file diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 68865b1684..c8af06fdfa 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -255,7 +255,7 @@ fn startup_checkpoint(started_at: Instant, phase: &str, human_phase: &str) { fn start_pageserver( launch_ts: &'static LaunchTimestamp, conf: &'static PageServerConf, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> { // TODO this should be anyhow::Result // Monotonic time for later calculating startup duration let started_startup_at = Instant::now(); @@ -273,6 +273,8 @@ fn start_pageserver( set_launch_timestamp_metric(launch_ts); pageserver::preinitialize_metrics(); + let profiler_guard = pageserver::profiling::init_profiler(); + // If any failpoints were set from FAILPOINTS environment variable, // print them to the log for debugging purposes let failpoints = fail::list(); @@ -674,10 +676,14 @@ fn start_pageserver( "Got {}. Terminating in immediate shutdown mode", signal.name() ); + + #[cfg(feature = "profiling")] + pageserver::profiling::exit_profiler(&profiler_guard); + std::process::exit(111); } - Signal::Interrupt | Signal::Terminate => { + Signal::Quit | Signal::Interrupt | Signal::Terminate => { info!( "Got {}. Terminating gracefully in fast shutdown mode", signal.name() @@ -689,6 +695,10 @@ fn start_pageserver( shutdown_pageserver.take(); let bg_remote_storage = remote_storage.clone(); let bg_deletion_queue = deletion_queue.clone(); + + #[cfg(feature = "profiling")] + pageserver::profiling::exit_profiler(&profiler_guard); + BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver( bg_remote_storage.map(|_| bg_deletion_queue), 0, diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 30975c1fc9..f148d21d9c 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -618,3 +618,65 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result reader.read_to_end(&mut buf).await?; Ok(Bytes::from(buf)) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::tenant::harness::*; + + #[tokio::test] + async fn test_basic() -> anyhow::Result<()> { + let pg_version = 15; + let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await; + + // We can't use create_test_timeline because it doesn't create a real + // checkpoint, and Walingest::new tries to parse the garbage data. + let tline = tenant + .bootstrap_timeline(TIMELINE_ID, pg_version, &ctx) + .await?; + + // Steps to reconstruct this test data: + // 1. Run the pgbench python test + // 2. Take the first wal segment file from safekeeper + // 3. Grep sk logs for "restart decoder" to get startpoint + // 4. Run just the decoder from this test to get the endpoint. + // It's the last LSN the decoder will output. + let pg_version = 15; + let path = "test_data/sk_wal_segment_from_pgbench"; + let startpoint = Lsn::from_hex("14AEC08").unwrap(); + let endpoint = Lsn::from_hex("1FFFF98").unwrap(); + + // We fully read this into memory before decoding to get a + // more accurate perf profile of the decoder. + let bytes = std::fs::read(path)?; + + let profiler_guard = crate::profiling::init_profiler(); + let prof_guard = crate::profiling::profpoint_start(); + let started_at = std::time::Instant::now(); + + // Feed bytes to the decoder + let xlogoff: usize = startpoint.segment_offset(WAL_SEGMENT_SIZE); + let mut decoder = WalStreamDecoder::new(startpoint, pg_version); + decoder.feed_bytes(&bytes[xlogoff..]); + println!("decoding {} bytes", bytes.len() - xlogoff); + + // Decode and ingest wal + let mut walingest = WalIngest::new(tline.as_ref(), startpoint, &ctx).await?; + let mut modification = tline.begin_modification(endpoint); + let mut decoded = DecodedWALRecord::default(); + while let Some((lsn, recdata)) = decoder.poll_decode()? { + walingest + .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) + .await?; + } + + let duration = started_at.elapsed(); + println!("done in {:?}", duration); + drop(prof_guard); + + #[cfg(feature = "profiling")] + crate::profiling::exit_profiler(&profiler_guard); + + Ok(()) + } +} diff --git a/pageserver/src/profiling.rs b/pageserver/src/profiling.rs index fd27c994d0..af59b0b491 100644 --- a/pageserver/src/profiling.rs +++ b/pageserver/src/profiling.rs @@ -36,6 +36,8 @@ mod profiling_impl { pub struct ProfilingGuard(PhantomUnsend); + unsafe impl Send for ProfilingGuard {} + impl Drop for ProfilingGuard { fn drop(&mut self) { pprof::stop_profiling(); diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 4270b6edb0..b5e22d55f7 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -121,6 +121,7 @@ pub static MGMT_REQUEST_RUNTIME: Lazy = Lazy::new(|| { pub static WALRECEIVER_RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() + // .worker_threads(1) .thread_name("walreceiver worker") .enable_all() .build() diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a738633d5e..c0f8bb1e35 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2868,7 +2868,7 @@ impl Tenant { /// - after initialization complete, remove the temp dir. /// /// The caller is responsible for activating the returned timeline. - async fn bootstrap_timeline( + pub async fn bootstrap_timeline( &self, timeline_id: TimelineId, pg_version: u32, diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 3e56753ad4..fb00ff6448 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -244,6 +244,9 @@ pub(super) async fn handle_walreceiver_connection( info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}..."); + // tokio::task::block_in_place(move || { + // tokio::runtime::Handle::current().block_on(async move { + let query = format!("START_REPLICATION PHYSICAL {startpoint}"); let copy_stream = replication_client.copy_both_simple(&query).await?; @@ -293,6 +296,8 @@ pub(super) async fn handle_walreceiver_connection( let status_update = match replication_message { ReplicationMessage::XLogData(xlog_data) => { + let prof_guard = crate::profiling::profpoint_start(); + // Pass the WAL data to the decoder, and see if we can decode // more records as a result. let data = xlog_data.data(); @@ -330,6 +335,7 @@ pub(super) async fn handle_walreceiver_connection( caught_up = true; } + drop(prof_guard); Some(endlsn) } @@ -418,6 +424,11 @@ pub(super) async fn handle_walreceiver_connection( } } + // Ok(()) + // })?; + // Ok::<(), WalReceiverError>(()) + // })?; + Ok(()) } diff --git a/pageserver/test_data/sk_wal_segment_from_pgbench b/pageserver/test_data/sk_wal_segment_from_pgbench new file mode 100644 index 0000000000..71190be5f3 Binary files /dev/null and b/pageserver/test_data/sk_wal_segment_from_pgbench differ