From a2a6bfc85cb22bcebf185215784af67fc6257d0c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 9 Nov 2023 17:44:32 +0000 Subject: [PATCH] WIP: factor out http client --- libs/pageserver_api/src/models.rs | 1 + libs/remote_storage/tests/test_real_s3.rs | 1 + pageserver/src/bin/getpage_bench_libpq.rs | 202 +++++++++++++--------- pageserver/src/http/mod.rs | 4 +- pageserver/src/http/models.rs | 66 +++++++ pageserver/src/http/routes.rs | 72 +------- 6 files changed, 198 insertions(+), 148 deletions(-) create mode 100644 pageserver/src/http/models.rs diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 838e3b9c26..3029680504 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -571,6 +571,7 @@ pub struct TimelineGcRequest { pub gc_horizon: Option, } + // Wrapped in libpq CopyData #[derive(PartialEq, Eq, Debug)] pub enum PagestreamFeMessage { diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index e2360b7533..48f00e0106 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -210,6 +210,7 @@ fn ensure_logging_ready() { utils::logging::init( utils::logging::LogFormat::Test, utils::logging::TracingErrorLayerEnablement::Disabled, + utils::logging::Output::Stdout, ) .expect("logging init failed"); }); diff --git a/pageserver/src/bin/getpage_bench_libpq.rs b/pageserver/src/bin/getpage_bench_libpq.rs index e36de0c485..05281f0b86 100644 --- a/pageserver/src/bin/getpage_bench_libpq.rs +++ b/pageserver/src/bin/getpage_bench_libpq.rs @@ -11,9 +11,11 @@ use pageserver_api::reltag::RelTag; use rand::prelude::*; use tokio::sync::Barrier; use tracing::info; +use utils::id::{TenantId, TimelineId}; use utils::logging; use std::future::Future; +use std::ops::Range; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -62,7 +64,7 @@ struct Args { num_requests: usize, #[clap(long)] pick_n_tenants: Option, - tenants: Option>, + tenants: Option>, } #[derive(Debug, Default)] @@ -77,7 +79,7 @@ impl LiveStats { } #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { logging::init( logging::LogFormat::Plain, logging::TracingErrorLayerEnablement::Disabled, @@ -87,52 +89,37 @@ async fn main() { let args: &'static Args = Box::leak(Box::new(Args::parse())); - let client = Client::new(); + let client = Arc::new(mgmt_api_client::Client::new(args.mgmt_api_endpoint.into())); - let tenants = if let Some(tenants) = &args.tenants { + let mut tenants: Vec = if let Some(tenants) = &args.tenants { tenants.clone() } else { - let resp = client - .get(Uri::try_from(&format!("{}/v1/tenant", args.mgmt_api_endpoint)).unwrap()) - .await - .unwrap(); - - let body = hyper::body::to_bytes(resp).await.unwrap(); - let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap(); - let mut out = Vec::new(); - for t in tenants.as_array().unwrap() { - if let Some(limit) = args.pick_n_tenants { - if out.len() >= limit { - break; - } - } - out.push(t.get("id").unwrap().as_str().unwrap().to_owned()); + client + .list_tenants() + .await? + .into_iter() + .map(|ti| ti.id) + .collect() + }; + let tenants = if let Some(n) = args.pick_n_tenants { + tenants.truncate(n); + if tenants.len() != n { + anyhow::bail!("too few tenants: {} < {}", tenants.len(), n); } - if let Some(limit) = args.pick_n_tenants { - assert_eq!(out.len(), limit); - } - out + tenants + } else { + tenants }; let mut tenant_timelines = Vec::new(); for tenant_id in tenants { - let resp = client - .get( - Uri::try_from(&format!( - "{}/v1/tenant/{}/timeline", - args.mgmt_api_endpoint, tenant_id - )) - .unwrap(), - ) - .await - .unwrap(); - - let body = hyper::body::to_bytes(resp).await.unwrap(); - let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap(); - for t in timelines.as_array().unwrap() { - let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned(); - tenant_timelines.push((tenant_id.clone(), timeline_id)); - } + tenant_timelines.extend( + client + .list_timelines(tenant_id) + .await? + .into_iter() + .map(|ti| (tenant_id, ti.timeline_id)), + ); } info!("tenant_timelines:\n{:?}", tenant_timelines); @@ -177,48 +164,33 @@ async fn main() { for t in tasks { t.await.unwrap(); } + + anyhow::Ok(()) } fn timeline( args: &'static Args, - http_client: Client, - tenant_id: String, - timeline_id: String, + mgmt_api_client: Arc, + tenant_id: TenantId, + timeline_id: TimelineId, start_work_barrier: Arc, stats: Arc, ) -> impl Future + Send + Sync { async move { - let resp = http_client - .get( - Uri::try_from(&format!( - "{}/v1/tenant/{}/timeline/{}/keyspace", - args.mgmt_api_endpoint, tenant_id, timeline_id - )) - .unwrap(), - ) - .await - .unwrap(); - if !resp.status().is_success() { - panic!("Failed to get keyspace: {resp:?}"); - } - let body = hyper::body::to_bytes(resp).await.unwrap(); - let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap(); - let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap(); - - let ranges = keyspace["keys"] - .as_array() - .unwrap() + let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?; + let lsn = partitioning.at_lsn; + let ranges = partitioning + .keys + .ranges .iter() .filter_map(|r| { - let r = r.as_array().unwrap(); - assert_eq!(r.len(), 2); - let start = Key::from_str(r[0].as_str().unwrap()).unwrap(); - let end = Key::from_str(r[1].as_str().unwrap()).unwrap(); + let start = r.start; + let end = r.end; // filter out non-relblock keys - match (is_rel_block_key(start.0), is_rel_block_key(end.0)) { + match (is_rel_block_key(start), is_rel_block_key(end)) { (true, true) => Some(KeyRange { - start: start.0.to_i128(), - end: end.0.to_i128(), + start: start.to_i128(), + end: end.to_i128(), }), (true, false) | (false, true) => { unimplemented!("split up range") @@ -239,17 +211,14 @@ fn timeline( for _i in 0..args.num_tasks { let ranges = ranges.clone(); let _weights = weights.clone(); - let _client = http_client.clone(); - let tenant_id = tenant_id.clone(); - let timeline_id = timeline_id.clone(); let start_work_barrier = Arc::clone(&start_work_barrier); let task = tokio::spawn({ let stats = Arc::clone(&stats); async move { - let mut client = getpage_client::Client::new( + let mut getpage_client = getpage_client::Client::new( args.page_service_connstring.clone(), - tenant_id.clone(), - timeline_id.clone(), + tenant_id, + timeline_id, ) .await .unwrap(); @@ -269,7 +238,7 @@ fn timeline( key_to_rel_block(key).expect("we just checked"); RelTagBlockNo { rel_tag, block_no } }; - client + getpage_client .getpage(key, lsn) .await .with_context(|| { @@ -278,7 +247,7 @@ fn timeline( .unwrap(); stats.inc(); } - client.shutdown().await; + getpage_client.shutdown().await; } }); tasks.push(task); @@ -290,6 +259,74 @@ fn timeline( } } +mod mgmt_api_client { + use bytes::{Buf, Bytes, BytesMut}; + use hyper::{client::HttpConnector, Uri}; + use utils::id::{TenantId, TimelineId}; + + pub(crate) struct Client { + mgmt_api_endpoint: String, + pub(crate) client: hyper::Client, + } + + impl Client { + pub fn new(mgmt_api_endpoint: Bytes) -> Self { + Self { + mgmt_api_endpoint, + client: hyper::client::Client::new(), + } + } + + pub async fn list_tenants( + &self, + ) -> anyhow::Result> { + let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body)?) + } + + pub async fn list_timelines( + &self, + tenant_id: TenantId, + ) -> anyhow::Result> { + let uri = Uri::try_from(format!( + "{}/v1/tenant/{tenant_id}/timeline", + self.mgmt_api_endpoint + ))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body)?) + } + + pub async fn keyspace( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let uri = Uri::try_from(format!( + "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace", + self.mgmt_api_endpoint + ))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body)) + + // let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap(); + // let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap(); + } + } +} + mod getpage_client { use std::pin::Pin; @@ -301,7 +338,10 @@ mod getpage_client { use tokio::task::JoinHandle; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; - use utils::lsn::Lsn; + use utils::{ + id::{TenantId, TimelineId}, + lsn::Lsn, + }; use crate::RelTagBlockNo; @@ -314,8 +354,8 @@ mod getpage_client { impl Client { pub async fn new( connstring: String, - tenant_id: String, - timeline_id: String, + tenant_id: TenantId, + timeline_id: TimelineId, ) -> anyhow::Result { let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?; diff --git a/pageserver/src/http/mod.rs b/pageserver/src/http/mod.rs index 1c083bd382..5b0fa165cb 100644 --- a/pageserver/src/http/mod.rs +++ b/pageserver/src/http/mod.rs @@ -1,4 +1,6 @@ pub mod routes; pub use routes::make_router; -pub use pageserver_api::models; +// pub use pageserver_api::models; + +pub mod models; diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs new file mode 100644 index 0000000000..e146a66ef7 --- /dev/null +++ b/pageserver/src/http/models.rs @@ -0,0 +1,66 @@ +//! If possible, use `::pageserver_api::models` instead. + +use utils::lsn::Lsn; + +pub struct Partitioning { + pub keys: crate::keyspace::KeySpace, + + pub at_lsn: Lsn, +} + +impl serde::Serialize for Partitioning { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeMap; + let mut map = serializer.serialize_map(Some(2))?; + map.serialize_key("keys")?; + map.serialize_value(&KeySpace(&self.keys))?; + map.serialize_key("at_lsn")?; + map.serialize_value(&WithDisplay(&self.at_lsn))?; + map.end() + } +} + +pub struct WithDisplay<'a, T>(&'a T); + +impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.collect_str(&self.0) + } +} + +pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace); + +impl<'a> serde::Serialize for KeySpace<'a> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeSeq; + let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?; + for kr in &self.0.ranges { + seq.serialize_element(&KeyRange(kr))?; + } + seq.end() + } +} + +pub struct KeyRange<'a>(&'a std::ops::Range); + +impl<'a> serde::Serialize for KeyRange<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeTuple; + let mut t = serializer.serialize_tuple(2)?; + t.serialize_element(&WithDisplay(&self.0.start))?; + t.serialize_element(&WithDisplay(&self.0.end))?; + t.end() + } +} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index cf53031dfc..98e849fc1b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -25,7 +25,7 @@ use utils::http::endpoint::request_span; use utils::http::json::json_request_or_empty_body; use utils::http::request::{get_request_param, must_get_query_param, parse_query_param}; -use super::models::{ +use pageserver_api::models::{ StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo, TimelineCreateRequest, TimelineGcRequest, TimelineInfo, }; @@ -60,7 +60,7 @@ use utils::{ }; // Imports only used for testing APIs -use super::models::ConfigureFailpointsRequest; +use pageserver_api::models::ConfigureFailpointsRequest; pub struct State { conf: &'static PageServerConf, @@ -1412,69 +1412,6 @@ async fn timeline_collect_keyspace( let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_id))?; - struct Partitioning { - keys: crate::keyspace::KeySpace, - - at_lsn: Lsn, - } - - impl serde::Serialize for Partitioning { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeMap; - let mut map = serializer.serialize_map(Some(2))?; - map.serialize_key("keys")?; - map.serialize_value(&KeySpace(&self.keys))?; - map.serialize_key("at_lsn")?; - map.serialize_value(&WithDisplay(&self.at_lsn))?; - map.end() - } - } - - struct WithDisplay<'a, T>(&'a T); - - impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - serializer.collect_str(&self.0) - } - } - - struct KeySpace<'a>(&'a crate::keyspace::KeySpace); - - impl<'a> serde::Serialize for KeySpace<'a> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeSeq; - let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?; - for kr in &self.0.ranges { - seq.serialize_element(&KeyRange(kr))?; - } - seq.end() - } - } - - struct KeyRange<'a>(&'a std::ops::Range); - - impl<'a> serde::Serialize for KeyRange<'a> { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - use serde::ser::SerializeTuple; - let mut t = serializer.serialize_tuple(2)?; - t.serialize_element(&WithDisplay(&self.0.start))?; - t.serialize_element(&WithDisplay(&self.0.end))?; - t.end() - } - } - let at_lsn: Option = parse_query_param(&request, "at_lsn")?; async { @@ -1486,7 +1423,10 @@ async fn timeline_collect_keyspace( .await .map_err(ApiError::InternalServerError)?; - json_response(StatusCode::OK, Partitioning { keys, at_lsn }) + json_response( + StatusCode::OK, + crate::http::models::Partitioning { keys, at_lsn }, + ) } .instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id)) .await