From 658c20bea4cf67a678a6792e181f160997701023 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 6 Dec 2023 17:20:04 +0000
Subject: [PATCH] jwt support; debug spans in basebackup

---
 libs/utils/src/lsn.rs                       |  41 +++++
 pageserver/pagebench/src/basebackup.rs      |  60 +++++---
 .../pagebench/src/getpage_latest_lsn.rs     |   1 +
 pageserver/pagebench/src/util.rs            |   1 +
 pageserver/pagebench/src/util/connstring.rs |   8 +
 pageserver/src/basebackup.rs                | 142 ++++++++++++------
 pageserver/src/client/mgmt_api.rs           |  36 ++++-
 7 files changed, 216 insertions(+), 73 deletions(-)
 create mode 100644 pageserver/pagebench/src/util/connstring.rs

diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs
index 262dcb8a8a..616c1b1887 100644
--- a/libs/utils/src/lsn.rs
+++ b/libs/utils/src/lsn.rs
@@ -366,6 +366,47 @@ impl MonotonicCounter<Lsn> for RecordLsn {
     }
 }
 
+/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
+pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
+
+impl rand::distributions::uniform::SampleUniform for Lsn {
+    type Sampler = LsnSampler;
+}
+
+impl rand::distributions::uniform::UniformSampler for LsnSampler {
+    type X = Lsn;
+
+    fn new<B1, B2>(low: B1, high: B2) -> Self
+    where
+        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+    {
+        Self(
+            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
+                low.borrow().0,
+                high.borrow().0,
+            ),
+        )
+    }
+
+    fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
+    where
+        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
+    {
+        Self(
+            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
+                low.borrow().0,
+                high.borrow().0,
+            ),
+        )
+    }
+
+    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
+        Lsn(self.0.sample(rng))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::bin_ser::BeSer;
diff --git a/pageserver/pagebench/src/basebackup.rs b/pageserver/pagebench/src/basebackup.rs
index 8ba7ba72bf..2beb1f450c 100644
--- a/pageserver/pagebench/src/basebackup.rs
+++ b/pageserver/pagebench/src/basebackup.rs
@@ -12,6 +12,7 @@ use utils::logging;
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::num::NonZeroUsize;
+use std::ops::Range;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
@@ -23,8 +24,10 @@ use crate::util::tenant_timeline_id::TenantTimelineId;
 pub(crate) struct Args {
     #[clap(long, default_value = "http://localhost:9898")]
     mgmt_api_endpoint: String,
-    #[clap(long, default_value = "postgres://postgres@localhost:64000")]
-    page_service_connstring: String,
+    #[clap(long, default_value = "localhost:64000")]
+    page_service_host_port: String,
+    #[clap(long)]
+    pageserver_jwt: Option<String>,
     #[clap(long, default_value = "1")]
     num_clients: NonZeroUsize,
     #[clap(long, default_value = "1.0")]
@@ -167,7 +170,7 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
 
 struct Target {
     timeline: TenantTimelineId,
-    timeline_lsn: Lsn,
+    lsn_range: Option<Range<Lsn>>,
 }
 
 async fn main_impl(
@@ -178,6 +181,7 @@ async fn main_impl(
 
     let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
         args.mgmt_api_endpoint.clone(),
+        args.pageserver_jwt.as_deref(),
     ));
 
     // discover targets
@@ -219,17 +223,15 @@ async fn main_impl(
     let mut js = JoinSet::new();
     for timeline in &timelines {
         js.spawn({
-            let mgmt_api_client = Arc::clone(&mgmt_api_client);
             let timeline = *timeline;
+            let info = mgmt_api_client
+                .timeline_info(timeline.tenant_id, timeline.timeline_id)
+                .await
+                .unwrap();
             async move {
-                let partitioning = mgmt_api_client
-                    .keyspace(timeline.tenant_id, timeline.timeline_id)
-                    .await?;
-                let timeline_lsn = partitioning.at_lsn;
-
                 anyhow::Ok(Target {
                     timeline,
-                    timeline_lsn,
+                    lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
                 })
             }
         });
@@ -271,7 +273,7 @@ async fn main_impl(
     let mut work_senders = HashMap::new();
     let mut tasks = Vec::new();
     for tl in &timelines {
-        let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
+        let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
         work_senders.insert(tl, sender);
         tasks.push(tokio::spawn(client(
             args,
@@ -286,14 +288,21 @@ async fn main_impl(
     let work_sender = async move {
         start_work_barrier.wait().await;
         loop {
-            let (target, gzip) = {
+            let (timeline, work) = {
                 let mut rng = rand::thread_rng();
                 let target = all_targets.choose(&mut rng).unwrap();
-                (target, rng.gen_bool(args.gzip_probability))
+                let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
+                (
+                    target.timeline,
+                    Work {
+                        lsn,
+                        gzip: rng.gen_bool(args.gzip_probability),
+                    },
+                )
             };
-            let sender = work_senders.get(&target.timeline).unwrap();
+            let sender = work_senders.get(&timeline).unwrap();
             // TODO: what if this blocks?
-            sender.send((target.timeline_lsn, gzip)).await.ok().unwrap();
+            sender.send(work).await.ok().unwrap();
         }
     };
 
@@ -330,29 +339,38 @@ async fn main_impl(
     anyhow::Ok(())
 }
 
+#[derive(Copy, Clone)]
+struct Work {
+    lsn: Option<Lsn>,
+    gzip: bool,
+}
+
 #[instrument(skip_all)]
 async fn client(
     args: &'static Args,
     timeline: TenantTimelineId,
     start_work_barrier: Arc<Barrier>,
-    mut work: tokio::sync::mpsc::Receiver<(Lsn, bool)>,
+    mut work: tokio::sync::mpsc::Receiver<Work>,
     all_work_done_barrier: Arc<Barrier>,
     live_stats: Arc<LiveStats>,
 ) {
     start_work_barrier.wait().await;
     let client =
-        pageserver::client::page_service::Client::new(args.page_service_connstring.clone())
-            .await
-            .unwrap();
+        pageserver::client::page_service::Client::new(crate::util::connstring::connstring(
+            &args.page_service_host_port,
+            args.pageserver_jwt.as_deref(),
+        ))
+        .await
+        .unwrap();
 
-    while let Some((lsn, gzip)) = work.recv().await {
+    while let Some(Work { lsn, gzip }) = work.recv().await {
         let start = Instant::now();
         let copy_out_stream = client
             .basebackup(&BasebackupRequest {
                 tenant_id: timeline.tenant_id,
                 timeline_id: timeline.timeline_id,
-                lsn: Some(lsn),
+                lsn,
                 gzip,
             })
             .await
diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs
index 1fb66baabe..61867a13ae 100644
--- a/pageserver/pagebench/src/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/getpage_latest_lsn.rs
@@ -186,6 +186,7 @@ async fn main_impl(
 
     let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
         args.mgmt_api_endpoint.clone(),
+        None, // TODO: support jwt in args
     ));
 
     // discover targets
diff --git a/pageserver/pagebench/src/util.rs b/pageserver/pagebench/src/util.rs
index 479cad0532..bfbd10d489 100644
--- a/pageserver/pagebench/src/util.rs
+++ b/pageserver/pagebench/src/util.rs
@@ -1 +1,2 @@
 pub(crate) mod tenant_timeline_id;
+pub(crate) mod connstring;
diff --git a/pageserver/pagebench/src/util/connstring.rs b/pageserver/pagebench/src/util/connstring.rs
new file mode 100644
index 0000000000..7106a42965
--- /dev/null
+++ b/pageserver/pagebench/src/util/connstring.rs
@@ -0,0 +1,8 @@
+pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
+    let colon_and_jwt = if let Some(jwt) = jwt {
+        format!(":{jwt}") // TODO: urlescape
+    } else {
+        format!("")
+    };
format!("postgres://postgres{colon_and_jwt}@{host_port}") +} diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index ed452eae7d..afcb738016 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -166,71 +166,111 @@ where } } - // Gather non-relational files from object storage pages. + debug!("Gather non-relational files from object storage pages"); for kind in [ SlruKind::Clog, SlruKind::MultiXactOffsets, SlruKind::MultiXactMembers, ] { - for segno in self - .timeline - .list_slru_segments(kind, self.lsn, self.ctx) - .await? - { - self.add_slru_segment(kind, segno).await?; + async { + debug!("list slru segments"); + for segno in self + .timeline + .list_slru_segments(kind, self.lsn, self.ctx) + .await? + { + async { + debug!("add slru segment"); + self.add_slru_segment(kind, segno).await?; + anyhow::Ok(()) + } + .instrument(debug_span!("slru segment", ?segno)) + .await?; + } + anyhow::Ok(()) } + .instrument(debug_span!("non-rel file", ?kind)) + .await?; } let mut min_restart_lsn: Lsn = Lsn::MAX; - // Create tablespace directories + debug!("Create tablespace directories"); for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn, self.ctx).await? { - self.add_dbdir(spcnode, dbnode, has_relmap_file).await?; + async { + debug!("iter"); + self.add_dbdir(spcnode, dbnode, has_relmap_file).await?; - // If full backup is requested, include all relation files. - // Otherwise only include init forks of unlogged relations. - let rels = self - .timeline - .list_rels(spcnode, dbnode, self.lsn, self.ctx) - .await?; - for &rel in rels.iter() { - // Send init fork as main fork to provide well formed empty - // contents of UNLOGGED relations. Postgres copies it in - // `reinit.c` during recovery. - if rel.forknum == INIT_FORKNUM { - // I doubt we need _init fork itself, but having it at least - // serves as a marker relation is unlogged. - self.add_rel(rel, rel).await?; - self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?; - continue; - } + // If full backup is requested, include all relation files. + // Otherwise only include init forks of unlogged relations. + debug!("list rels"); + let rels = self + .timeline + .list_rels(spcnode, dbnode, self.lsn, self.ctx) + .await?; + for &rel in rels.iter() { + async { + debug!("iter"); + // Send init fork as main fork to provide well formed empty + // contents of UNLOGGED relations. Postgres copies it in + // `reinit.c` during recovery. + if rel.forknum == INIT_FORKNUM { + // I doubt we need _init fork itself, but having it at least + // serves as a marker relation is unlogged. + self.add_rel(rel, rel).await?; + self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?; + return Ok(()); + } - if self.full_backup { - if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM)) - { - // skip this, will include it when we reach the init fork - continue; + if self.full_backup { + if rel.forknum == MAIN_FORKNUM + && rels.contains(&rel.with_forknum(INIT_FORKNUM)) + { + // skip this, will include it when we reach the init fork + return Ok(()); + } + self.add_rel(rel, rel).await?; + } + anyhow::Ok(()) } - self.add_rel(rel, rel).await?; + .instrument(debug_span!("process rel", ?rel)) + .await?; } - } - for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? 
-            {
-                if path.starts_with("pg_replslot") {
-                    let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
-                    let restart_lsn = Lsn(u64::from_le_bytes(
-                        content[offs..offs + 8].try_into().unwrap(),
-                    ));
-                    info!("Replication slot {} restart LSN={}", path, restart_lsn);
-                    min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
+
+                debug!("list aux files");
+                for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
+                    async {
+                        debug!("iter");
+                        if path.starts_with("pg_replslot") {
+                            let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
+                            let restart_lsn = Lsn(u64::from_le_bytes(
+                                content[offs..offs + 8].try_into().unwrap(),
+                            ));
+                            info!("Replication slot {} restart LSN={}", path, restart_lsn);
+                            min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
+                        }
+                        let header = new_tar_header(&path, content.len() as u64)?;
+                        self.ar
+                            .append(&header, &*content)
+                            .await
+                            .context("could not add aux file to basebackup tarball")?;
+                        anyhow::Ok(())
+                    }
+                    .instrument(debug_span!("process aux file", ?path))
+                    .await?;
                 }
-                let header = new_tar_header(&path, content.len() as u64)?;
-                self.ar
-                    .append(&header, &*content)
-                    .await
-                    .context("could not add aux file to basebackup tarball")?;
+
+                debug!("done");
+
+                anyhow::Ok(())
             }
+            .instrument(debug_span!(
+                "process tablespace directory",
+                ?spcnode,
+                ?dbnode
+            ))
+            .await?;
         }
         if min_restart_lsn != Lsn::MAX {
             info!(
@@ -244,19 +284,25 @@ where
             .await
             .context("could not add restart.lsn file to basebackup tarball")?;
         }
+        debug!("list twophase files");
         for xid in self
             .timeline
             .list_twophase_files(self.lsn, self.ctx)
             .await?
         {
-            self.add_twophase_file(xid).await?;
+            async {
+                self.add_twophase_file(xid).await?;
+                anyhow::Ok(())
+            }
+            .instrument(debug_span!("process twophase file", ?xid))
+            .await?;
         }
 
         fail_point!("basebackup-before-control-file", |_| {
             bail!("failpoint basebackup-before-control-file")
         });
 
-        // Generate pg_control and bootstrap WAL segment.
+        debug!("Generate pg_control and bootstrap WAL segment.");
         self.add_pgcontrol_file().await?;
         self.ar.finish().await?;
         debug!("all tarred up!");
diff --git a/pageserver/src/client/mgmt_api.rs b/pageserver/src/client/mgmt_api.rs
index 9de0e533a8..c9a61ff64c 100644
--- a/pageserver/src/client/mgmt_api.rs
+++ b/pageserver/src/client/mgmt_api.rs
@@ -5,20 +5,22 @@ use utils::id::{TenantId, TimelineId};
 
 pub struct Client {
     mgmt_api_endpoint: String,
+    authorization_header: Option<String>,
     client: hyper::Client<hyper::client::HttpConnector>,
 }
 
 impl Client {
-    pub fn new(mgmt_api_endpoint: String) -> Self {
+    pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
         Self {
             mgmt_api_endpoint,
+            authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
             client: hyper::client::Client::new(),
         }
     }
 
     pub async fn list_tenants(&self) -> anyhow::Result<Vec<pageserver_api::models::TenantInfo>> {
         let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?;
-        let resp = self.client.get(uri).await?;
+        let resp = self.get(uri).await?;
         if !resp.status().is_success() {
             anyhow::bail!("status error");
         }
@@ -34,7 +36,22 @@ impl Client {
             "{}/v1/tenant/{tenant_id}/timeline",
             self.mgmt_api_endpoint
         ))?;
-        let resp = self.client.get(uri).await?;
+        let resp = self.get(uri).await?;
+        if !resp.status().is_success() {
+            anyhow::bail!("status error");
+        }
+        let body = hyper::body::to_bytes(resp).await?;
+        Ok(serde_json::from_slice(&body)?)
+    }
+
+    pub async fn timeline_info(
+        &self, tenant_id: TenantId, timeline_id: TimelineId,
+    ) -> anyhow::Result<pageserver_api::models::TimelineInfo> {
+        let uri = Uri::try_from(format!(
+            "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
+            self.mgmt_api_endpoint
+        ))?;
+        let resp = self.get(uri).await?;
         if !resp.status().is_success() {
             anyhow::bail!("status error");
         }
@@ -51,11 +68,22 @@ impl Client {
             "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true",
             self.mgmt_api_endpoint
         ))?;
-        let resp = self.client.get(uri).await?;
+        let resp = self.get(uri).await?;
         if !resp.status().is_success() {
             anyhow::bail!("status error");
         }
         let body = hyper::body::to_bytes(resp).await?;
         Ok(serde_json::from_slice(&body).context("deserialize")?)
     }
+
+    async fn get(&self, uri: Uri) -> hyper::Result<hyper::Response<hyper::Body>> {
+        let req = hyper::Request::builder().uri(uri).method("GET");
+        let req = if let Some(value) = &self.authorization_header {
+            req.header("Authorization", value)
+        } else {
+            req
+        };
+        let req = req.body(hyper::Body::default());
+        self.client.request(req.unwrap()).await
+    }
 }
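
Notes for reviewers (sketches, not part of the diff):

The `SampleUniform` impl on `Lsn` is what lets the work-sender loop call
`rng.gen_range(r)` directly on a `Range<Lsn>`. Assuming rand 0.8, usage looks
like this (function name hypothetical):

    use rand::Rng;
    use utils::lsn::Lsn;

    // Uniformly sample an LSN from a half-open range; LsnSampler
    // delegates to the underlying u64 sampler.
    fn random_lsn_in(range: std::ops::Range<Lsn>) -> Lsn {
        rand::thread_rng().gen_range(range)
    }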
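For reference, the strings the new `connstring` helper produces (token value
hypothetical):

    // connstring("localhost:64000", Some("<jwt>"))
    //   => "postgres://postgres:<jwt>@localhost:64000"
    // connstring("localhost:64000", None)
    //   => "postgres://postgres@localhost:64000"

The JWT rides in the password slot of the libpq URI. Per the TODO in the new
file it is not URL-escaped yet; since JWT segments are base64url-encoded and
joined by `.`, tokens normally contain no `@`, `:`, or `/`, which is why this
works in practice.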
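And a minimal sketch of how both clients end up sharing one token, using the
argument names from the patch:

    // HTTP mgmt API: every GET now carries "Authorization: Bearer <jwt>".
    let mgmt_api_client = pageserver::client::mgmt_api::Client::new(
        args.mgmt_api_endpoint.clone(),
        args.pageserver_jwt.as_deref(),
    );
    // Page service: the same token, embedded in the libpq connstring.
    let connstr = crate::util::connstring::connstring(
        &args.page_service_host_port,
        args.pageserver_jwt.as_deref(),
    );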