diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 0d40c5ecf7..3e844a375d 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result, anyhow}; +use bytes::Bytes; use enumset::EnumSet; use futures::future::join_all; use futures::{StreamExt, TryFutureExt}; @@ -46,6 +47,7 @@ use pageserver_api::shard::{ShardCount, TenantShardId}; use postgres_ffi::PgMajorVersion; use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError}; use scopeguard::defer; +use serde::{Deserialize, Serialize}; use serde_json::json; use tenant_size_model::svg::SvgBranchKind; use tenant_size_model::{SizeResult, StorageModel}; @@ -57,6 +59,7 @@ use utils::auth::SwappableJwtAuth; use utils::generation::Generation; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; +use wal_decoder::models::record::NeonWalRecord; use crate::config::PageServerConf; use crate::context; @@ -77,6 +80,7 @@ use crate::tenant::remote_timeline_client::{ }; use crate::tenant::secondary::SecondaryController; use crate::tenant::size::ModelInputs; +use crate::tenant::storage_layer::ValuesReconstructState; use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName}; use crate::tenant::timeline::layer_manager::LayerManagerLockHolder; use crate::tenant::timeline::offload::{OffloadError, offload_timeline}; @@ -2708,6 +2712,16 @@ async fn deletion_queue_flush( } } +/// Try if `GetPage@Lsn` is successful, useful for manual debugging. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +struct GetPageResponse { + pub page: Bytes, + pub layers_visited: u32, + pub delta_layers_visited: u32, + pub records: Vec<(Lsn, NeonWalRecord)>, + pub img: Option<(Lsn, Bytes)>, +} + async fn getpage_at_lsn_handler( request: Request, cancel: CancellationToken, @@ -2758,21 +2772,24 @@ async fn getpage_at_lsn_handler_inner( // Use last_record_lsn if no lsn is provided let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); - let page = timeline.get(key.0, lsn, &ctx).await?; if touch { json_response(StatusCode::OK, ()) } else { - Result::<_, ApiError>::Ok( - Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/octet-stream") - .body(hyper::Body::from(page)) - .unwrap(), - ) + let mut reconstruct_state = ValuesReconstructState::new_with_debug(IoConcurrency::sequential()); + let page = timeline.debug_get(key.0, lsn, &ctx, &mut reconstruct_state).await?; + let response = GetPageResponse { + page, + layers_visited: reconstruct_state.get_layers_visited(), + delta_layers_visited: reconstruct_state.get_delta_layers_visited(), + records: reconstruct_state.debug_state.records.clone(), + img: reconstruct_state.debug_state.img.clone(), + }; + + json_response(StatusCode::OK, response) } } - .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) + .instrument(info_span!("timeline_debug_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) .await } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 9fbb9d2438..43ea8fffa3 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -75,7 +75,7 @@ where /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data' /// call, to collect more records. /// -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub(crate) struct ValueReconstructState { pub(crate) records: Vec<(Lsn, NeonWalRecord)>, pub(crate) img: Option<(Lsn, Bytes)>, @@ -308,6 +308,9 @@ pub struct ValuesReconstructState { layers_visited: u32, delta_layers_visited: u32, + pub(crate) enable_debug: bool, + pub(crate) debug_state: ValueReconstructState, + pub(crate) io_concurrency: IoConcurrency, num_active_ios: Arc, @@ -657,6 +660,23 @@ impl ValuesReconstructState { layers_visited: 0, delta_layers_visited: 0, io_concurrency, + enable_debug: false, + debug_state: ValueReconstructState::default(), + num_active_ios: Arc::new(AtomicUsize::new(0)), + read_path: None, + } + } + + pub(crate) fn new_with_debug(io_concurrency: IoConcurrency) -> Self { + Self { + keys: HashMap::new(), + keys_done: KeySpaceRandomAccum::new(), + keys_with_image_coverage: None, + layers_visited: 0, + delta_layers_visited: 0, + io_concurrency, + enable_debug: true, + debug_state: ValueReconstructState::default(), num_active_ios: Arc::new(AtomicUsize::new(0)), read_path: None, } @@ -670,6 +690,12 @@ impl ValuesReconstructState { self.io_concurrency.spawn_io(fut).await; } + pub(crate) fn set_debug_state(&mut self, debug_state: &ValueReconstructState) { + if self.enable_debug { + self.debug_state = debug_state.clone(); + } + } + pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) { self.layers_visited += 1; if let ReadableLayer::PersistentLayer(layer) = layer { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f2833674a9..73d2d72b59 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1253,6 +1253,57 @@ impl Timeline { } } + #[inline(always)] + pub(crate) async fn debug_get( + &self, + key: Key, + lsn: Lsn, + ctx: &RequestContext, + reconstruct_state: &mut ValuesReconstructState, + ) -> Result { + if !lsn.is_valid() { + return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN"))); + } + + // This check is debug-only because of the cost of hashing, and because it's a double-check: we + // already checked the key against the shard_identity when looking up the Timeline from + // page_service. + debug_assert!(!self.shard_identity.is_key_disposable(&key)); + + let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn); + let vectored_res = self + .debug_get_vectored_impl(query, reconstruct_state, ctx) + .await; + + let key_value = vectored_res?.pop_first(); + match key_value { + Some((got_key, value)) => { + if got_key != key { + error!( + "Expected {}, but singular vectored get returned {}", + key, got_key + ); + Err(PageReconstructError::Other(anyhow!( + "Singular vectored get returned wrong key" + ))) + } else { + value + } + } + None => Err(PageReconstructError::MissingKey(Box::new( + MissingKeyError { + keyspace: KeySpace::single(key..key.next()), + shard: self.shard_identity.get_shard_number(&key), + original_hwm_lsn: lsn, + ancestor_lsn: None, + backtrace: None, + read_path: None, + query: None, + }, + ))), + } + } + pub(crate) const LAYERS_VISITED_WARN_THRESHOLD: u32 = 100; /// Look up multiple page versions at a given LSN @@ -1547,6 +1598,98 @@ impl Timeline { Ok(results) } + // A copy of the get_vectored_impl method except that we store the image and wal records into `reconstruct_state`. + // This is only used in the http getpage call for debugging purpose. + pub(super) async fn debug_get_vectored_impl( + &self, + query: VersionedKeySpaceQuery, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result>, GetVectoredError> { + if query.is_empty() { + return Ok(BTreeMap::default()); + } + + let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() { + Some(ReadPath::new( + query.total_keyspace(), + query.high_watermark_lsn()?, + )) + } else { + None + }; + + reconstruct_state.read_path = read_path; + + let traversal_res: Result<(), _> = self + .get_vectored_reconstruct_data(query.clone(), reconstruct_state, ctx) + .await; + + if let Err(err) = traversal_res { + // Wait for all the spawned IOs to complete. + // See comments on `spawn_io` inside `storage_layer` for more details. + let mut collect_futs = std::mem::take(&mut reconstruct_state.keys) + .into_values() + .map(|state| state.collect_pending_ios()) + .collect::>(); + while collect_futs.next().await.is_some() {} + return Err(err); + }; + + let reconstruct_state = Arc::new(Mutex::new(reconstruct_state)); + let futs = FuturesUnordered::new(); + + for (key, state) in std::mem::take(&mut reconstruct_state.lock().unwrap().keys) { + let req_lsn_for_key = query.map_key_to_lsn(&key); + futs.push({ + let walredo_self = self.myself.upgrade().expect("&self method holds the arc"); + let rc_clone = Arc::clone(&reconstruct_state); + + async move { + assert_eq!(state.situation, ValueReconstructSituation::Complete); + + let converted = match state.collect_pending_ios().await { + Ok(ok) => ok, + Err(err) => { + return (key, Err(err)); + } + }; + DELTAS_PER_READ_GLOBAL.observe(converted.num_deltas() as f64); + + // The walredo module expects the records to be descending in terms of Lsn. + // And we submit the IOs in that order, so, there shuold be no need to sort here. + debug_assert!( + converted + .records + .is_sorted_by_key(|(lsn, _)| std::cmp::Reverse(*lsn)), + "{converted:?}" + ); + { + let mut guard = rc_clone.lock().unwrap(); + guard.set_debug_state(&converted); + } + ( + key, + walredo_self + .reconstruct_value( + key, + req_lsn_for_key, + converted, + RedoAttemptType::ReadPage, + ) + .await, + ) + } + }); + } + + let results = futs + .collect::>>() + .await; + + Ok(results) + } + /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev. pub(crate) fn get_last_record_lsn(&self) -> Lsn { self.last_record_lsn.load().last