From 59c8a29569caf4c0891ebd52d605dafdfa338f99 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 24 Nov 2023 17:12:42 +0000 Subject: [PATCH] WIP: failed attempt to have fixed number of clients going over all the key ranges of all tenants The problem is that the connections are stateful, need to implement a client pool => sucks --- Cargo.lock | 1 + pageserver/pagebench/Cargo.toml | 1 + .../pagebench/src/getpage_latest_lsn.rs | 429 ++++++++++++------ .../getpage_latest_lsn/tenant_timeline_id.rs | 36 ++ 4 files changed, 330 insertions(+), 137 deletions(-) create mode 100644 pageserver/pagebench/src/getpage_latest_lsn/tenant_timeline_id.rs diff --git a/Cargo.lock b/Cargo.lock index 07de6f0662..90f6d69b59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2938,6 +2938,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-util", "tracing", "utils", ] diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 15ec28d489..3cb3775d7b 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -16,6 +16,7 @@ serde.workspace = true serde_json.workspace = true tracing.workspace = true tokio.workspace = true +tokio-util.workspace = true pageserver = { path = ".." } utils = { path = "../../libs/utils/" } diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs index 3cbb0797ac..3746c2b972 100644 --- a/pageserver/pagebench/src/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/getpage_latest_lsn.rs @@ -1,19 +1,33 @@ +mod tenant_timeline_id; + use anyhow::Context; +use pageserver::client::mgmt_api::Client; use pageserver::client::page_service::RelTagBlockNo; use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block}; -use pageserver::repository; +use pageserver::{repository, tenant}; +use std::sync::Weak; +use tokio_util::sync::CancellationToken; +use utils::lsn::Lsn; +use pageserver::tenant::Tenant; use rand::prelude::*; use tokio::sync::Barrier; -use tracing::info; +use tokio::task::JoinSet; +use tracing::{info, instrument}; use utils::id::{TenantId, TimelineId}; use utils::logging; use std::cell::RefCell; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::num::NonZeroUsize; +use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use self::tenant_timeline_id::TenantTimelineId; + /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { @@ -21,13 +35,9 @@ pub(crate) struct Args { mgmt_api_endpoint: String, #[clap(long, default_value = "postgres://postgres@localhost:64000")] page_service_connstring: String, - #[clap(long)] - num_tasks: usize, - #[clap(long)] - num_requests: usize, - #[clap(long)] - pick_n_tenants: Option, - tenants: Option>, + #[clap(long, default_value = "1")] + num_clients: NonZeroUsize, + // targets: Option>, } #[derive(Debug, Default)] @@ -161,53 +171,172 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> { rt.block_on(main_task).unwrap() } +struct ClientPool { + cache: HashMap>>>, + lru: VecDeque>>, +} + +impl ClientPool { + pub fn new() -> Self { + Self { + cache: Default::default(), + lru: Default::default(), + } + } + + pub fn take( + &mut self, + timeline: TenantTimelineId, + ) -> Option>> { + match self.cache.entry(timeline) { + Entry::Occupied(mut o) => { + while let Some(weak) = o.get_mut().pop() { + if let Some(strong) = Weak::upgrade(&weak) { + return Some(strong); + } + } + None + } + Entry::Vacant(_) => None, + } + } + + pub fn put( + &mut self, + timeline: TenantTimelineId, + client: Arc>, + ) { + match self.cache.entry(timeline) { + Entry::Occupied(mut o) => o.get_mut().push(Arc::downgrade(&client)), + Entry::Vacant(v) => todo!(), + } + self.lru.push_front(client); + self.lru.truncate(1000); + } +} + +struct KeyRange { + timeline: TenantTimelineId, + timeline_lsn: Lsn, + start: i128, + end: i128, +} + +impl KeyRange { + fn len(&self) -> i128 { + self.end - self.start + } +} + +struct Targets { + ranges: Vec, + weights: Vec, +} + async fn main_impl( args: Args, thread_local_stats: Arc>>>>, ) -> anyhow::Result<()> { let args: &'static Args = Box::leak(Box::new(args)); - let client = Arc::new(pageserver::client::mgmt_api::Client::new( + let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), )); - let mut tenants: Vec = if let Some(tenants) = &args.tenants { - tenants.clone() + // discover targets + let mut targets: Vec = Vec::new(); + if false { + targets = targets.clone(); } else { - client + let tenants: Vec = mgmt_api_client .list_tenants() .await? .into_iter() .map(|ti| ti.id) - .collect() - }; - let tenants = if let Some(n) = args.pick_n_tenants { - tenants.truncate(n); - if tenants.len() != n { - anyhow::bail!("too few tenants: {} < {}", tenants.len(), n); + .collect(); + let mut js = JoinSet::new(); + for tenant_id in tenants { + js.spawn({ + let mgmt_api_client = Arc::clone(&mgmt_api_client); + async move { + ( + tenant_id, + mgmt_api_client.list_timelines(tenant_id).await.unwrap(), + ) + } + }); + } + while let Some(res) = js.join_next().await { + let (tenant_id, timelines) = res.unwrap(); + for tl in timelines { + targets.push(TenantTimelineId { + tenant_id, + timeline_id: tl.timeline_id, + }); + } } - tenants - } else { - tenants - }; - - let mut tenant_timelines = Vec::new(); - for tenant_id in tenants { - tenant_timelines.extend( - client - .list_timelines(tenant_id) - .await? - .into_iter() - .map(|ti| (tenant_id, ti.timeline_id)), - ); } - info!("tenant_timelines:\n{:?}", tenant_timelines); + + info!("targets:\n{:?}", targets); + + let mut js = JoinSet::new(); + for target in targets { + js.spawn({ + let mgmt_api_client = Arc::clone(&mgmt_api_client); + async move { + let partitioning = mgmt_api_client + .keyspace(target.tenant_id, target.timeline_id) + .await?; + let lsn = partitioning.at_lsn; + + let ranges = partitioning + .keys + .ranges + .iter() + .filter_map(|r| { + let start = r.start; + let end = r.end; + // filter out non-relblock keys + match (is_rel_block_key(start), is_rel_block_key(end)) { + (true, true) => Some(KeyRange { + timeline: target, + timeline_lsn: lsn, + start: start.to_i128(), + end: end.to_i128(), + }), + (true, false) | (false, true) => { + unimplemented!("split up range") + } + (false, false) => None, + } + }) + .collect::>(); + + anyhow::Ok(ranges) + } + }); + } + let mut all_ranges: Vec = Vec::new(); + while let Some(ranges) = js.join_next().await { + all_ranges.extend(ranges.unwrap().unwrap()); + } + let mut all_weights: Vec = all_ranges + .iter() + .map(|v| (v.end - v.start).try_into().unwrap()) + .collect(); + let targets = Arc::new(Targets { + ranges: all_ranges, + weights: all_weights, + }); let stats = Arc::new(LiveStats::default()); - let num_work_tasks = tenant_timelines.len() * args.num_tasks; + let num_work_tasks = args.num_clients.get(); + let num_live_stats_dump = 1; - let start_work_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks + 1)); + let start_work_barrier = Arc::new(tokio::sync::Barrier::new( + num_work_tasks + num_live_stats_dump, + )); let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks)); tokio::spawn({ @@ -228,23 +357,29 @@ async fn main_impl( } }); + let pool = Arc::new(Mutex::new(ClientPool::new())); + + let cancel = CancellationToken::new(); + let mut tasks = Vec::new(); - for (tenant_id, timeline_id) in tenant_timelines { + for client_id in 0..args.num_clients.get() { let live_stats = Arc::clone(&stats); - let t = tokio::spawn(timeline( + let t = tokio::spawn(client( args, - client.clone(), - tenant_id, - timeline_id, + client_id, + Arc::clone(&mgmt_api_client), + Arc::clone(&pool), Arc::clone(&start_work_barrier), Arc::clone(&all_work_done_barrier), + Arc::clone(&targets), live_stats, + cancel.clone(), )); - tasks.push(((tenant_id, timeline_id), t)); + tasks.push(t); } - for (_, t) in tasks { - t.await.unwrap().unwrap(); + for t in tasks { + t.await.unwrap(); } let output = Output { @@ -264,111 +399,131 @@ async fn main_impl( anyhow::Ok(()) } -async fn timeline( +#[instrument(skip_all, %client_id)] +async fn client( args: &'static Args, + client_id: usize, mgmt_api_client: Arc, - tenant_id: TenantId, - timeline_id: TimelineId, + pool: Arc>, start_work_barrier: Arc, all_work_done_barrier: Arc, + targets: Arc, live_stats: Arc, -) -> anyhow::Result<()> { - let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?; - let lsn = partitioning.at_lsn; + cancel: CancellationToken, +) { + start_work_barrier.wait().await; - struct KeyRange { - start: i128, - end: i128, - } + while !cancel.is_cancelled() { + let (range, key) = { + let mut rng = rand::thread_rng(); + let r = targets + .ranges + .choose_weighted(&mut rng, |range| range.len()) + .unwrap(); + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + let (rel_tag, block_no) = + key_to_rel_block(key).expect("we filter non-rel-block keys out above"); + (r, RelTagBlockNo { rel_tag, block_no }) + }; - impl KeyRange { - fn len(&self) -> i128 { - self.end - self.start - } - } - - let ranges = partitioning - .keys - .ranges - .iter() - .filter_map(|r| { - let start = r.start; - let end = r.end; - // filter out non-relblock keys - match (is_rel_block_key(start), is_rel_block_key(end)) { - (true, true) => Some(KeyRange { - start: start.to_i128(), - end: end.to_i128(), - }), - (true, false) | (false, true) => { - unimplemented!("split up range") - } - (false, false) => None, - } - }) - .collect::>(); - - // weighted ranges - let weights = ranges.iter().map(|r| r.len()).collect::>(); - - let ranges = Arc::new(ranges); - let weights = Arc::new(weights); - - let mut tasks = Vec::new(); - - for _i in 0..args.num_tasks { - let ranges = ranges.clone(); - let _weights = weights.clone(); - let start_work_barrier = Arc::clone(&start_work_barrier); - let all_work_done_barrier = Arc::clone(&all_work_done_barrier); - - let jh = tokio::spawn({ - let live_stats = Arc::clone(&live_stats); - async move { - let mut getpage_client = pageserver::client::page_service::Client::new( + let client = match pool.lock().unwrap().take(range.timeline) { + Some(client) => client, + None => Arc::new(Mutex::new( + pageserver::client::page_service::Client::new( args.page_service_connstring.clone(), - tenant_id, - timeline_id, + range.timeline.tenant_id, + range.timeline.timeline_id, ) .await - .unwrap(); + .unwrap(), + )), + }; - start_work_barrier.wait().await; - for _i in 0..args.num_requests { - let key = { - let mut rng = rand::thread_rng(); - let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap(); - let key: i128 = rng.gen_range(r.start..r.end); - let key = repository::Key::from_i128(key); - let (rel_tag, block_no) = - key_to_rel_block(key).expect("we filter non-rel-block keys out above"); - RelTagBlockNo { rel_tag, block_no } - }; - let start = Instant::now(); - getpage_client - .getpage(key, lsn) - .await - .with_context(|| { - format!("getpage for tenant {} timeline {}", tenant_id, timeline_id) - }) - .unwrap(); - let elapsed = start.elapsed(); - live_stats.inc(); - STATS.with(|stats| { - stats.borrow().lock().unwrap().observe(elapsed).unwrap(); - }); - } - all_work_done_barrier.wait().await; - - getpage_client.shutdown().await; - } + let start = Instant::now(); + client + .lock() + .unwrap() + .getpage(key, range.timeline_lsn) + .await + .with_context(|| format!("getpage for {}", range.timeline)) + .unwrap(); + let elapsed = start.elapsed(); + live_stats.inc(); + STATS.with(|stats| { + stats.borrow().lock().unwrap().observe(elapsed).unwrap(); }); - tasks.push(jh); + + pool.lock().unwrap().put(range.timeline, client); } - for task in tasks { - task.await.unwrap(); - } - - Ok(()) + all_work_done_barrier.wait().await; } + +// async fn timeline( +// args: &'static Args, +// mgmt_api_client: Arc, +// tenant_id: TenantId, +// timeline_id: TimelineId, +// start_work_barrier: Arc, +// all_work_done_barrier: Arc, +// live_stats: Arc, +// ) -> anyhow::Result<()> { +// let mut tasks = Vec::new(); + +// for _i in 0..args.num_tasks { +// let ranges = ranges.clone(); +// let _weights = weights.clone(); +// let start_work_barrier = Arc::clone(&start_work_barrier); +// let all_work_done_barrier = Arc::clone(&all_work_done_barrier); + +// let jh = tokio::spawn({ +// let live_stats = Arc::clone(&live_stats); +// async move { +// let mut getpage_client = pageserver::client::page_service::Client::new( +// args.page_service_connstring.clone(), +// tenant_id, +// timeline_id, +// ) +// .await +// .unwrap(); + +// start_work_barrier.wait().await; +// for _i in 0..args.num_requests { +// let key = { +// let mut rng = rand::thread_rng(); +// let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap(); +// let key: i128 = rng.gen_range(r.start..r.end); +// let key = repository::Key::from_i128(key); +// let (rel_tag, block_no) = +// key_to_rel_block(key).expect("we filter non-rel-block keys out above"); +// RelTagBlockNo { rel_tag, block_no } +// }; +// let start = Instant::now(); +// getpage_client +// .getpage(key, lsn) +// .await +// .with_context(|| { +// format!("getpage for tenant {} timeline {}", tenant_id, timeline_id) +// }) +// .unwrap(); +// let elapsed = start.elapsed(); +// live_stats.inc(); +// STATS.with(|stats| { +// stats.borrow().lock().unwrap().observe(elapsed).unwrap(); +// }); +// } +// all_work_done_barrier.wait().await; + +// getpage_client.shutdown().await; +// } +// }); +// tasks.push(jh); +// } + +// for task in tasks { +// task.await.unwrap(); +// } + +// Ok(()) +// } diff --git a/pageserver/pagebench/src/getpage_latest_lsn/tenant_timeline_id.rs b/pageserver/pagebench/src/getpage_latest_lsn/tenant_timeline_id.rs new file mode 100644 index 0000000000..6c751e24ae --- /dev/null +++ b/pageserver/pagebench/src/getpage_latest_lsn/tenant_timeline_id.rs @@ -0,0 +1,36 @@ +use std::str::FromStr; + +use anyhow::Context; +use utils::id::TimelineId; + +use utils::id::TenantId; + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub(crate) struct TenantTimelineId { + pub(crate) tenant_id: TenantId, + pub(crate) timeline_id: TimelineId, +} + +impl FromStr for TenantTimelineId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let (tenant_id, timeline_id) = s + .split_once("/") + .context("tenant and timeline id must be separated by `/`")?; + let tenant_id = TenantId::from_str(&tenant_id) + .with_context(|| format!("invalid tenant id: {tenant_id:?}"))?; + let timeline_id = TimelineId::from_str(&timeline_id) + .with_context(|| format!("invalid timeline id: {timeline_id:?}"))?; + Ok(Self { + tenant_id, + timeline_id, + }) + } +} + +impl std::fmt::Display for TenantTimelineId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.tenant_id, self.timeline_id) + } +}