WIP: factor out http client

This commit is contained in:
Christian Schwarz
2023-11-09 17:44:32 +00:00
parent f3ae4eabdc
commit a2a6bfc85c
6 changed files with 198 additions and 148 deletions

View File

@@ -571,6 +571,7 @@ pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,
}
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {

View File

@@ -210,6 +210,7 @@ fn ensure_logging_ready() {
utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
});

View File

@@ -11,9 +11,11 @@ use pageserver_api::reltag::RelTag;
use rand::prelude::*;
use tokio::sync::Barrier;
use tracing::info;
use utils::id::{TenantId, TimelineId};
use utils::logging;
use std::future::Future;
use std::ops::Range;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@@ -62,7 +64,7 @@ struct Args {
num_requests: usize,
#[clap(long)]
pick_n_tenants: Option<usize>,
tenants: Option<Vec<String>>,
tenants: Option<Vec<TenantId>>,
}
#[derive(Debug, Default)]
@@ -77,7 +79,7 @@ impl LiveStats {
}
#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
@@ -87,52 +89,37 @@ async fn main() {
let args: &'static Args = Box::leak(Box::new(Args::parse()));
let client = Client::new();
let client = Arc::new(mgmt_api_client::Client::new(args.mgmt_api_endpoint.into()));
let tenants = if let Some(tenants) = &args.tenants {
let mut tenants: Vec<TenantId> = if let Some(tenants) = &args.tenants {
tenants.clone()
} else {
let resp = client
.get(Uri::try_from(&format!("{}/v1/tenant", args.mgmt_api_endpoint)).unwrap())
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
let mut out = Vec::new();
for t in tenants.as_array().unwrap() {
if let Some(limit) = args.pick_n_tenants {
if out.len() >= limit {
break;
}
}
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
client
.list_tenants()
.await?
.into_iter()
.map(|ti| ti.id)
.collect()
};
let tenants = if let Some(n) = args.pick_n_tenants {
tenants.truncate(n);
if tenants.len() != n {
anyhow::bail!("too few tenants: {} < {}", tenants.len(), n);
}
if let Some(limit) = args.pick_n_tenants {
assert_eq!(out.len(), limit);
}
out
tenants
} else {
tenants
};
let mut tenant_timelines = Vec::new();
for tenant_id in tenants {
let resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline",
args.mgmt_api_endpoint, tenant_id
))
.unwrap(),
)
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
for t in timelines.as_array().unwrap() {
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
tenant_timelines.push((tenant_id.clone(), timeline_id));
}
tenant_timelines.extend(
client
.list_timelines(tenant_id)
.await?
.into_iter()
.map(|ti| (tenant_id, ti.timeline_id)),
);
}
info!("tenant_timelines:\n{:?}", tenant_timelines);
@@ -177,48 +164,33 @@ async fn main() {
for t in tasks {
t.await.unwrap();
}
anyhow::Ok(())
}
fn timeline(
args: &'static Args,
http_client: Client<HttpConnector, hyper::Body>,
tenant_id: String,
timeline_id: String,
mgmt_api_client: Arc<mgmt_api_client::Client>,
tenant_id: TenantId,
timeline_id: TimelineId,
start_work_barrier: Arc<Barrier>,
stats: Arc<LiveStats>,
) -> impl Future<Output = ()> + Send + Sync {
async move {
let resp = http_client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline/{}/keyspace",
args.mgmt_api_endpoint, tenant_id, timeline_id
))
.unwrap(),
)
.await
.unwrap();
if !resp.status().is_success() {
panic!("Failed to get keyspace: {resp:?}");
}
let body = hyper::body::to_bytes(resp).await.unwrap();
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
let ranges = keyspace["keys"]
.as_array()
.unwrap()
let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?;
let lsn = partitioning.at_lsn;
let ranges = partitioning
.keys
.ranges
.iter()
.filter_map(|r| {
let r = r.as_array().unwrap();
assert_eq!(r.len(), 2);
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
let start = r.start;
let end = r.end;
// filter out non-relblock keys
match (is_rel_block_key(start.0), is_rel_block_key(end.0)) {
match (is_rel_block_key(start), is_rel_block_key(end)) {
(true, true) => Some(KeyRange {
start: start.0.to_i128(),
end: end.0.to_i128(),
start: start.to_i128(),
end: end.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
@@ -239,17 +211,14 @@ fn timeline(
for _i in 0..args.num_tasks {
let ranges = ranges.clone();
let _weights = weights.clone();
let _client = http_client.clone();
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let start_work_barrier = Arc::clone(&start_work_barrier);
let task = tokio::spawn({
let stats = Arc::clone(&stats);
async move {
let mut client = getpage_client::Client::new(
let mut getpage_client = getpage_client::Client::new(
args.page_service_connstring.clone(),
tenant_id.clone(),
timeline_id.clone(),
tenant_id,
timeline_id,
)
.await
.unwrap();
@@ -269,7 +238,7 @@ fn timeline(
key_to_rel_block(key).expect("we just checked");
RelTagBlockNo { rel_tag, block_no }
};
client
getpage_client
.getpage(key, lsn)
.await
.with_context(|| {
@@ -278,7 +247,7 @@ fn timeline(
.unwrap();
stats.inc();
}
client.shutdown().await;
getpage_client.shutdown().await;
}
});
tasks.push(task);
@@ -290,6 +259,74 @@ fn timeline(
}
}
mod mgmt_api_client {
use bytes::{Buf, Bytes, BytesMut};
use hyper::{client::HttpConnector, Uri};
use utils::id::{TenantId, TimelineId};
pub(crate) struct Client {
mgmt_api_endpoint: String,
pub(crate) client: hyper::Client<HttpConnector, hyper::Body>,
}
impl Client {
pub fn new(mgmt_api_endpoint: Bytes) -> Self {
Self {
mgmt_api_endpoint,
client: hyper::client::Client::new(),
}
}
pub async fn list_tenants(
&self,
) -> anyhow::Result<Vec<pageserver_api::models::TenantInfo>> {
let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?;
let resp = self.client.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn list_timelines(
&self,
tenant_id: TenantId,
) -> anyhow::Result<Vec<pageserver_api::models::TimelineInfo>> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline",
self.mgmt_api_endpoint
))?;
let resp = self.client.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn keyspace(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<pageserver::http::models::Partitioning> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace",
self.mgmt_api_endpoint
))?;
let resp = self.client.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body))
// let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
// let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
}
}
}
mod getpage_client {
use std::pin::Pin;
@@ -301,7 +338,10 @@ mod getpage_client {
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::RelTagBlockNo;
@@ -314,8 +354,8 @@ mod getpage_client {
impl Client {
pub async fn new(
connstring: String,
tenant_id: String,
timeline_id: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<Self> {
let (client, connection) =
tokio_postgres::connect(&connstring, postgres::NoTls).await?;

View File

@@ -1,4 +1,6 @@
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;
// pub use pageserver_api::models;
pub mod models;

View File

@@ -0,0 +1,66 @@
//! If possible, use `::pageserver_api::models` instead.
use utils::lsn::Lsn;
pub struct Partitioning {
pub keys: crate::keyspace::KeySpace,
pub at_lsn: Lsn,
}
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}
pub struct WithDisplay<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}
pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}
pub struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}

View File

@@ -25,7 +25,7 @@ use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use super::models::{
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
@@ -60,7 +60,7 @@ use utils::{
};
// Imports only used for testing APIs
use super::models::ConfigureFailpointsRequest;
use pageserver_api::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
@@ -1412,69 +1412,6 @@ async fn timeline_collect_keyspace(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
struct Partitioning {
keys: crate::keyspace::KeySpace,
at_lsn: Lsn,
}
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}
struct WithDisplay<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}
struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}
struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
async {
@@ -1486,7 +1423,10 @@ async fn timeline_collect_keyspace(
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
json_response(
StatusCode::OK,
crate::http::models::Partitioning { keys, at_lsn },
)
}
.instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
.await