a mode where one task picks which work to do and dispatches it to per-timeline clients

Christian Schwarz
2023-11-24 18:00:17 +00:00
parent 59c8a29569
commit 687678c4ff
2 changed files with 84 additions and 211 deletions
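
The mode the commit title describes, reduced to a minimal standalone sketch: one dispatcher task owns a map of per-timeline senders and decides which timeline gets the next piece of work, while one client task per timeline receives from its channel and executes the requests. The names here (Work, client_task, the u32 timeline id) are illustrative stand-ins, not the types from the diff below.

use std::collections::HashMap;
use tokio::sync::mpsc;

// Illustrative stand-ins for the real TenantTimelineId / getpage request types.
type TimelineId = u32;
type Work = u64;

// One client task per timeline: in the real tool it would own the
// page_service connection and execute whatever the dispatcher sends it.
async fn client_task(timeline: TimelineId, mut rx: mpsc::Receiver<Work>) {
    while let Some(work) = rx.recv().await {
        println!("timeline {timeline}: executing request {work}");
    }
    // All senders dropped: the dispatcher is done, so this client exits.
}

#[tokio::main]
async fn main() {
    let timelines: Vec<TimelineId> = vec![1, 2, 3];

    // One bounded channel and one client task per timeline.
    let mut work_senders: HashMap<TimelineId, mpsc::Sender<Work>> = HashMap::new();
    let mut clients = Vec::new();
    for &tl in &timelines {
        let (tx, rx) = mpsc::channel(10);
        work_senders.insert(tl, tx);
        clients.push(tokio::spawn(client_task(tl, rx)));
    }

    // Dispatcher: picks which timeline gets which work and sends it over.
    for work in 0..30u64 {
        let tl = timelines[work as usize % timelines.len()];
        work_senders[&tl].send(work).await.unwrap();
    }

    // Dropping the senders lets every client drain its queue and exit.
    drop(work_senders);
    for c in clients {
        c.await.unwrap();
    }
}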

@@ -1,27 +1,21 @@
mod tenant_timeline_id;
use anyhow::Context;
use pageserver::client::mgmt_api::Client;
use pageserver::client::page_service::RelTagBlockNo;
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
use pageserver::{repository, tenant};
use std::sync::Weak;
use tokio_util::sync::CancellationToken;
use pageserver::repository;
use utils::lsn::Lsn;
use pageserver::tenant::Tenant;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{info, instrument};
use utils::id::{TenantId, TimelineId};
use utils::id::TenantId;
use utils::logging;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -37,7 +31,9 @@ pub(crate) struct Args {
page_service_connstring: String,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
// targets: Option<Vec<TenantTimelineId>>,
#[clap(long)]
runtime: Option<humantime::Duration>,
targets: Option<Vec<TenantTimelineId>>,
}
#[derive(Debug, Default)]
@@ -171,50 +167,6 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
rt.block_on(main_task).unwrap()
}
struct ClientPool {
cache: HashMap<TenantTimelineId, Vec<Weak<Mutex<pageserver::client::page_service::Client>>>>,
lru: VecDeque<Arc<Mutex<pageserver::client::page_service::Client>>>,
}
impl ClientPool {
pub fn new() -> Self {
Self {
cache: Default::default(),
lru: Default::default(),
}
}
pub fn take(
&mut self,
timeline: TenantTimelineId,
) -> Option<Arc<Mutex<pageserver::client::page_service::Client>>> {
match self.cache.entry(timeline) {
Entry::Occupied(mut o) => {
while let Some(weak) = o.get_mut().pop() {
if let Some(strong) = Weak::upgrade(&weak) {
return Some(strong);
}
}
None
}
Entry::Vacant(_) => None,
}
}
pub fn put(
&mut self,
timeline: TenantTimelineId,
client: Arc<Mutex<pageserver::client::page_service::Client>>,
) {
match self.cache.entry(timeline) {
Entry::Occupied(mut o) => o.get_mut().push(Arc::downgrade(&client)),
Entry::Vacant(v) => todo!(),
}
self.lru.push_front(client);
self.lru.truncate(1000);
}
}
struct KeyRange {
timeline: TenantTimelineId,
timeline_lsn: Lsn,
@@ -228,11 +180,6 @@ impl KeyRange {
}
}
struct Targets {
ranges: Vec<KeyRange>,
weights: Vec<usize>,
}
async fn main_impl(
args: Args,
thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
@@ -244,9 +191,9 @@ async fn main_impl(
));
// discover targets
let mut targets: Vec<TenantTimelineId> = Vec::new();
let mut timelines: Vec<TenantTimelineId> = Vec::new();
if false {
targets = targets.clone();
timelines = timelines.clone();
} else {
let tenants: Vec<TenantId> = mgmt_api_client
.list_tenants()
@@ -267,9 +214,9 @@ async fn main_impl(
});
}
while let Some(res) = js.join_next().await {
let (tenant_id, timelines) = res.unwrap();
for tl in timelines {
targets.push(TenantTimelineId {
let (tenant_id, tl_infos) = res.unwrap();
for tl in tl_infos {
timelines.push(TenantTimelineId {
tenant_id,
timeline_id: tl.timeline_id,
});
@@ -277,15 +224,16 @@ async fn main_impl(
}
}
info!("targets:\n{:?}", targets);
info!("timelines:\n{:?}", timelines);
let mut js = JoinSet::new();
for target in targets {
for timeline in &timelines {
js.spawn({
let mgmt_api_client = Arc::clone(&mgmt_api_client);
let timeline = *timeline;
async move {
let partitioning = mgmt_api_client
.keyspace(target.tenant_id, target.timeline_id)
.keyspace(timeline.tenant_id, timeline.timeline_id)
.await?;
let lsn = partitioning.at_lsn;
@@ -299,7 +247,7 @@ async fn main_impl(
// filter out non-relblock keys
match (is_rel_block_key(start), is_rel_block_key(end)) {
(true, true) => Some(KeyRange {
timeline: target,
timeline,
timeline_lsn: lsn,
start: start.to_i128(),
end: end.to_i128(),
@@ -317,30 +265,26 @@ async fn main_impl(
});
}
let mut all_ranges: Vec<KeyRange> = Vec::new();
while let Some(ranges) = js.join_next().await {
all_ranges.extend(ranges.unwrap().unwrap());
while let Some(res) = js.join_next().await {
all_ranges.extend(res.unwrap().unwrap());
}
let mut all_weights: Vec<usize> = all_ranges
.iter()
.map(|v| (v.end - v.start).try_into().unwrap())
.collect();
let targets = Arc::new(Targets {
ranges: all_ranges,
weights: all_weights,
});
let weights =
rand::distributions::weighted::WeightedIndex::new(all_ranges.iter().map(|v| v.len()))
.unwrap();
let stats = Arc::new(LiveStats::default());
let live_stats = Arc::new(LiveStats::default());
let num_work_tasks = args.num_clients.get();
let num_client_tasks = timelines.len();
let num_live_stats_dump = 1;
let num_work_sender_tasks = 1;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_work_tasks + num_live_stats_dump,
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
tokio::spawn({
let stats = Arc::clone(&stats);
let stats = Arc::clone(&live_stats);
let start_work_barrier = Arc::clone(&start_work_barrier);
async move {
start_work_barrier.wait().await;
@@ -357,25 +301,49 @@ async fn main_impl(
}
});
let pool = Arc::new(Mutex::new(ClientPool::new()));
let cancel = CancellationToken::new();
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for client_id in 0..args.num_clients.get() {
let live_stats = Arc::clone(&stats);
let t = tokio::spawn(client(
for tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
client_id,
Arc::clone(&mgmt_api_client),
Arc::clone(&pool),
*tl,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
Arc::clone(&targets),
live_stats,
cancel.clone(),
));
tasks.push(t);
Arc::clone(&live_stats),
)));
}
let work_sender = async move {
start_work_barrier.wait().await;
loop {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
let sender = work_senders.get(&range.timeline).unwrap();
// TODO: what if this blocks?
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
};
if let Some(runtime) = args.runtime {
match tokio::time::timeout(runtime.into(), work_sender).await {
Ok(()) => unreachable!("work sender never terminates"),
Err(_timeout) => {
// this implicitly drops the work_senders, making all the clients exit
}
}
} else {
work_sender.await;
unreachable!("work sender never terminates");
}
for t in tasks {
@@ -399,131 +367,38 @@ async fn main_impl(
anyhow::Ok(())
}
#[instrument(skip_all, %client_id)]
#[instrument(skip_all)]
async fn client(
args: &'static Args,
client_id: usize,
mgmt_api_client: Arc<pageserver::client::mgmt_api::Client>,
pool: Arc<Mutex<ClientPool>>,
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
all_work_done_barrier: Arc<Barrier>,
targets: Arc<Targets>,
live_stats: Arc<LiveStats>,
cancel: CancellationToken,
) {
start_work_barrier.wait().await;
while !cancel.is_cancelled() {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = targets
.ranges
.choose_weighted(&mut rng, |range| range.len())
.unwrap();
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
let client = match pool.lock().unwrap().take(range.timeline) {
Some(client) => client,
None => Arc::new(Mutex::new(
pageserver::client::page_service::Client::new(
args.page_service_connstring.clone(),
range.timeline.tenant_id,
range.timeline.timeline_id,
)
.await
.unwrap(),
)),
};
let mut client = pageserver::client::page_service::Client::new(
args.page_service_connstring.clone(),
timeline.tenant_id,
timeline.timeline_id,
)
.await
.unwrap();
while let Some((key, lsn)) = work.recv().await {
let start = Instant::now();
client
.lock()
.unwrap()
.getpage(key, range.timeline_lsn)
.getpage(key, lsn)
.await
.with_context(|| format!("getpage for {}", range.timeline))
.with_context(|| format!("getpage for {timeline}"))
.unwrap();
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
pool.lock().unwrap().put(range.timeline, client);
}
all_work_done_barrier.wait().await;
}
// async fn timeline(
// args: &'static Args,
// mgmt_api_client: Arc<pageserver::client::mgmt_api::Client>,
// tenant_id: TenantId,
// timeline_id: TimelineId,
// start_work_barrier: Arc<Barrier>,
// all_work_done_barrier: Arc<Barrier>,
// live_stats: Arc<LiveStats>,
// ) -> anyhow::Result<()> {
// let mut tasks = Vec::new();
// for _i in 0..args.num_tasks {
// let ranges = ranges.clone();
// let _weights = weights.clone();
// let start_work_barrier = Arc::clone(&start_work_barrier);
// let all_work_done_barrier = Arc::clone(&all_work_done_barrier);
// let jh = tokio::spawn({
// let live_stats = Arc::clone(&live_stats);
// async move {
// let mut getpage_client = pageserver::client::page_service::Client::new(
// args.page_service_connstring.clone(),
// tenant_id,
// timeline_id,
// )
// .await
// .unwrap();
// start_work_barrier.wait().await;
// for _i in 0..args.num_requests {
// let key = {
// let mut rng = rand::thread_rng();
// let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
// let key: i128 = rng.gen_range(r.start..r.end);
// let key = repository::Key::from_i128(key);
// let (rel_tag, block_no) =
// key_to_rel_block(key).expect("we filter non-rel-block keys out above");
// RelTagBlockNo { rel_tag, block_no }
// };
// let start = Instant::now();
// getpage_client
// .getpage(key, lsn)
// .await
// .with_context(|| {
// format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
// })
// .unwrap();
// let elapsed = start.elapsed();
// live_stats.inc();
// STATS.with(|stats| {
// stats.borrow().lock().unwrap().observe(elapsed).unwrap();
// });
// }
// all_work_done_barrier.wait().await;
// getpage_client.shutdown().await;
// }
// });
// tasks.push(jh);
// }
// for task in tasks {
// task.await.unwrap();
// }
// Ok(())
// }
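
For reference, the dispatcher's key selection above weights each relblock key range by its size, so individual keys end up roughly uniform across the whole keyspace. A self-contained sketch of that sampling with rand's WeightedIndex, using a simplified KeyRange (no timeline or LSN fields, unlike the struct in the diff):

use rand::distributions::WeightedIndex;
use rand::prelude::*;

// Simplified stand-in for the KeyRange in the diff.
struct KeyRange {
    start: i128,
    end: i128,
}

impl KeyRange {
    fn len(&self) -> i128 {
        self.end - self.start
    }
}

fn main() {
    let ranges = vec![
        KeyRange { start: 0, end: 10 },
        KeyRange { start: 1_000, end: 2_000 },
    ];

    // Weight each range by its length so that keys are sampled
    // roughly uniformly across the union of all ranges.
    let weights = WeightedIndex::new(ranges.iter().map(|r| r.len())).unwrap();

    let mut rng = rand::thread_rng();
    for _ in 0..5 {
        let r = &ranges[weights.sample(&mut rng)];
        let key: i128 = rng.gen_range(r.start..r.end);
        println!("sampled key {key} from range {}..{}", r.start, r.end);
    }
}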

@@ -42,7 +42,7 @@ def test_getpage_throughput(
template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", "-s50", ep.connstr()])
pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
ps_http.tenant_detach(template_tenant)
@@ -53,7 +53,7 @@ def test_getpage_throughput(
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
tenants = [template_tenant]
for i in range(0, 200):
for i in range(0, 1):
new_tenant = TenantId.generate()
tenants.append(new_tenant)
log.info("Duplicating tenant #%s: %s", i, new_tenant)
@@ -107,11 +107,9 @@ def test_getpage_throughput(
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--num-tasks",
"1",
"--num-requests",
"200000",
*[str(tenant) for tenant in tenants],
"--runtime",
"10s",
*[f"{tenant}/{template_timeline}" for tenant in tenants],
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd)
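
The --runtime argument passed here corresponds to the tokio::time::timeout around the work sender in the Rust change: when the timeout fires, the dispatcher future is dropped, its work senders go with it, every client's recv() returns None, and the clients exit. A minimal sketch of that shutdown mechanism, with illustrative names rather than the benchmark's own:

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(10);

    // Client: runs until the sender side is dropped and the queue drains.
    let client = tokio::spawn(async move {
        while let Some(item) = rx.recv().await {
            println!("got work item {item}");
        }
        println!("work sender dropped, client exits");
    });

    // Dispatcher: sends work until it is cancelled by the timeout below.
    let work_sender = async move {
        let mut i = 0u64;
        loop {
            if tx.send(i).await.is_err() {
                break;
            }
            i += 1;
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };

    // Cancelling the dispatcher drops `tx`, which in turn stops the client.
    let _ = tokio::time::timeout(Duration::from_millis(500), work_sender).await;
    client.await.unwrap();
}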