From 915fba146dab16dd37696520ea41891f702ebe49 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 11 Jan 2024 18:42:11 +0100 Subject: [PATCH] pagebench: getpage: optional keyspace cache file (#6324) Proved useful when benchmarking 20k tenant setup when validating https://github.com/neondatabase/neon/issues/5479 --- Cargo.lock | 1 + pageserver/pagebench/Cargo.toml | 1 + .../pagebench/src/cmd/getpage_latest_lsn.rs | 143 ++++++++++++------ 3 files changed, 99 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 512145f6c8..3c7fcc0f67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3211,6 +3211,7 @@ name = "pagebench" version = "0.1.0" dependencies = [ "anyhow", + "camino", "clap", "futures", "hdrhistogram", diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 169d9b7f8e..257cc798e8 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +camino.workspace = true clap.workspace = true futures.workspace = true hdrhistogram.workspace = true diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index b134ed895d..7ed9ae53ce 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -1,4 +1,5 @@ use anyhow::Context; +use camino::Utf8PathBuf; use futures::future::join_all; use pageserver::pgdatadir_mapping::key_to_rel_block; use pageserver::repository; @@ -14,7 +15,7 @@ use tokio::sync::Barrier; use tokio::task::JoinSet; use tracing::{info, instrument}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::num::NonZeroUsize; use std::pin::Pin; @@ -45,6 +46,12 @@ pub(crate) struct Args { req_latest_probability: f64, #[clap(long)] limit_to_first_n_targets: Option, + /// For large pageserver installations, enumerating the keyspace takes 
a lot of time. + /// If specified, the specified path is used to maintain a cache of the keyspace enumeration result. + /// The cache is tagged and auto-invalidated by the tenant/timeline ids only. + /// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction. + #[clap(long)] + keyspace_cache: Option, targets: Option>, } @@ -59,7 +66,7 @@ impl LiveStats { } } -#[derive(Clone)] +#[derive(Clone, serde::Serialize, serde::Deserialize)] struct KeyRange { timeline: TenantTimelineId, timeline_lsn: Lsn, @@ -107,52 +114,96 @@ async fn main_impl( ) .await?; - let mut js = JoinSet::new(); - for timeline in &timelines { - js.spawn({ - let mgmt_api_client = Arc::clone(&mgmt_api_client); - let timeline = *timeline; - async move { - let partitioning = mgmt_api_client - .keyspace(timeline.tenant_id, timeline.timeline_id) - .await?; - let lsn = partitioning.at_lsn; - let start = Instant::now(); - let mut filtered = KeySpaceAccum::new(); - // let's hope this is inlined and vectorized... - // TODO: turn this loop into a is_rel_block_range() function. 
- for r in partitioning.keys.ranges.iter() { - let mut i = r.start; - while i != r.end { - if is_rel_block_key(&i) { - filtered.add_key(i); - } - i = i.next(); - } + #[derive(serde::Deserialize)] + struct KeyspaceCacheDe { + tag: Vec, + data: Vec, + } + #[derive(serde::Serialize)] + struct KeyspaceCacheSer<'a> { + tag: &'a [TenantTimelineId], + data: &'a [KeyRange], + } + let cache = args + .keyspace_cache + .as_ref() + .map(|keyspace_cache_file| { + let contents = match std::fs::read(keyspace_cache_file) { + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return anyhow::Ok(None); } - let filtered = filtered.to_keyspace(); - let filter_duration = start.elapsed(); + x => x.context("read keyspace cache file")?, + }; + let cache: KeyspaceCacheDe = + serde_json::from_slice(&contents).context("deserialize cache file")?; + let tag_ok = HashSet::::from_iter(cache.tag.into_iter()) + == HashSet::from_iter(timelines.iter().cloned()); + info!("keyspace cache file matches tag: {tag_ok}"); + anyhow::Ok(if tag_ok { Some(cache.data) } else { None }) + }) + .transpose()? + .flatten(); + let all_ranges: Vec = if let Some(cached) = cache { + info!("using keyspace cache file"); + cached + } else { + let mut js = JoinSet::new(); + for timeline in &timelines { + js.spawn({ + let mgmt_api_client = Arc::clone(&mgmt_api_client); + let timeline = *timeline; + async move { + let partitioning = mgmt_api_client + .keyspace(timeline.tenant_id, timeline.timeline_id) + .await?; + let lsn = partitioning.at_lsn; + let start = Instant::now(); + let mut filtered = KeySpaceAccum::new(); + // let's hope this is inlined and vectorized... + // TODO: turn this loop into a is_rel_block_range() function. 
+ for r in partitioning.keys.ranges.iter() { + let mut i = r.start; + while i != r.end { + if is_rel_block_key(&i) { + filtered.add_key(i); + } + i = i.next(); + } + } + let filtered = filtered.to_keyspace(); + let filter_duration = start.elapsed(); - anyhow::Ok(( - filter_duration, - filtered.ranges.into_iter().map(move |r| KeyRange { - timeline, - timeline_lsn: lsn, - start: r.start.to_i128(), - end: r.end.to_i128(), - }), - )) - } - }); - } - let mut total_filter_duration = Duration::from_secs(0); - let mut all_ranges: Vec = Vec::new(); - while let Some(res) = js.join_next().await { - let (filter_duration, range) = res.unwrap().unwrap(); - all_ranges.extend(range); - total_filter_duration += filter_duration; - } - info!("filter duration: {}", total_filter_duration.as_secs_f64()); + anyhow::Ok(( + filter_duration, + filtered.ranges.into_iter().map(move |r| KeyRange { + timeline, + timeline_lsn: lsn, + start: r.start.to_i128(), + end: r.end.to_i128(), + }), + )) + } + }); + } + let mut total_filter_duration = Duration::from_secs(0); + let mut all_ranges: Vec = Vec::new(); + while let Some(res) = js.join_next().await { + let (filter_duration, range) = res.unwrap().unwrap(); + all_ranges.extend(range); + total_filter_duration += filter_duration; + } + info!("filter duration: {}", total_filter_duration.as_secs_f64()); + if let Some(cachefile) = args.keyspace_cache.as_ref() { + let cache = KeyspaceCacheSer { + tag: &timelines, + data: &all_ranges, + }; + let bytes = serde_json::to_vec(&cache).context("serialize keyspace for cache file")?; + std::fs::write(cachefile, bytes).context("write keyspace cache file to disk")?; + info!("successfully wrote keyspace cache file"); + } + all_ranges + }; let live_stats = Arc::new(LiveStats::default());