Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 17:32:56 +00:00
WIP: failed attempt to have a fixed number of clients going over all the key ranges of all tenants.
The problem is that the connections are stateful, so we need to implement a client pool => sucks
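For context on the commit message: a page_service connection is bound to one tenant/timeline, so a fixed set of workers roaming over all targets has to pool connections per timeline. A minimal sketch of that idea, where the `Conn` type, the `String` timeline key, and the 1000-entry cap are illustrative stand-ins rather than the commit's real types:

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, Weak};

// Stand-in for the stateful per-timeline page_service connection.
struct Conn;

#[derive(Default)]
struct Pool {
    // Weak refs keyed by timeline; only upgradable while the LRU below
    // still holds the matching strong ref.
    cache: HashMap<String, Vec<Weak<Mutex<Conn>>>>,
    // Strong refs, newest first; truncating drops the oldest connections.
    lru: VecDeque<Arc<Mutex<Conn>>>,
}

impl Pool {
    fn take(&mut self, timeline: &str) -> Option<Arc<Mutex<Conn>>> {
        let weaks = self.cache.get_mut(timeline)?;
        while let Some(weak) = weaks.pop() {
            if let Some(strong) = weak.upgrade() {
                return Some(strong);
            }
        }
        None
    }

    fn put(&mut self, timeline: &str, conn: Arc<Mutex<Conn>>) {
        self.cache
            .entry(timeline.to_string())
            .or_default()
            .push(Arc::downgrade(&conn));
        self.lru.push_front(conn);
        self.lru.truncate(1000); // cap the number of live connections
    }
}

fn main() {
    let mut pool = Pool::default();
    pool.put("tenant-a/timeline-1", Arc::new(Mutex::new(Conn)));
    assert!(pool.take("tenant-a/timeline-1").is_some());
    assert!(pool.take("tenant-b/timeline-1").is_none());
}

Unlike the sketch's or_default, the commit's put below still hits a todo!() for a timeline the cache has never seen, which is part of why the message calls the approach a dead end.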
Cargo.lock | 1 (generated)
@@ -2938,6 +2938,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "tokio-util",
  "tracing",
  "utils",
 ]
@@ -16,6 +16,7 @@ serde.workspace = true
 serde_json.workspace = true
 tracing.workspace = true
 tokio.workspace = true
+tokio-util.workspace = true

 pageserver = { path = ".." }
 utils = { path = "../../libs/utils/" }
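The new tokio-util dependency is pulled in for CancellationToken, which the rewritten client loop below polls instead of counting requests. A minimal standalone sketch of that pattern, assuming only the tokio and tokio-util crates (not code from this commit):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let worker = tokio::spawn({
        let cancel = cancel.clone();
        async move {
            let mut iterations = 0u64;
            // Same shape as the benchmark loop: run until cancelled.
            while !cancel.is_cancelled() {
                iterations += 1;
                tokio::task::yield_now().await;
            }
            iterations
        }
    });
    cancel.cancel();
    println!("iterations before cancel: {}", worker.await.unwrap());
}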
@@ -1,19 +1,33 @@
+mod tenant_timeline_id;
+
 use anyhow::Context;
 use pageserver::client::mgmt_api::Client;
 use pageserver::client::page_service::RelTagBlockNo;
 use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
-use pageserver::repository;
+use pageserver::{repository, tenant};
+use std::sync::Weak;
+use tokio_util::sync::CancellationToken;
+use utils::lsn::Lsn;
+
 use pageserver::tenant::Tenant;
 use rand::prelude::*;
 use tokio::sync::Barrier;
-use tracing::info;
+use tokio::task::JoinSet;
+use tracing::{info, instrument};
 use utils::id::{TenantId, TimelineId};
 use utils::logging;

 use std::cell::RefCell;
+use std::collections::hash_map::Entry;
+use std::collections::{HashMap, VecDeque};
+use std::num::NonZeroUsize;
 use std::str::FromStr;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};

+use self::tenant_timeline_id::TenantTimelineId;
+
 /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
 #[derive(clap::Parser)]
 pub(crate) struct Args {
@@ -21,13 +35,9 @@ pub(crate) struct Args {
     mgmt_api_endpoint: String,
     #[clap(long, default_value = "postgres://postgres@localhost:64000")]
     page_service_connstring: String,
-    #[clap(long)]
-    num_tasks: usize,
-    #[clap(long)]
-    num_requests: usize,
     #[clap(long)]
     pick_n_tenants: Option<usize>,
     tenants: Option<Vec<TenantId>>,
+    #[clap(long, default_value = "1")]
+    num_clients: NonZeroUsize,
+    // targets: Option<Vec<TenantTimelineId>>,
 }

 #[derive(Debug, Default)]
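As an aside on the new flag above: clap derives the parser for num_clients directly from NonZeroUsize's FromStr implementation, so --num-clients 0 is rejected at parse time. A self-contained illustration of that pattern (hypothetical binary name, not the commit's CLI wiring):

use std::num::NonZeroUsize;

use clap::Parser;

#[derive(Parser)]
struct Args {
    // Zero fails to parse into NonZeroUsize, so it never reaches the program.
    #[clap(long, default_value = "1")]
    num_clients: NonZeroUsize,
}

fn main() {
    let args = Args::parse_from(["bench", "--num-clients", "4"]);
    assert_eq!(args.num_clients.get(), 4);
}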
@@ -161,53 +171,172 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     rt.block_on(main_task).unwrap()
 }

+struct ClientPool {
+    cache: HashMap<TenantTimelineId, Vec<Weak<Mutex<pageserver::client::page_service::Client>>>>,
+    lru: VecDeque<Arc<Mutex<pageserver::client::page_service::Client>>>,
+}
+
+impl ClientPool {
+    pub fn new() -> Self {
+        Self {
+            cache: Default::default(),
+            lru: Default::default(),
+        }
+    }
+
+    pub fn take(
+        &mut self,
+        timeline: TenantTimelineId,
+    ) -> Option<Arc<Mutex<pageserver::client::page_service::Client>>> {
+        match self.cache.entry(timeline) {
+            Entry::Occupied(mut o) => {
+                while let Some(weak) = o.get_mut().pop() {
+                    if let Some(strong) = Weak::upgrade(&weak) {
+                        return Some(strong);
+                    }
+                }
+                None
+            }
+            Entry::Vacant(_) => None,
+        }
+    }
+
+    pub fn put(
+        &mut self,
+        timeline: TenantTimelineId,
+        client: Arc<Mutex<pageserver::client::page_service::Client>>,
+    ) {
+        match self.cache.entry(timeline) {
+            Entry::Occupied(mut o) => o.get_mut().push(Arc::downgrade(&client)),
+            Entry::Vacant(v) => todo!(),
+        }
+        self.lru.push_front(client);
+        self.lru.truncate(1000);
+    }
+}
+
+struct KeyRange {
+    timeline: TenantTimelineId,
+    timeline_lsn: Lsn,
+    start: i128,
+    end: i128,
+}
+
+impl KeyRange {
+    fn len(&self) -> i128 {
+        self.end - self.start
+    }
+}
+
+struct Targets {
+    ranges: Vec<KeyRange>,
+    weights: Vec<usize>,
+}
+
 async fn main_impl(
     args: Args,
     thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
 ) -> anyhow::Result<()> {
     let args: &'static Args = Box::leak(Box::new(args));

-    let client = Arc::new(pageserver::client::mgmt_api::Client::new(
+    let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
         args.mgmt_api_endpoint.clone(),
     ));

-    let mut tenants: Vec<TenantId> = if let Some(tenants) = &args.tenants {
-        tenants.clone()
-    } else {
-        client
-            .list_tenants()
-            .await?
-            .into_iter()
-            .map(|ti| ti.id)
-            .collect()
-    };
-    let tenants = if let Some(n) = args.pick_n_tenants {
-        tenants.truncate(n);
-        if tenants.len() != n {
-            anyhow::bail!("too few tenants: {} < {}", tenants.len(), n);
-        }
-        tenants
-    } else {
-        tenants
-    };
-
-    let mut tenant_timelines = Vec::new();
-    for tenant_id in tenants {
-        tenant_timelines.extend(
-            client
-                .list_timelines(tenant_id)
-                .await?
-                .into_iter()
-                .map(|ti| (tenant_id, ti.timeline_id)),
-        );
-    }
-    info!("tenant_timelines:\n{:?}", tenant_timelines);
+    // discover targets
+    let mut targets: Vec<TenantTimelineId> = Vec::new();
+    if false {
+        targets = targets.clone();
+    } else {
+        let tenants: Vec<TenantId> = mgmt_api_client
+            .list_tenants()
+            .await?
+            .into_iter()
+            .map(|ti| ti.id)
+            .collect();
+        let mut js = JoinSet::new();
+        for tenant_id in tenants {
+            js.spawn({
+                let mgmt_api_client = Arc::clone(&mgmt_api_client);
+                async move {
+                    (
+                        tenant_id,
+                        mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
+                    )
+                }
+            });
+        }
+        while let Some(res) = js.join_next().await {
+            let (tenant_id, timelines) = res.unwrap();
+            for tl in timelines {
+                targets.push(TenantTimelineId {
+                    tenant_id,
+                    timeline_id: tl.timeline_id,
+                });
+            }
+        }
+    }
+    info!("targets:\n{:?}", targets);

+    let mut js = JoinSet::new();
+    for target in targets {
+        js.spawn({
+            let mgmt_api_client = Arc::clone(&mgmt_api_client);
+            async move {
+                let partitioning = mgmt_api_client
+                    .keyspace(target.tenant_id, target.timeline_id)
+                    .await?;
+                let lsn = partitioning.at_lsn;
+
+                let ranges = partitioning
+                    .keys
+                    .ranges
+                    .iter()
+                    .filter_map(|r| {
+                        let start = r.start;
+                        let end = r.end;
+                        // filter out non-relblock keys
+                        match (is_rel_block_key(start), is_rel_block_key(end)) {
+                            (true, true) => Some(KeyRange {
+                                timeline: target,
+                                timeline_lsn: lsn,
+                                start: start.to_i128(),
+                                end: end.to_i128(),
+                            }),
+                            (true, false) | (false, true) => {
+                                unimplemented!("split up range")
+                            }
+                            (false, false) => None,
+                        }
+                    })
+                    .collect::<Vec<_>>();
+
+                anyhow::Ok(ranges)
+            }
+        });
+    }
+    let mut all_ranges: Vec<KeyRange> = Vec::new();
+    while let Some(ranges) = js.join_next().await {
+        all_ranges.extend(ranges.unwrap().unwrap());
+    }
+    let mut all_weights: Vec<usize> = all_ranges
+        .iter()
+        .map(|v| (v.end - v.start).try_into().unwrap())
+        .collect();
+    let targets = Arc::new(Targets {
+        ranges: all_ranges,
+        weights: all_weights,
+    });

     let stats = Arc::new(LiveStats::default());

-    let num_work_tasks = tenant_timelines.len() * args.num_tasks;
+    let num_work_tasks = args.num_clients.get();
+    let num_live_stats_dump = 1;

-    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks + 1));
+    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
+        num_work_tasks + num_live_stats_dump,
+    ));
     let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks));

     tokio::spawn({
@@ -228,23 +357,29 @@ async fn main_impl(
         }
     });

+    let pool = Arc::new(Mutex::new(ClientPool::new()));
+
+    let cancel = CancellationToken::new();
+
     let mut tasks = Vec::new();
-    for (tenant_id, timeline_id) in tenant_timelines {
+    for client_id in 0..args.num_clients.get() {
         let live_stats = Arc::clone(&stats);
-        let t = tokio::spawn(timeline(
+        let t = tokio::spawn(client(
             args,
-            client.clone(),
-            tenant_id,
-            timeline_id,
+            client_id,
+            Arc::clone(&mgmt_api_client),
+            Arc::clone(&pool),
             Arc::clone(&start_work_barrier),
             Arc::clone(&all_work_done_barrier),
+            Arc::clone(&targets),
             live_stats,
+            cancel.clone(),
         ));
-        tasks.push(((tenant_id, timeline_id), t));
+        tasks.push(t);
     }

-    for (_, t) in tasks {
-        t.await.unwrap().unwrap();
+    for t in tasks {
+        t.await.unwrap();
     }

     let output = Output {
@@ -264,111 +399,131 @@ async fn main_impl(
     anyhow::Ok(())
 }

-async fn timeline(
+#[instrument(skip_all, %client_id)]
+async fn client(
     args: &'static Args,
+    client_id: usize,
     mgmt_api_client: Arc<pageserver::client::mgmt_api::Client>,
-    tenant_id: TenantId,
-    timeline_id: TimelineId,
+    pool: Arc<Mutex<ClientPool>>,
     start_work_barrier: Arc<Barrier>,
     all_work_done_barrier: Arc<Barrier>,
+    targets: Arc<Targets>,
     live_stats: Arc<LiveStats>,
-) -> anyhow::Result<()> {
-    let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?;
-    let lsn = partitioning.at_lsn;
+    cancel: CancellationToken,
+) {
+    start_work_barrier.wait().await;

-    struct KeyRange {
-        start: i128,
-        end: i128,
-    }
-
-    impl KeyRange {
-        fn len(&self) -> i128 {
-            self.end - self.start
-        }
-    }
-
-    let ranges = partitioning
-        .keys
-        .ranges
-        .iter()
-        .filter_map(|r| {
-            let start = r.start;
-            let end = r.end;
-            // filter out non-relblock keys
-            match (is_rel_block_key(start), is_rel_block_key(end)) {
-                (true, true) => Some(KeyRange {
-                    start: start.to_i128(),
-                    end: end.to_i128(),
-                }),
-                (true, false) | (false, true) => {
-                    unimplemented!("split up range")
-                }
-                (false, false) => None,
-            }
-        })
-        .collect::<Vec<_>>();
-
-    // weighted ranges
-    let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
-
-    let ranges = Arc::new(ranges);
-    let weights = Arc::new(weights);
-
-    let mut tasks = Vec::new();
-
-    for _i in 0..args.num_tasks {
-        let ranges = ranges.clone();
-        let _weights = weights.clone();
-        let start_work_barrier = Arc::clone(&start_work_barrier);
-        let all_work_done_barrier = Arc::clone(&all_work_done_barrier);
-
-        let jh = tokio::spawn({
-            let live_stats = Arc::clone(&live_stats);
-            async move {
-                let mut getpage_client = pageserver::client::page_service::Client::new(
-                    args.page_service_connstring.clone(),
-                    tenant_id,
-                    timeline_id,
-                )
-                .await
-                .unwrap();
-
-                start_work_barrier.wait().await;
-                for _i in 0..args.num_requests {
-                    let key = {
-                        let mut rng = rand::thread_rng();
-                        let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
-                        let key: i128 = rng.gen_range(r.start..r.end);
-                        let key = repository::Key::from_i128(key);
-                        let (rel_tag, block_no) =
-                            key_to_rel_block(key).expect("we filter non-rel-block keys out above");
-                        RelTagBlockNo { rel_tag, block_no }
-                    };
-                    let start = Instant::now();
-                    getpage_client
-                        .getpage(key, lsn)
-                        .await
-                        .with_context(|| {
-                            format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
-                        })
-                        .unwrap();
-                    let elapsed = start.elapsed();
-                    live_stats.inc();
-                    STATS.with(|stats| {
-                        stats.borrow().lock().unwrap().observe(elapsed).unwrap();
-                    });
-                }
-                all_work_done_barrier.wait().await;
-
-                getpage_client.shutdown().await;
-            }
-        });
-        tasks.push(jh);
-    }
-
-    for task in tasks {
-        task.await.unwrap();
-    }
-
-    Ok(())
+    while !cancel.is_cancelled() {
+        let (range, key) = {
+            let mut rng = rand::thread_rng();
+            let r = targets
+                .ranges
+                .choose_weighted(&mut rng, |range| range.len())
+                .unwrap();
+            let key: i128 = rng.gen_range(r.start..r.end);
+            let key = repository::Key::from_i128(key);
+            let (rel_tag, block_no) =
+                key_to_rel_block(key).expect("we filter non-rel-block keys out above");
+            (r, RelTagBlockNo { rel_tag, block_no })
+        };
+
+        let client = match pool.lock().unwrap().take(range.timeline) {
+            Some(client) => client,
+            None => Arc::new(Mutex::new(
+                pageserver::client::page_service::Client::new(
+                    args.page_service_connstring.clone(),
+                    range.timeline.tenant_id,
+                    range.timeline.timeline_id,
+                )
+                .await
+                .unwrap(),
+            )),
+        };
+
+        let start = Instant::now();
+        client
+            .lock()
+            .unwrap()
+            .getpage(key, range.timeline_lsn)
+            .await
+            .with_context(|| format!("getpage for {}", range.timeline))
+            .unwrap();
+        let elapsed = start.elapsed();
+        live_stats.inc();
+        STATS.with(|stats| {
+            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
+        });
+
+        pool.lock().unwrap().put(range.timeline, client);
+    }
+
+    all_work_done_barrier.wait().await;
+}

+// async fn timeline(
+//     args: &'static Args,
+//     mgmt_api_client: Arc<pageserver::client::mgmt_api::Client>,
+//     tenant_id: TenantId,
+//     timeline_id: TimelineId,
+//     start_work_barrier: Arc<Barrier>,
+//     all_work_done_barrier: Arc<Barrier>,
+//     live_stats: Arc<LiveStats>,
+// ) -> anyhow::Result<()> {
+//     let mut tasks = Vec::new();
+
+//     for _i in 0..args.num_tasks {
+//         let ranges = ranges.clone();
+//         let _weights = weights.clone();
+//         let start_work_barrier = Arc::clone(&start_work_barrier);
+//         let all_work_done_barrier = Arc::clone(&all_work_done_barrier);
+
+//         let jh = tokio::spawn({
+//             let live_stats = Arc::clone(&live_stats);
+//             async move {
+//                 let mut getpage_client = pageserver::client::page_service::Client::new(
+//                     args.page_service_connstring.clone(),
+//                     tenant_id,
+//                     timeline_id,
+//                 )
+//                 .await
+//                 .unwrap();
+
+//                 start_work_barrier.wait().await;
+//                 for _i in 0..args.num_requests {
+//                     let key = {
+//                         let mut rng = rand::thread_rng();
+//                         let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
+//                         let key: i128 = rng.gen_range(r.start..r.end);
+//                         let key = repository::Key::from_i128(key);
+//                         let (rel_tag, block_no) =
+//                             key_to_rel_block(key).expect("we filter non-rel-block keys out above");
+//                         RelTagBlockNo { rel_tag, block_no }
+//                     };
+//                     let start = Instant::now();
+//                     getpage_client
+//                         .getpage(key, lsn)
+//                         .await
+//                         .with_context(|| {
+//                             format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
+//                         })
+//                         .unwrap();
+//                     let elapsed = start.elapsed();
+//                     live_stats.inc();
+//                     STATS.with(|stats| {
+//                         stats.borrow().lock().unwrap().observe(elapsed).unwrap();
+//                     });
+//                 }
+//                 all_work_done_barrier.wait().await;
+
+//                 getpage_client.shutdown().await;
+//             }
+//         });
+//         tasks.push(jh);
+//     }
+
+//     for task in tasks {
+//         task.await.unwrap();
+//     }
+
+//     Ok(())
+// }
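The key selection in `client` above is two-stage: pick a range with probability proportional to its length, then pick a key uniformly within it, so every key in the union of all ranges is equally likely. A standalone sketch of that pattern with the rand crate, where plain (start, end) tuples stand in for the real KeyRange:

use rand::prelude::*;

// Half-open (start, end) key ranges; lengths act as sampling weights.
fn pick_key(ranges: &[(i128, i128)], rng: &mut impl Rng) -> i128 {
    let r = ranges
        .choose_weighted(rng, |(start, end)| (end - start) as f64)
        .expect("non-empty ranges with positive total weight");
    rng.gen_range(r.0..r.1)
}

fn main() {
    let ranges = [(0i128, 10), (100, 1000)];
    let mut rng = rand::thread_rng();
    // ~90% of picks should land in the second, longer range.
    let key = pick_key(&ranges, &mut rng);
    assert!(ranges.iter().any(|(s, e)| (*s..*e).contains(&key)));
    println!("sampled key: {}", key);
}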
@@ -0,0 +1,36 @@
+use std::str::FromStr;
+
+use anyhow::Context;
+use utils::id::TimelineId;
+
+use utils::id::TenantId;
+
+#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
+pub(crate) struct TenantTimelineId {
+    pub(crate) tenant_id: TenantId,
+    pub(crate) timeline_id: TimelineId,
+}
+
+impl FromStr for TenantTimelineId {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let (tenant_id, timeline_id) = s
+            .split_once("/")
+            .context("tenant and timeline id must be separated by `/`")?;
+        let tenant_id = TenantId::from_str(&tenant_id)
+            .with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
+        let timeline_id = TimelineId::from_str(&timeline_id)
+            .with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
+        Ok(Self {
+            tenant_id,
+            timeline_id,
+        })
+    }
+}
+
+impl std::fmt::Display for TenantTimelineId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}/{}", self.tenant_id, self.timeline_id)
+    }
+}
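A quick usage sketch for the new type, assuming the ids are the 32-character hex strings that utils::id::TenantId and TimelineId parse (a hypothetical snippet inside the same crate, with made-up ids for illustration):

use std::str::FromStr;

// Hypothetical usage where the tenant_timeline_id module above is in scope.
fn demo() -> anyhow::Result<()> {
    // Made-up ids: 32 hex chars each, separated by `/` as Display writes them.
    let s = "3fa85f6457174562b3fc2c963f66afa6/1f72f5b3a2bd3f1b0e2c7a9d4e5f6a7b";
    let ttid = TenantTimelineId::from_str(s)?;
    assert_eq!(ttid.to_string(), s); // FromStr and Display round-trip
    Ok(())
}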