mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
pagebench: actually implement --num_clients (#6640)
Will need this to validate per-tenant throttling in https://github.com/neondatabase/neon/issues/5899
This commit is contained in:
committed by
GitHub
parent
947165788d
commit
e196d974cc
@@ -79,6 +79,12 @@ impl KeyRange {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Hash, Copy, Clone)]
|
||||
struct WorkerId {
|
||||
timeline: TenantTimelineId,
|
||||
num_client: usize, // from 0..args.num_clients
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct Output {
|
||||
total: request_stats::Output,
|
||||
@@ -206,7 +212,7 @@ async fn main_impl(
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_client_tasks = timelines.len();
|
||||
let num_client_tasks = args.num_clients.get() * timelines.len();
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = 1;
|
||||
let num_main_impl = 1;
|
||||
@@ -235,19 +241,25 @@ async fn main_impl(
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let mut work_senders: HashMap<TenantTimelineId, _> = HashMap::new();
|
||||
let mut work_senders: HashMap<WorkerId, _> = HashMap::new();
|
||||
let mut tasks = Vec::new();
|
||||
for tl in &timelines {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
|
||||
work_senders.insert(*tl, sender);
|
||||
tasks.push(tokio::spawn(client(
|
||||
args,
|
||||
*tl,
|
||||
Arc::clone(&start_work_barrier),
|
||||
receiver,
|
||||
Arc::clone(&live_stats),
|
||||
cancel.clone(),
|
||||
)));
|
||||
for timeline in timelines.iter().cloned() {
|
||||
for num_client in 0..args.num_clients.get() {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
|
||||
let worker_id = WorkerId {
|
||||
timeline,
|
||||
num_client,
|
||||
};
|
||||
work_senders.insert(worker_id, sender);
|
||||
tasks.push(tokio::spawn(client(
|
||||
args,
|
||||
worker_id,
|
||||
Arc::clone(&start_work_barrier),
|
||||
receiver,
|
||||
Arc::clone(&live_stats),
|
||||
cancel.clone(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = {
|
||||
@@ -271,7 +283,10 @@ async fn main_impl(
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
(
|
||||
r.timeline,
|
||||
WorkerId {
|
||||
timeline: r.timeline,
|
||||
num_client: rng.gen_range(0..args.num_clients.get()),
|
||||
},
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
@@ -289,56 +304,54 @@ async fn main_impl(
|
||||
}),
|
||||
Some(rps_limit) => Box::pin(async move {
|
||||
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
|
||||
let make_timeline_task: &dyn Fn(
|
||||
TenantTimelineId,
|
||||
)
|
||||
-> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
|
||||
let sender = work_senders.get(&timeline).unwrap();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == timeline)
|
||||
.cloned()
|
||||
.collect();
|
||||
let weights = rand::distributions::weighted::WeightedIndex::new(
|
||||
ranges.iter().map(|v| v.len()),
|
||||
)
|
||||
.unwrap();
|
||||
let make_task: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> =
|
||||
&|worker_id| {
|
||||
let sender = work_senders.get(&worker_id).unwrap();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == worker_id.timeline)
|
||||
.cloned()
|
||||
.collect();
|
||||
let weights = rand::distributions::weighted::WeightedIndex::new(
|
||||
ranges.iter().map(|v| v.len()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let cancel = cancel.clone();
|
||||
Box::pin(async move {
|
||||
let mut ticker = tokio::time::interval(period);
|
||||
ticker.set_missed_tick_behavior(
|
||||
/* TODO review this choice */
|
||||
tokio::time::MissedTickBehavior::Burst,
|
||||
);
|
||||
while !cancel.is_cancelled() {
|
||||
ticker.tick().await;
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) = key_to_rel_block(key)
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
let cancel = cancel.clone();
|
||||
Box::pin(async move {
|
||||
let mut ticker = tokio::time::interval(period);
|
||||
ticker.set_missed_tick_behavior(
|
||||
/* TODO review this choice */
|
||||
tokio::time::MissedTickBehavior::Burst,
|
||||
);
|
||||
while !cancel.is_cancelled() {
|
||||
ticker.tick().await;
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) = key_to_rel_block(key)
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
if sender.send(req).await.is_err() {
|
||||
assert!(
|
||||
cancel.is_cancelled(),
|
||||
"client has gone away unexpectedly"
|
||||
);
|
||||
}
|
||||
};
|
||||
if sender.send(req).await.is_err() {
|
||||
assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
})
|
||||
};
|
||||
|
||||
let tasks: Vec<_> = work_senders
|
||||
.keys()
|
||||
.map(|tl| make_timeline_task(*tl))
|
||||
.collect();
|
||||
let tasks: Vec<_> = work_senders.keys().map(|tl| make_task(*tl)).collect();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
@@ -390,12 +403,16 @@ async fn main_impl(
|
||||
#[instrument(skip_all)]
|
||||
async fn client(
|
||||
args: &'static Args,
|
||||
timeline: TenantTimelineId,
|
||||
id: WorkerId,
|
||||
start_work_barrier: Arc<Barrier>,
|
||||
mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
|
||||
live_stats: Arc<LiveStats>,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
let WorkerId {
|
||||
timeline,
|
||||
num_client: _,
|
||||
} = id;
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user