diff --git a/Cargo.lock b/Cargo.lock index 57c57182e1..e82c4051ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2634,6 +2634,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -2894,6 +2904,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "pagectl" version = "0.1.0" @@ -2979,10 +2995,12 @@ dependencies = [ "tokio", "tokio-io-timeout", "tokio-postgres", + "tokio-stream", "tokio-tar", "tokio-util", "toml_edit", "tracing", + "tracing-subscriber", "url", "utils", "walkdir", @@ -5290,6 +5308,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "serde", diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index b5350d6384..5ef09312a5 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -140,3 +140,36 @@ impl Key { }) } } + + +impl std::str::FromStr for Key { + type Err = anyhow::Error; + + fn from_str(s: &str) -> std::result::Result { + Self::from_hex(s) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use crate::repository::Key; + + #[test] + fn display_fromstr_bijection() { + let mut rng = rand::thread_rng(); + use rand::Rng; + + let key = Key { + field1: rng.gen(), + field2: rng.gen(), + field3: rng.gen(), + field4: rng.gen(), + field5: rng.gen(), + field6: rng.gen(), + }; + + assert_eq!(key, Key::from_str(&format!("{key}")).unwrap()); + } +} diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 71e32e479f..555c6e0713 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -18,7 +18,7 @@ use utils::{ use crate::{reltag::RelTag, shard::TenantShardId}; use anyhow::bail; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; /// The state of a tenant in this pageserver. /// @@ -767,6 +767,36 @@ impl PagestreamBeMessage { bytes.into() } + + pub fn deserialize(buf: Bytes) -> anyhow::Result { + let mut buf = buf.reader(); + let msg_tag = buf.read_u8()?; + match msg_tag { + 100 => todo!(), + 101 => todo!(), + 102 => { + let buf = buf.get_ref(); + /* TODO use constant */ + if buf.len() == 8192 { + Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { + page: buf.clone(), + })) + } else { + anyhow::bail!("invalid page size: {}", buf.len()); + } + } + 103 => { + let buf = buf.get_ref(); + let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?; + let rust_str = cstr.to_str()?; + Ok(PagestreamBeMessage::Error(PagestreamErrorResponse { + message: rust_str.to_owned(), + })) + } + 104 => todo!(), + _ => bail!("unknown tag: {:?}", msg_tag), + } + } } #[cfg(test)] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 3eb01003df..2f2336a578 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -82,6 +82,8 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true +tokio-stream.workspace = true +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/bin/getpage_bench_libpq.rs b/pageserver/src/bin/getpage_bench_libpq.rs new file mode 100644 index 0000000000..d3b4a3bd1f --- /dev/null +++ b/pageserver/src/bin/getpage_bench_libpq.rs @@ -0,0 +1,401 @@ +use anyhow::Context; +use clap::Parser; + +use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block}; +use pageserver::repository; + +use pageserver_api::reltag::RelTag; +use rand::prelude::*; +use tokio::sync::Barrier; +use tracing::info; +use utils::id::{TenantId, TimelineId}; +use utils::logging; + +use std::future::Future; + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use tokio::task::JoinHandle; + +struct KeyRange { + start: i128, + end: i128, +} + +impl KeyRange { + fn len(&self) -> i128 { + self.end - self.start + } +} + +struct RelTagBlockNo { + rel_tag: RelTag, + block_no: u32, +} + +#[derive(clap::Parser)] +struct Args { + #[clap(long, default_value = "http://localhost:9898")] + mgmt_api_endpoint: String, + #[clap(long, default_value = "postgres://postgres@localhost:64000")] + page_service_connstring: String, + // tenant_id: String, + // timeline_id: String, + #[clap(long)] + num_tasks: usize, + #[clap(long)] + num_requests: usize, + #[clap(long)] + pick_n_tenants: Option, + tenants: Option>, +} + +#[derive(Debug, Default)] +struct LiveStats { + completed_requests: AtomicU64, +} + +impl LiveStats { + fn inc(&self) { + self.completed_requests.fetch_add(1, Ordering::Relaxed); + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + logging::init( + logging::LogFormat::Plain, + logging::TracingErrorLayerEnablement::Disabled, + logging::Output::Stderr, + ) + .unwrap(); + + let args: &'static Args = Box::leak(Box::new(Args::parse())); + + let client = Arc::new(mgmt_api_client::Client::new(args.mgmt_api_endpoint.clone())); + + let mut tenants: Vec = if let Some(tenants) = &args.tenants { + tenants.clone() + } else { + client + .list_tenants() + .await? + .into_iter() + .map(|ti| ti.id) + .collect() + }; + let tenants = if let Some(n) = args.pick_n_tenants { + tenants.truncate(n); + if tenants.len() != n { + anyhow::bail!("too few tenants: {} < {}", tenants.len(), n); + } + tenants + } else { + tenants + }; + + let mut tenant_timelines = Vec::new(); + for tenant_id in tenants { + tenant_timelines.extend( + client + .list_timelines(tenant_id) + .await? + .into_iter() + .map(|ti| (tenant_id, ti.timeline_id)), + ); + } + info!("tenant_timelines:\n{:?}", tenant_timelines); + + let stats = Arc::new(LiveStats::default()); + + let num_work_tasks = tenant_timelines.len() * args.num_tasks; + + let start_work_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks + 1)); + + tokio::spawn({ + let stats = Arc::clone(&stats); + let start_work_barrier = Arc::clone(&start_work_barrier); + async move { + start_work_barrier.wait().await; + loop { + let start = std::time::Instant::now(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed); + let elapsed = start.elapsed(); + info!( + "RPS: {:.0}", + completed_requests as f64 / elapsed.as_secs_f64() + ); + } + } + }); + + let mut tasks = Vec::new(); + for (tenant_id, timeline_id) in tenant_timelines { + let stats = Arc::clone(&stats); + let t = tokio::spawn(timeline( + args, + client.clone(), + tenant_id, + timeline_id, + Arc::clone(&start_work_barrier), + stats, + )); + tasks.push(t); + } + + for t in tasks { + t.await.unwrap().unwrap(); + } + + anyhow::Ok(()) +} + +fn timeline( + args: &'static Args, + mgmt_api_client: Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + start_work_barrier: Arc, + stats: Arc, +) -> impl Future> + Send + Sync { + async move { + let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?; + let lsn = partitioning.at_lsn; + let ranges = partitioning + .keys + .ranges + .iter() + .filter_map(|r| { + let start = r.start; + let end = r.end; + // filter out non-relblock keys + match (is_rel_block_key(start), is_rel_block_key(end)) { + (true, true) => Some(KeyRange { + start: start.to_i128(), + end: end.to_i128(), + }), + (true, false) | (false, true) => { + unimplemented!("split up range") + } + (false, false) => None, + } + }) + .collect::>(); + + // weighted ranges + let weights = ranges.iter().map(|r| r.len()).collect::>(); + + let ranges = Arc::new(ranges); + let weights = Arc::new(weights); + + let mut tasks = Vec::>::new(); + + for _i in 0..args.num_tasks { + let ranges = ranges.clone(); + let _weights = weights.clone(); + let start_work_barrier = Arc::clone(&start_work_barrier); + let task = tokio::spawn({ + let stats = Arc::clone(&stats); + async move { + let mut getpage_client = getpage_client::Client::new( + args.page_service_connstring.clone(), + tenant_id, + timeline_id, + ) + .await + .unwrap(); + start_work_barrier.wait().await; + for _i in 0..args.num_requests { + let key = { + let mut rng = rand::thread_rng(); + let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap(); + let key: i128 = rng.gen_range(r.start..r.end); + let key = repository::Key::from_i128(key); + let (rel_tag, block_no) = key_to_rel_block(key) + .expect("we filter non-rel-block keys out above"); + RelTagBlockNo { rel_tag, block_no } + }; + getpage_client + .getpage(key, lsn) + .await + .with_context(|| { + format!("getpage for tenant {} timeline {}", tenant_id, timeline_id) + }) + .unwrap(); + stats.inc(); + } + getpage_client.shutdown().await; + } + }); + tasks.push(task); + } + + for task in tasks { + task.await.unwrap(); + } + + Ok(()) + } +} + +mod mgmt_api_client { + use anyhow::Context; + + use hyper::{client::HttpConnector, Uri}; + use utils::id::{TenantId, TimelineId}; + + pub(crate) struct Client { + mgmt_api_endpoint: String, + pub(crate) client: hyper::Client, + } + + impl Client { + pub fn new(mgmt_api_endpoint: String) -> Self { + Self { + mgmt_api_endpoint, + client: hyper::client::Client::new(), + } + } + + pub async fn list_tenants( + &self, + ) -> anyhow::Result> { + let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body)?) + } + + pub async fn list_timelines( + &self, + tenant_id: TenantId, + ) -> anyhow::Result> { + let uri = Uri::try_from(format!( + "{}/v1/tenant/{tenant_id}/timeline", + self.mgmt_api_endpoint + ))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body)?) + } + + pub async fn keyspace( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let uri = Uri::try_from(format!( + "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true", + self.mgmt_api_endpoint + ))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body).context("deserialize")?) + } + } +} + +mod getpage_client { + use std::pin::Pin; + + use futures::SinkExt; + use pageserver_api::models::{ + PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, + PagestreamGetPageResponse, + }; + use tokio::task::JoinHandle; + use tokio_stream::StreamExt; + use tokio_util::sync::CancellationToken; + use utils::{ + id::{TenantId, TimelineId}, + lsn::Lsn, + }; + + use crate::RelTagBlockNo; + + pub(crate) struct Client { + copy_both: Pin>>, + cancel_on_client_drop: Option, + conn_task: JoinHandle<()>, + } + + impl Client { + pub async fn new( + connstring: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let (client, connection) = + tokio_postgres::connect(&connstring, postgres::NoTls).await?; + + let conn_task_cancel = CancellationToken::new(); + let conn_task = tokio::spawn({ + let conn_task_cancel = conn_task_cancel.clone(); + async move { + tokio::select! { + _ = conn_task_cancel.cancelled() => { } + res = connection => { + res.unwrap(); + } + } + } + }); + + let copy_both: tokio_postgres::CopyBothDuplex = client + .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}")) + .await?; + + Ok(Self { + copy_both: Box::pin(copy_both), + conn_task, + cancel_on_client_drop: Some(conn_task_cancel.drop_guard()), + }) + } + + pub async fn shutdown(mut self) { + let _ = self.cancel_on_client_drop.take(); + self.conn_task.await.unwrap(); + } + + pub async fn getpage( + &mut self, + key: RelTagBlockNo, + lsn: Lsn, + ) -> anyhow::Result { + let req = PagestreamGetPageRequest { + latest: false, + rel: key.rel_tag, + blkno: key.block_no, + lsn, + }; + let req = PagestreamFeMessage::GetPage(req); + let req: bytes::Bytes = req.serialize(); + // let mut req = tokio_util::io::ReaderStream::new(&req); + let mut req = tokio_stream::once(Ok(req)); + + self.copy_both.send_all(&mut req).await?; + + let next: Option> = self.copy_both.next().await; + let next = next.unwrap().unwrap(); + + match PagestreamBeMessage::deserialize(next)? { + PagestreamBeMessage::Exists(_) => todo!(), + PagestreamBeMessage::Nblocks(_) => todo!(), + PagestreamBeMessage::GetPage(p) => Ok(p), + PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), + PagestreamBeMessage::DbSize(_) => todo!(), + } + } + } +} diff --git a/pageserver/src/http/mod.rs b/pageserver/src/http/mod.rs index 1c083bd382..220fa29b43 100644 --- a/pageserver/src/http/mod.rs +++ b/pageserver/src/http/mod.rs @@ -1,4 +1,4 @@ pub mod routes; pub use routes::make_router; -pub use pageserver_api::models; +pub mod models; diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs new file mode 100644 index 0000000000..1d8e76b7a7 --- /dev/null +++ b/pageserver/src/http/models.rs @@ -0,0 +1,3 @@ +//! If possible, use `::pageserver_api::models` instead. + +pub mod partitioning; diff --git a/pageserver/src/http/models/partitioning.rs b/pageserver/src/http/models/partitioning.rs new file mode 100644 index 0000000000..bf0a62f3a7 --- /dev/null +++ b/pageserver/src/http/models/partitioning.rs @@ -0,0 +1,112 @@ +use utils::lsn::Lsn; + +#[derive(Debug, PartialEq, Eq)] +pub struct Partitioning { + pub keys: crate::keyspace::KeySpace, + + pub at_lsn: Lsn, +} + +impl serde::Serialize for Partitioning { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace); + + impl<'a> serde::Serialize for KeySpace<'a> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeSeq; + let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?; + for kr in &self.0.ranges { + seq.serialize_element(&KeyRange(kr))?; + } + seq.end() + } + } + + use serde::ser::SerializeMap; + let mut map = serializer.serialize_map(Some(2))?; + map.serialize_key("keys")?; + map.serialize_value(&KeySpace(&self.keys))?; + map.serialize_key("at_lsn")?; + map.serialize_value(&WithDisplay(&self.at_lsn))?; + map.end() + } +} + +pub struct WithDisplay<'a, T>(&'a T); + +impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.collect_str(&self.0) + } +} + +pub struct KeyRange<'a>(&'a std::ops::Range); + +impl<'a> serde::Serialize for KeyRange<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeTuple; + let mut t = serializer.serialize_tuple(2)?; + t.serialize_element(&WithDisplay(&self.0.start))?; + t.serialize_element(&WithDisplay(&self.0.end))?; + t.end() + } +} + +impl<'a> serde::Deserialize<'a> for Partitioning { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + pub struct KeySpace(crate::keyspace::KeySpace); + + impl<'de> serde::Deserialize<'de> for KeySpace { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[serde_with::serde_as] + #[derive(serde::Deserialize)] + #[serde(transparent)] + struct Key(#[serde_as(as = "serde_with::DisplayFromStr")] crate::repository::Key); + + #[serde_with::serde_as] + #[derive(serde::Deserialize)] + struct Range(Key, Key); + + let ranges: Vec = serde::Deserialize::deserialize(deserializer)?; + Ok(Self(crate::keyspace::KeySpace { + ranges: ranges + .into_iter() + .map(|Range(start, end)| (start.0..end.0)) + .collect(), + })) + } + } + + #[serde_with::serde_as] + #[derive(serde::Deserialize)] + struct De { + keys: KeySpace, + #[serde_as(as = "serde_with::DisplayFromStr")] + at_lsn: Lsn, + } + + let de: De = serde::Deserialize::deserialize(deserializer)?; + Ok(Self { + at_lsn: de.at_lsn, + keys: de.keys.0, + }) + } +} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index aa2b017471..73577b80de 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -26,10 +26,6 @@ use utils::http::endpoint::request_span; use utils::http::json::json_request_or_empty_body; use utils::http::request::{get_request_param, must_get_query_param, parse_query_param}; -use super::models::{ - StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo, - TimelineCreateRequest, TimelineGcRequest, TimelineInfo, -}; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::DeletionQueueClient; use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL}; @@ -46,6 +42,10 @@ use crate::tenant::timeline::Timeline; use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources}; use crate::{config::PageServerConf, tenant::mgr}; use crate::{disk_usage_eviction_task, tenant}; +use pageserver_api::models::{ + StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo, + TimelineCreateRequest, TimelineGcRequest, TimelineInfo, +}; use utils::{ auth::SwappableJwtAuth, generation::Generation, @@ -61,7 +61,7 @@ use utils::{ }; // Imports only used for testing APIs -use super::models::ConfigureFailpointsRequest; +use pageserver_api::models::ConfigureFailpointsRequest; pub struct State { conf: &'static PageServerConf, @@ -1422,71 +1422,11 @@ async fn timeline_collect_keyspace( let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_id))?; - struct Partitioning { - keys: crate::keyspace::KeySpace, - - at_lsn: Lsn, - } - - impl serde::Serialize for Partitioning { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeMap; - let mut map = serializer.serialize_map(Some(2))?; - map.serialize_key("keys")?; - map.serialize_value(&KeySpace(&self.keys))?; - map.serialize_key("at_lsn")?; - map.serialize_value(&WithDisplay(&self.at_lsn))?; - map.end() - } - } - - struct WithDisplay<'a, T>(&'a T); - - impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - serializer.collect_str(&self.0) - } - } - - struct KeySpace<'a>(&'a crate::keyspace::KeySpace); - - impl<'a> serde::Serialize for KeySpace<'a> { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: serde::Serializer, - { - use serde::ser::SerializeSeq; - let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?; - for kr in &self.0.ranges { - seq.serialize_element(&KeyRange(kr))?; - } - seq.end() - } - } - - struct KeyRange<'a>(&'a std::ops::Range); - - impl<'a> serde::Serialize for KeyRange<'a> { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - use serde::ser::SerializeTuple; - let mut t = serializer.serialize_tuple(2)?; - t.serialize_element(&WithDisplay(&self.0.start))?; - t.serialize_element(&WithDisplay(&self.0.end))?; - t.end() - } - } - let at_lsn: Option = parse_query_param(&request, "at_lsn")?; + let check_serialization_roundtrip: bool = + parse_query_param(&request, "check_serialization_roundtrip")?.unwrap_or(false); + async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; @@ -1496,7 +1436,20 @@ async fn timeline_collect_keyspace( .await .map_err(|e| ApiError::InternalServerError(e.into()))?; - json_response(StatusCode::OK, Partitioning { keys, at_lsn }) + let res = crate::http::models::partitioning::Partitioning { keys, at_lsn }; + if check_serialization_roundtrip { + (|| { + let ser = serde_json::ser::to_vec(&res).context("serialize")?; + let de: crate::http::models::partitioning::Partitioning = + serde_json::from_slice(&ser).context("deserialize")?; + anyhow::ensure!(de == res, "not equal"); + info!("passed serialization rountrip check"); + Ok(()) + })() + .context("serialization rountrip") + .map_err(ApiError::InternalServerError)?; + } + json_response(StatusCode::OK, res) } .instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id)) .await diff --git a/pageserver/src/keyspace.rs b/pageserver/src/keyspace.rs index 20e6df9c7b..970c96589e 100644 --- a/pageserver/src/keyspace.rs +++ b/pageserver/src/keyspace.rs @@ -5,7 +5,7 @@ use std::ops::Range; /// /// Represents a set of Keys, in a compact form. /// -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct KeySpace { /// Contiguous ranges of keys that belong to the key space. In key order, /// and with no overlap. diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 827278af72..a7de440301 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1749,6 +1749,7 @@ const AUX_FILES_KEY: Key = Key { // Reverse mappings for a few Keys. // These are needed by WAL redo manager. +/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`. pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> { Ok(match key.field1 { 0x00 => ( @@ -1764,7 +1765,8 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> { }) } -fn is_rel_block_key(key: Key) -> bool { +/// See [[key_to_rel_block]]. +pub fn is_rel_block_key(key: Key) -> bool { key.field1 == 0x00 && key.field4 != 0 }