remove unnucessary return impl Future

This commit is contained in:
Christian Schwarz
2023-11-24 10:56:52 +00:00
parent 4f1197311e
commit 0bd5e3aedc

View File

@@ -11,8 +11,6 @@ use tracing::info;
use utils::id::{TenantId, TimelineId};
use utils::logging;
use std::future::Future;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -138,102 +136,100 @@ async fn main() -> anyhow::Result<()> {
anyhow::Ok(())
}
fn timeline(
async fn timeline(
args: &'static Args,
mgmt_api_client: Arc<pageserver::client::mgmt_api::Client>,
tenant_id: TenantId,
timeline_id: TimelineId,
start_work_barrier: Arc<Barrier>,
stats: Arc<LiveStats>,
) -> impl Future<Output = anyhow::Result<()>> + Send + Sync {
async move {
let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?;
let lsn = partitioning.at_lsn;
) -> anyhow::Result<()> {
let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?;
let lsn = partitioning.at_lsn;
struct KeyRange {
start: i128,
end: i128,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end - self.start
}
}
let ranges = partitioning
.keys
.ranges
.iter()
.filter_map(|r| {
let start = r.start;
let end = r.end;
// filter out non-relblock keys
match (is_rel_block_key(start), is_rel_block_key(end)) {
(true, true) => Some(KeyRange {
start: start.to_i128(),
end: end.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
}
(false, false) => None,
}
})
.collect::<Vec<_>>();
// weighted ranges
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let mut tasks = Vec::<JoinHandle<()>>::new();
for _i in 0..args.num_tasks {
let ranges = ranges.clone();
let _weights = weights.clone();
let start_work_barrier = Arc::clone(&start_work_barrier);
let task = tokio::spawn({
let stats = Arc::clone(&stats);
async move {
let mut getpage_client = pageserver::client::page_service::Client::new(
args.page_service_connstring.clone(),
tenant_id,
timeline_id,
)
.await
.unwrap();
start_work_barrier.wait().await;
for _i in 0..args.num_requests {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) = key_to_rel_block(key)
.expect("we filter non-rel-block keys out above");
RelTagBlockNo { rel_tag, block_no }
};
getpage_client
.getpage(key, lsn)
.await
.with_context(|| {
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
})
.unwrap();
stats.inc();
}
getpage_client.shutdown().await;
}
});
tasks.push(task);
}
for task in tasks {
task.await.unwrap();
}
Ok(())
struct KeyRange {
start: i128,
end: i128,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end - self.start
}
}
let ranges = partitioning
.keys
.ranges
.iter()
.filter_map(|r| {
let start = r.start;
let end = r.end;
// filter out non-relblock keys
match (is_rel_block_key(start), is_rel_block_key(end)) {
(true, true) => Some(KeyRange {
start: start.to_i128(),
end: end.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
}
(false, false) => None,
}
})
.collect::<Vec<_>>();
// weighted ranges
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let mut tasks = Vec::<JoinHandle<()>>::new();
for _i in 0..args.num_tasks {
let ranges = ranges.clone();
let _weights = weights.clone();
let start_work_barrier = Arc::clone(&start_work_barrier);
let task = tokio::spawn({
let stats = Arc::clone(&stats);
async move {
let mut getpage_client = pageserver::client::page_service::Client::new(
args.page_service_connstring.clone(),
tenant_id,
timeline_id,
)
.await
.unwrap();
start_work_barrier.wait().await;
for _i in 0..args.num_requests {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
RelTagBlockNo { rel_tag, block_no }
};
getpage_client
.getpage(key, lsn)
.await
.with_context(|| {
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
})
.unwrap();
stats.inc();
}
getpage_client.shutdown().await;
}
});
tasks.push(task);
}
for task in tasks {
task.await.unwrap();
}
Ok(())
}