mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 07:30:37 +00:00
Compare commits
5 Commits
remove-pos
...
problame/2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9742e253ee | ||
|
|
e253101727 | ||
|
|
96ab3d67a7 | ||
|
|
4fb1a7a35a | ||
|
|
800e0802ea |
75
Cargo.lock
generated
75
Cargo.lock
generated
@@ -1955,6 +1955,20 @@ dependencies = [
|
||||
"hashbrown 0.13.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hdrhistogram"
|
||||
version = "7.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"byteorder",
|
||||
"crossbeam-channel",
|
||||
"flate2",
|
||||
"nom",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heapless"
|
||||
version = "0.8.0"
|
||||
@@ -2634,6 +2648,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
@@ -2894,6 +2918,31 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "pagebench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"futures",
|
||||
"hdrhistogram",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"pageserver",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pagectl"
|
||||
version = "0.1.0"
|
||||
@@ -2979,6 +3028,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
@@ -5218,6 +5268,17 @@ dependencies = [
|
||||
"syn 2.0.28",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-chrome"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "496b3cd5447f7ff527bbbf19b071ad542a000adf297d4127078b4dfdb931f41a"
|
||||
dependencies = [
|
||||
"serde_json",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.31"
|
||||
@@ -5238,6 +5299,17 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-flame"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-futures"
|
||||
version = "0.2.5"
|
||||
@@ -5290,6 +5362,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -5504,7 +5577,9 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-chrome",
|
||||
"tracing-error",
|
||||
"tracing-flame",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
|
||||
@@ -5,6 +5,7 @@ members = [
|
||||
"control_plane",
|
||||
"pageserver",
|
||||
"pageserver/ctl",
|
||||
"pageserver/pagebench",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"storage_broker",
|
||||
@@ -79,6 +80,7 @@ futures-util = "0.3"
|
||||
git-version = "0.3"
|
||||
hashbrown = "0.13"
|
||||
hashlink = "0.8.1"
|
||||
hdrhistogram = "7.5.2"
|
||||
hex = "0.4"
|
||||
hex-literal = "0.4"
|
||||
hmac = "0.12.1"
|
||||
|
||||
@@ -86,7 +86,10 @@ where
|
||||
.stdout(process_log_file)
|
||||
.stderr(same_file_for_stderr)
|
||||
.args(args);
|
||||
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
|
||||
|
||||
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
|
||||
fill_rust_env_vars(background_command),
|
||||
));
|
||||
filled_cmd.envs(envs);
|
||||
|
||||
let pid_file_to_check = match initial_pid_file {
|
||||
@@ -253,6 +256,15 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
|
||||
cmd
|
||||
}
|
||||
|
||||
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
|
||||
for (var, val) in std::env::vars() {
|
||||
if var.starts_with("NEON_") {
|
||||
cmd = cmd.env(var, val);
|
||||
}
|
||||
}
|
||||
cmd
|
||||
}
|
||||
|
||||
/// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
|
||||
/// 1. Claims a pidfile with a fcntl lock on it and
|
||||
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)
|
||||
|
||||
@@ -283,7 +283,7 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body,
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -140,3 +140,35 @@ impl Key {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for Key {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
Self::from_hex(s)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::key::Key;
|
||||
|
||||
#[test]
|
||||
fn display_fromstr_bijection() {
|
||||
let mut rng = rand::thread_rng();
|
||||
use rand::Rng;
|
||||
|
||||
let key = Key {
|
||||
field1: rng.gen(),
|
||||
field2: rng.gen(),
|
||||
field3: rng.gen(),
|
||||
field4: rng.gen(),
|
||||
field5: rng.gen(),
|
||||
field6: rng.gen(),
|
||||
};
|
||||
|
||||
assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ use utils::{
|
||||
|
||||
use crate::{reltag::RelTag, shard::TenantShardId};
|
||||
use anyhow::bail;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
/// The state of a tenant in this pageserver.
|
||||
///
|
||||
@@ -767,6 +767,36 @@ impl PagestreamBeMessage {
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
|
||||
let mut buf = buf.reader();
|
||||
let msg_tag = buf.read_u8()?;
|
||||
match msg_tag {
|
||||
100 => todo!(),
|
||||
101 => todo!(),
|
||||
102 => {
|
||||
let buf = buf.get_ref();
|
||||
/* TODO use constant */
|
||||
if buf.len() == 8192 {
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page: buf.clone(),
|
||||
}))
|
||||
} else {
|
||||
anyhow::bail!("invalid page size: {}", buf.len());
|
||||
}
|
||||
}
|
||||
103 => {
|
||||
let buf = buf.get_ref();
|
||||
let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
|
||||
let rust_str = cstr.to_str()?;
|
||||
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: rust_str.to_owned(),
|
||||
}))
|
||||
}
|
||||
104 => todo!(),
|
||||
_ => bail!("unknown tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -278,7 +278,7 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
|
||||
|
||||
fn ensure_logging_ready() {
|
||||
LOGGING_DONE.get_or_init(|| {
|
||||
utils::logging::init(
|
||||
let _ = utils::logging::init(
|
||||
utils::logging::LogFormat::Test,
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stdout,
|
||||
|
||||
@@ -207,7 +207,7 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
|
||||
|
||||
fn ensure_logging_ready() {
|
||||
LOGGING_DONE.get_or_init(|| {
|
||||
utils::logging::init(
|
||||
let _ = utils::logging::init(
|
||||
utils::logging::LogFormat::Test,
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stdout,
|
||||
|
||||
@@ -49,6 +49,8 @@ const_format.workspace = true
|
||||
# to use tokio channels as streams, this is faster to compile than async_stream
|
||||
# why is it only here? no other crate should use it, streams are rarely needed.
|
||||
tokio-stream = { version = "0.1.14" }
|
||||
tracing-chrome = "0.7.1"
|
||||
tracing-flame = "0.2.0"
|
||||
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::str::FromStr;
|
||||
use std::{io::BufWriter, str::FromStr};
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -73,11 +73,18 @@ pub enum Output {
|
||||
Stderr,
|
||||
}
|
||||
|
||||
/// Keep alive and drop it before the program terminates.
|
||||
#[must_use]
|
||||
pub struct FlushGuard {
|
||||
_tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
|
||||
_tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
|
||||
}
|
||||
|
||||
pub fn init(
|
||||
log_format: LogFormat,
|
||||
tracing_error_layer_enablement: TracingErrorLayerEnablement,
|
||||
output: Output,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<FlushGuard> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
let rust_log_env_filter = || {
|
||||
@@ -85,11 +92,51 @@ pub fn init(
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
|
||||
};
|
||||
|
||||
// WIP: lift it up as an argument
|
||||
let enable_tracing_chrome = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_CHROME") {
|
||||
Ok(s) if s != "0" => true,
|
||||
Ok(_s) => false,
|
||||
Err(std::env::VarError::NotPresent) => false,
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var NEON_PAGESERVER_ENABLE_TRACING_CHROME not unicode")
|
||||
}
|
||||
};
|
||||
|
||||
// WIP: lift it up as an argument
|
||||
let enable_tracing_flame = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_FLAME") {
|
||||
Ok(s) if s != "0" => true,
|
||||
Ok(_s) => false,
|
||||
Err(std::env::VarError::NotPresent) => false,
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var NEON_PAGESERVER_ENABLE_TRACING_FLAME not unicode")
|
||||
}
|
||||
};
|
||||
|
||||
// NB: the order of the with() calls does not matter.
|
||||
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
||||
use tracing_subscriber::prelude::*;
|
||||
let r = tracing_subscriber::registry();
|
||||
let r = r.with({
|
||||
|
||||
// https://users.rust-lang.org/t/how-can-i-init-tracing-registry-dynamically-with-multiple-outputs/94307/6
|
||||
#[derive(Default)]
|
||||
struct LayerStack {
|
||||
layers:
|
||||
Option<Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>>,
|
||||
}
|
||||
impl LayerStack {
|
||||
fn add_layer<L>(&mut self, new_layer: L)
|
||||
where
|
||||
L: tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync,
|
||||
{
|
||||
let new = match self.layers.take() {
|
||||
Some(layers) => Some(layers.and_then(new_layer).boxed()),
|
||||
None => Some(new_layer.boxed()),
|
||||
};
|
||||
self.layers = new;
|
||||
}
|
||||
}
|
||||
let mut layers = LayerStack::default();
|
||||
|
||||
layers.add_layer({
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_ansi(false)
|
||||
@@ -106,15 +153,47 @@ pub fn init(
|
||||
};
|
||||
log_layer.with_filter(rust_log_env_filter())
|
||||
});
|
||||
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
|
||||
|
||||
layers
|
||||
.add_layer(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
|
||||
|
||||
let tracing_chrome_layer_flush_guard = if enable_tracing_chrome {
|
||||
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
|
||||
.trace_style(tracing_chrome::TraceStyle::Async)
|
||||
.build();
|
||||
layers.add_layer(layer.with_filter(rust_log_env_filter()));
|
||||
Some(guard)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let tracing_flame_flush_guard = if enable_tracing_flame {
|
||||
let (layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
|
||||
let layer = layer
|
||||
.with_empty_samples(false)
|
||||
.with_module_path(false)
|
||||
.with_file_and_line(false)
|
||||
.with_threads_collapsed(true);
|
||||
layers.add_layer(layer.with_filter(rust_log_env_filter()));
|
||||
Some(guard)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
match tracing_error_layer_enablement {
|
||||
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
|
||||
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
|
||||
.init(),
|
||||
TracingErrorLayerEnablement::Disabled => r.init(),
|
||||
TracingErrorLayerEnablement::EnableWithRustLogFilter => layers
|
||||
.add_layer(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())),
|
||||
TracingErrorLayerEnablement::Disabled => (),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
let r = tracing_subscriber::registry();
|
||||
r.with(layers.layers.expect("we add at least one layer"))
|
||||
.init();
|
||||
|
||||
Ok(FlushGuard {
|
||||
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
|
||||
_tracing_flame_layer: tracing_flame_flush_guard,
|
||||
})
|
||||
}
|
||||
|
||||
/// Disable the default rust panic hook by using `set_hook`.
|
||||
|
||||
@@ -366,6 +366,47 @@ impl MonotonicCounter<Lsn> for RecordLsn {
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
|
||||
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
|
||||
|
||||
impl rand::distributions::uniform::SampleUniform for Lsn {
|
||||
type Sampler = LsnSampler;
|
||||
}
|
||||
|
||||
impl rand::distributions::uniform::UniformSampler for LsnSampler {
|
||||
type X = Lsn;
|
||||
|
||||
fn new<B1, B2>(low: B1, high: B2) -> Self
|
||||
where
|
||||
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
{
|
||||
Self(
|
||||
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
|
||||
low.borrow().0,
|
||||
high.borrow().0,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
|
||||
where
|
||||
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
{
|
||||
Self(
|
||||
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
|
||||
low.borrow().0,
|
||||
high.borrow().0,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
|
||||
Lsn(self.0.sample(rng))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::bin_ser::BeSer;
|
||||
|
||||
@@ -61,6 +61,7 @@ thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = [ "serde" ] }
|
||||
tracing.workspace = true
|
||||
|
||||
22
pageserver/pagebench/Cargo.toml
Normal file
22
pageserver/pagebench/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "pagebench"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
pageserver = { path = ".." }
|
||||
utils = { path = "../../libs/utils/" }
|
||||
403
pageserver/pagebench/src/basebackup.rs
Normal file
403
pageserver/pagebench/src/basebackup.rs
Normal file
@@ -0,0 +1,403 @@
|
||||
use anyhow::Context;
|
||||
use pageserver::client::page_service::BasebackupRequest;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use rand::prelude::*;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{debug, info, instrument};
|
||||
use utils::id::TenantId;
|
||||
use utils::logging;
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
|
||||
/// basebackup@LatestLSN
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "localhost:64000")]
|
||||
page_service_host_port: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long, default_value = "1.0")]
|
||||
gzip_probability: f64,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
}
|
||||
|
||||
impl LiveStats {
|
||||
fn inc(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct Output {
|
||||
total: PerTaskOutput,
|
||||
}
|
||||
|
||||
const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
|
||||
|
||||
struct LatencyPercentiles {
|
||||
latency_percentiles: [Duration; 4],
|
||||
}
|
||||
|
||||
impl serde::Serialize for LatencyPercentiles {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
|
||||
for p in LATENCY_PERCENTILES {
|
||||
ser.serialize_entry(
|
||||
&format!("p{p}"),
|
||||
&format!(
|
||||
"{}",
|
||||
&humantime::format_duration(self.latency_percentiles[0])
|
||||
),
|
||||
)?;
|
||||
}
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct PerTaskOutput {
|
||||
request_count: u64,
|
||||
#[serde(with = "humantime_serde")]
|
||||
latency_mean: Duration,
|
||||
latency_percentiles: LatencyPercentiles,
|
||||
}
|
||||
|
||||
struct ThreadLocalStats {
|
||||
latency_histo: hdrhistogram::Histogram<u64>,
|
||||
}
|
||||
|
||||
impl ThreadLocalStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
|
||||
// which would skew the benchmark results.
|
||||
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
|
||||
}
|
||||
}
|
||||
fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
|
||||
let micros: u64 = latency
|
||||
.as_micros()
|
||||
.try_into()
|
||||
.context("latency greater than u64")?;
|
||||
self.latency_histo
|
||||
.record(micros)
|
||||
.context("add to histogram")?;
|
||||
Ok(())
|
||||
}
|
||||
fn output(&self) -> PerTaskOutput {
|
||||
let latency_percentiles = std::array::from_fn(|idx| {
|
||||
let micros = self
|
||||
.latency_histo
|
||||
.value_at_percentile(LATENCY_PERCENTILES[idx]);
|
||||
Duration::from_micros(micros)
|
||||
});
|
||||
PerTaskOutput {
|
||||
request_count: self.latency_histo.len(),
|
||||
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
|
||||
latency_percentiles: LatencyPercentiles {
|
||||
latency_percentiles,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn add(&mut self, other: &Self) {
|
||||
let Self {
|
||||
ref mut latency_histo,
|
||||
} = self;
|
||||
latency_histo.add(&other.latency_histo).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
|
||||
Arc::new(Mutex::new(ThreadLocalStats::new()))
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
let _guard = logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stderr,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.on_thread_start({
|
||||
let thread_local_stats = Arc::clone(&thread_local_stats);
|
||||
move || {
|
||||
// pre-initialize the histograms
|
||||
STATS.with(|stats| {
|
||||
let stats: Arc<_> = Arc::clone(&*stats.borrow());
|
||||
thread_local_stats.lock().unwrap().push(stats);
|
||||
});
|
||||
}
|
||||
})
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let main_task = rt.spawn(main_impl(args, thread_local_stats));
|
||||
rt.block_on(main_task).unwrap()
|
||||
}
|
||||
|
||||
struct Target {
|
||||
timeline: TenantTimelineId,
|
||||
lsn_range: Option<Range<Lsn>>,
|
||||
}
|
||||
|
||||
async fn main_impl(
|
||||
args: Args,
|
||||
thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let args: &'static Args = Box::leak(Box::new(args));
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
if args.targets.is_some() {
|
||||
timelines = args.targets.clone().unwrap();
|
||||
} else {
|
||||
let tenants: Vec<TenantId> = mgmt_api_client
|
||||
.list_tenants()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|ti| ti.id)
|
||||
.collect();
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, tl_infos) = res.unwrap();
|
||||
for tl in tl_infos {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id: tl.timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
for timeline in &timelines {
|
||||
js.spawn({
|
||||
let timeline = *timeline;
|
||||
let info = mgmt_api_client
|
||||
.timeline_info(timeline.tenant_id, timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
async move {
|
||||
anyhow::Ok(Target {
|
||||
timeline,
|
||||
// TODO: lsn_range != latest LSN
|
||||
lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
|
||||
})
|
||||
}
|
||||
});
|
||||
}
|
||||
let mut all_targets: Vec<Target> = Vec::new();
|
||||
while let Some(res) = js.join_next().await {
|
||||
all_targets.push(res.unwrap().unwrap());
|
||||
}
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_client_tasks = timelines.len();
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = 1;
|
||||
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
|
||||
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
|
||||
));
|
||||
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
|
||||
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&live_stats);
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
async move {
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
info!(
|
||||
"RPS: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut work_senders = HashMap::new();
|
||||
let mut tasks = Vec::new();
|
||||
for tl in &timelines {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
|
||||
work_senders.insert(tl, sender);
|
||||
tasks.push(tokio::spawn(client(
|
||||
args,
|
||||
*tl,
|
||||
Arc::clone(&start_work_barrier),
|
||||
receiver,
|
||||
Arc::clone(&all_work_done_barrier),
|
||||
Arc::clone(&live_stats),
|
||||
)));
|
||||
}
|
||||
|
||||
let work_sender = async move {
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let (timeline, work) = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let target = all_targets.choose(&mut rng).unwrap();
|
||||
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
|
||||
(
|
||||
target.timeline,
|
||||
Work {
|
||||
lsn,
|
||||
gzip: rng.gen_bool(args.gzip_probability),
|
||||
},
|
||||
)
|
||||
};
|
||||
let sender = work_senders.get(&timeline).unwrap();
|
||||
// TODO: what if this blocks?
|
||||
sender.send(work).await.ok().unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(runtime) = args.runtime {
|
||||
match tokio::time::timeout(runtime.into(), work_sender).await {
|
||||
Ok(()) => unreachable!("work sender never terminates"),
|
||||
Err(_timeout) => {
|
||||
// this implicitly drops the work_senders, making all the clients exit
|
||||
}
|
||||
}
|
||||
} else {
|
||||
work_sender.await;
|
||||
unreachable!("work sender never terminates");
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
|
||||
let output = Output {
|
||||
total: {
|
||||
let mut agg_stats = ThreadLocalStats::new();
|
||||
for stats in thread_local_stats.lock().unwrap().iter() {
|
||||
let stats = stats.lock().unwrap();
|
||||
agg_stats.add(&*stats);
|
||||
}
|
||||
agg_stats.output()
|
||||
},
|
||||
};
|
||||
|
||||
let output = serde_json::to_string_pretty(&output).unwrap();
|
||||
println!("{output}");
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
struct Work {
|
||||
lsn: Option<Lsn>,
|
||||
gzip: bool,
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn client(
|
||||
args: &'static Args,
|
||||
timeline: TenantTimelineId,
|
||||
start_work_barrier: Arc<Barrier>,
|
||||
mut work: tokio::sync::mpsc::Receiver<Work>,
|
||||
all_work_done_barrier: Arc<Barrier>,
|
||||
live_stats: Arc<LiveStats>,
|
||||
) {
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
let client =
|
||||
pageserver::client::page_service::Client::new(crate::util::connstring::connstring(
|
||||
&args.page_service_host_port,
|
||||
args.pageserver_jwt.as_deref(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(Work { lsn, gzip }) = work.recv().await {
|
||||
let start = Instant::now();
|
||||
let copy_out_stream = client
|
||||
.basebackup(&BasebackupRequest {
|
||||
tenant_id: timeline.tenant_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
lsn,
|
||||
gzip,
|
||||
})
|
||||
.await
|
||||
.with_context(|| format!("start basebackup for {timeline}"))
|
||||
.unwrap();
|
||||
|
||||
use futures::StreamExt;
|
||||
let size = Arc::new(AtomicUsize::new(0));
|
||||
copy_out_stream
|
||||
.for_each({
|
||||
|r| {
|
||||
let size = Arc::clone(&size);
|
||||
async move {
|
||||
let size = Arc::clone(&size);
|
||||
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
|
||||
let elapsed = start.elapsed();
|
||||
live_stats.inc();
|
||||
STATS.with(|stats| {
|
||||
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
all_work_done_barrier.wait().await;
|
||||
}
|
||||
19
pageserver/pagebench/src/main.rs
Normal file
19
pageserver/pagebench/src/main.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
use clap::Parser;
|
||||
|
||||
pub(crate) mod util;
|
||||
|
||||
mod basebackup;
|
||||
|
||||
/// Component-level performance test for pageserver.
|
||||
#[derive(clap::Parser)]
|
||||
enum Args {
|
||||
Basebackup(basebackup::Args),
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let args = Args::parse();
|
||||
match args {
|
||||
Args::Basebackup(args) => basebackup::main(args),
|
||||
}
|
||||
.unwrap()
|
||||
}
|
||||
2
pageserver/pagebench/src/util.rs
Normal file
2
pageserver/pagebench/src/util.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub(crate) mod connstring;
|
||||
pub(crate) mod tenant_timeline_id;
|
||||
8
pageserver/pagebench/src/util/connstring.rs
Normal file
8
pageserver/pagebench/src/util/connstring.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
|
||||
let colon_and_jwt = if let Some(jwt) = jwt {
|
||||
format!(":{jwt}") // TODO: urlescape
|
||||
} else {
|
||||
format!("")
|
||||
};
|
||||
format!("postgres://postgres{colon_and_jwt}@{host_port}")
|
||||
}
|
||||
36
pageserver/pagebench/src/util/tenant_timeline_id.rs
Normal file
36
pageserver/pagebench/src/util/tenant_timeline_id.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
||||
pub(crate) struct TenantTimelineId {
|
||||
pub(crate) tenant_id: TenantId,
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
impl FromStr for TenantTimelineId {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let (tenant_id, timeline_id) = s
|
||||
.split_once("/")
|
||||
.context("tenant and timeline id must be separated by `/`")?;
|
||||
let tenant_id = TenantId::from_str(&tenant_id)
|
||||
.with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
|
||||
let timeline_id = TimelineId::from_str(&timeline_id)
|
||||
.with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
|
||||
Ok(Self {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TenantTimelineId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
|
||||
}
|
||||
}
|
||||
@@ -166,71 +166,111 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Gather non-relational files from object storage pages.
|
||||
debug!("Gather non-relational files from object storage pages");
|
||||
for kind in [
|
||||
SlruKind::Clog,
|
||||
SlruKind::MultiXactOffsets,
|
||||
SlruKind::MultiXactMembers,
|
||||
] {
|
||||
for segno in self
|
||||
.timeline
|
||||
.list_slru_segments(kind, self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_slru_segment(kind, segno).await?;
|
||||
async {
|
||||
debug!("list slru segments");
|
||||
for segno in self
|
||||
.timeline
|
||||
.list_slru_segments(kind, self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
async {
|
||||
debug!("add slru segment");
|
||||
self.add_slru_segment(kind, segno).await?;
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(debug_span!("slru segment", ?segno))
|
||||
.await?;
|
||||
}
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(debug_span!("non-rel file", ?kind))
|
||||
.await?;
|
||||
}
|
||||
|
||||
let mut min_restart_lsn: Lsn = Lsn::MAX;
|
||||
// Create tablespace directories
|
||||
debug!("Create tablespace directories");
|
||||
for ((spcnode, dbnode), has_relmap_file) in
|
||||
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
|
||||
{
|
||||
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
|
||||
async {
|
||||
debug!("iter");
|
||||
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
|
||||
|
||||
// If full backup is requested, include all relation files.
|
||||
// Otherwise only include init forks of unlogged relations.
|
||||
let rels = self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?;
|
||||
for &rel in rels.iter() {
|
||||
// Send init fork as main fork to provide well formed empty
|
||||
// contents of UNLOGGED relations. Postgres copies it in
|
||||
// `reinit.c` during recovery.
|
||||
if rel.forknum == INIT_FORKNUM {
|
||||
// I doubt we need _init fork itself, but having it at least
|
||||
// serves as a marker relation is unlogged.
|
||||
self.add_rel(rel, rel).await?;
|
||||
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
|
||||
continue;
|
||||
}
|
||||
// If full backup is requested, include all relation files.
|
||||
// Otherwise only include init forks of unlogged relations.
|
||||
debug!("list rels");
|
||||
let rels = self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?;
|
||||
for &rel in rels.iter() {
|
||||
async {
|
||||
debug!("iter");
|
||||
// Send init fork as main fork to provide well formed empty
|
||||
// contents of UNLOGGED relations. Postgres copies it in
|
||||
// `reinit.c` during recovery.
|
||||
if rel.forknum == INIT_FORKNUM {
|
||||
// I doubt we need _init fork itself, but having it at least
|
||||
// serves as a marker relation is unlogged.
|
||||
self.add_rel(rel, rel).await?;
|
||||
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.full_backup {
|
||||
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
|
||||
{
|
||||
// skip this, will include it when we reach the init fork
|
||||
continue;
|
||||
if self.full_backup {
|
||||
if rel.forknum == MAIN_FORKNUM
|
||||
&& rels.contains(&rel.with_forknum(INIT_FORKNUM))
|
||||
{
|
||||
// skip this, will include it when we reach the init fork
|
||||
return Ok(());
|
||||
}
|
||||
self.add_rel(rel, rel).await?;
|
||||
}
|
||||
anyhow::Ok(())
|
||||
}
|
||||
self.add_rel(rel, rel).await?;
|
||||
.instrument(debug_span!("process rel", ?rel))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
if path.starts_with("pg_replslot") {
|
||||
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
content[offs..offs + 8].try_into().unwrap(),
|
||||
));
|
||||
info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
debug!("list aux files");
|
||||
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
|
||||
async {
|
||||
debug!("iter");
|
||||
if path.starts_with("pg_replslot") {
|
||||
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
|
||||
let restart_lsn = Lsn(u64::from_le_bytes(
|
||||
content[offs..offs + 8].try_into().unwrap(),
|
||||
));
|
||||
info!("Replication slot {} restart LSN={}", path, restart_lsn);
|
||||
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
|
||||
}
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(debug_span!("process aux file", ?path))
|
||||
.await?;
|
||||
}
|
||||
let header = new_tar_header(&path, content.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, &*content)
|
||||
.await
|
||||
.context("could not add aux file to basebackup tarball")?;
|
||||
|
||||
debug!("done");
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(debug_span!(
|
||||
"process tablespace directory",
|
||||
?spcnode,
|
||||
?dbnode
|
||||
))
|
||||
.await?;
|
||||
}
|
||||
if min_restart_lsn != Lsn::MAX {
|
||||
info!(
|
||||
@@ -244,19 +284,25 @@ where
|
||||
.await
|
||||
.context("could not add restart.lsn file to basebackup tarball")?;
|
||||
}
|
||||
debug!("list twophase files");
|
||||
for xid in self
|
||||
.timeline
|
||||
.list_twophase_files(self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_twophase_file(xid).await?;
|
||||
async {
|
||||
self.add_twophase_file(xid).await?;
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(debug_span!("process twophase file", ?xid))
|
||||
.await?;
|
||||
}
|
||||
|
||||
fail_point!("basebackup-before-control-file", |_| {
|
||||
bail!("failpoint basebackup-before-control-file")
|
||||
});
|
||||
|
||||
// Generate pg_control and bootstrap WAL segment.
|
||||
debug!("Generate pg_control and bootstrap WAL segment.");
|
||||
self.add_pgcontrol_file().await?;
|
||||
self.ar.finish().await?;
|
||||
debug!("all tarred up!");
|
||||
|
||||
@@ -103,7 +103,7 @@ fn main() -> anyhow::Result<()> {
|
||||
} else {
|
||||
TracingErrorLayerEnablement::Disabled
|
||||
};
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
conf.log_format,
|
||||
tracing_error_layer_enablement,
|
||||
logging::Output::Stdout,
|
||||
|
||||
2
pageserver/src/client.rs
Normal file
2
pageserver/src/client.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub mod mgmt_api;
|
||||
pub mod page_service;
|
||||
91
pageserver/src/client/mgmt_api.rs
Normal file
91
pageserver/src/client/mgmt_api.rs
Normal file
@@ -0,0 +1,91 @@
|
||||
use anyhow::Context;
|
||||
|
||||
use hyper::{client::HttpConnector, Uri};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub struct Client {
|
||||
mgmt_api_endpoint: String,
|
||||
authorization_header: Option<String>,
|
||||
client: hyper::Client<HttpConnector, hyper::Body>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
|
||||
Self {
|
||||
mgmt_api_endpoint,
|
||||
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
|
||||
client: hyper::client::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_tenants(&self) -> anyhow::Result<Vec<pageserver_api::models::TenantInfo>> {
|
||||
let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?;
|
||||
let resp = self.get(uri).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("status error");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await?;
|
||||
Ok(serde_json::from_slice(&body)?)
|
||||
}
|
||||
|
||||
pub async fn list_timelines(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<Vec<pageserver_api::models::TimelineInfo>> {
|
||||
let uri = Uri::try_from(format!(
|
||||
"{}/v1/tenant/{tenant_id}/timeline",
|
||||
self.mgmt_api_endpoint
|
||||
))?;
|
||||
let resp = self.get(uri).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("status error");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await?;
|
||||
Ok(serde_json::from_slice(&body)?)
|
||||
}
|
||||
|
||||
pub async fn timeline_info(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<pageserver_api::models::TimelineInfo> {
|
||||
let uri = Uri::try_from(format!(
|
||||
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
|
||||
self.mgmt_api_endpoint
|
||||
))?;
|
||||
let resp = self.get(uri).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("status error");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await?;
|
||||
Ok(serde_json::from_slice(&body)?)
|
||||
}
|
||||
|
||||
pub async fn keyspace(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<crate::http::models::partitioning::Partitioning> {
|
||||
let uri = Uri::try_from(format!(
|
||||
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true",
|
||||
self.mgmt_api_endpoint
|
||||
))?;
|
||||
let resp = self.get(uri).await?;
|
||||
if !resp.status().is_success() {
|
||||
anyhow::bail!("status error");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await?;
|
||||
Ok(serde_json::from_slice(&body).context("deserialize")?)
|
||||
}
|
||||
|
||||
async fn get(&self, uri: Uri) -> hyper::Result<hyper::Response<hyper::Body>> {
|
||||
let req = hyper::Request::builder().uri(uri).method("GET");
|
||||
let req = if let Some(value) = &self.authorization_header {
|
||||
req.header("Authorization", value)
|
||||
} else {
|
||||
req
|
||||
};
|
||||
let req = req.body(hyper::Body::default());
|
||||
self.client.request(req.unwrap()).await
|
||||
}
|
||||
}
|
||||
145
pageserver/src/client/page_service.rs
Normal file
145
pageserver/src/client/page_service.rs
Normal file
@@ -0,0 +1,145 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
PagestreamGetPageResponse,
|
||||
},
|
||||
reltag::RelTag,
|
||||
};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_postgres::CopyOutStream;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
pub struct Client {
|
||||
client: tokio_postgres::Client,
|
||||
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
|
||||
conn_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
pub struct BasebackupRequest {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub lsn: Option<Lsn>,
|
||||
pub gzip: bool,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn new(connstring: String) -> anyhow::Result<Self> {
|
||||
let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
|
||||
|
||||
let conn_task_cancel = CancellationToken::new();
|
||||
let conn_task = tokio::spawn({
|
||||
let conn_task_cancel = conn_task_cancel.clone();
|
||||
async move {
|
||||
tokio::select! {
|
||||
_ = conn_task_cancel.cancelled() => { }
|
||||
res = connection => {
|
||||
res.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(Self {
|
||||
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
|
||||
conn_task,
|
||||
client,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn pagestream(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<PagestreamClient> {
|
||||
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
|
||||
.client
|
||||
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
|
||||
.await?;
|
||||
let Client {
|
||||
cancel_on_client_drop,
|
||||
conn_task,
|
||||
client: _,
|
||||
} = self;
|
||||
Ok(PagestreamClient {
|
||||
copy_both: Box::pin(copy_both),
|
||||
conn_task,
|
||||
cancel_on_client_drop,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
|
||||
let BasebackupRequest {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
gzip,
|
||||
} = req;
|
||||
let mut args = Vec::with_capacity(5);
|
||||
args.push("basebackup".to_string());
|
||||
args.push(format!("{tenant_id}"));
|
||||
args.push(format!("{timeline_id}"));
|
||||
if let Some(lsn) = lsn {
|
||||
args.push(format!("{lsn}"));
|
||||
}
|
||||
if *gzip {
|
||||
args.push(format!("--gzip"))
|
||||
}
|
||||
Ok(self.client.copy_out(&args.join(" ")).await?)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create using [`Client::pagestream`].
|
||||
pub struct PagestreamClient {
|
||||
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
|
||||
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
|
||||
conn_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
pub struct RelTagBlockNo {
|
||||
pub rel_tag: RelTag,
|
||||
pub block_no: u32,
|
||||
}
|
||||
|
||||
impl PagestreamClient {
|
||||
pub async fn shutdown(mut self) {
|
||||
let _ = self.cancel_on_client_drop.take();
|
||||
self.conn_task.await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn getpage(
|
||||
&mut self,
|
||||
key: RelTagBlockNo,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let req = PagestreamGetPageRequest {
|
||||
latest: false,
|
||||
rel: key.rel_tag,
|
||||
blkno: key.block_no,
|
||||
lsn,
|
||||
};
|
||||
let req = PagestreamFeMessage::GetPage(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next = next.unwrap().unwrap();
|
||||
|
||||
match PagestreamBeMessage::deserialize(next)? {
|
||||
PagestreamBeMessage::Exists(_) => todo!(),
|
||||
PagestreamBeMessage::Nblocks(_) => todo!(),
|
||||
PagestreamBeMessage::GetPage(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::DbSize(_) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
pub mod routes;
|
||||
pub use routes::make_router;
|
||||
|
||||
pub use pageserver_api::models;
|
||||
pub mod models;
|
||||
|
||||
3
pageserver/src/http/models.rs
Normal file
3
pageserver/src/http/models.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
//! If possible, use `::pageserver_api::models` instead.
|
||||
|
||||
pub mod partitioning;
|
||||
112
pageserver/src/http/models/partitioning.rs
Normal file
112
pageserver/src/http/models/partitioning.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct Partitioning {
|
||||
pub keys: crate::keyspace::KeySpace,
|
||||
|
||||
pub at_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl serde::Serialize for Partitioning {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
|
||||
|
||||
impl<'a> serde::Serialize for KeySpace<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeSeq;
|
||||
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
|
||||
for kr in &self.0.ranges {
|
||||
seq.serialize_element(&KeyRange(kr))?;
|
||||
}
|
||||
seq.end()
|
||||
}
|
||||
}
|
||||
|
||||
use serde::ser::SerializeMap;
|
||||
let mut map = serializer.serialize_map(Some(2))?;
|
||||
map.serialize_key("keys")?;
|
||||
map.serialize_value(&KeySpace(&self.keys))?;
|
||||
map.serialize_key("at_lsn")?;
|
||||
map.serialize_value(&WithDisplay(&self.at_lsn))?;
|
||||
map.end()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WithDisplay<'a, T>(&'a T);
|
||||
|
||||
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.collect_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
|
||||
|
||||
impl<'a> serde::Serialize for KeyRange<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeTuple;
|
||||
let mut t = serializer.serialize_tuple(2)?;
|
||||
t.serialize_element(&WithDisplay(&self.0.start))?;
|
||||
t.serialize_element(&WithDisplay(&self.0.end))?;
|
||||
t.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> serde::Deserialize<'a> for Partitioning {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'a>,
|
||||
{
|
||||
pub struct KeySpace(crate::keyspace::KeySpace);
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for KeySpace {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
#[serde_with::serde_as]
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(transparent)]
|
||||
struct Key(#[serde_as(as = "serde_with::DisplayFromStr")] crate::repository::Key);
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(serde::Deserialize)]
|
||||
struct Range(Key, Key);
|
||||
|
||||
let ranges: Vec<Range> = serde::Deserialize::deserialize(deserializer)?;
|
||||
Ok(Self(crate::keyspace::KeySpace {
|
||||
ranges: ranges
|
||||
.into_iter()
|
||||
.map(|Range(start, end)| (start.0..end.0))
|
||||
.collect(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_with::serde_as]
|
||||
#[derive(serde::Deserialize)]
|
||||
struct De {
|
||||
keys: KeySpace,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
at_lsn: Lsn,
|
||||
}
|
||||
|
||||
let de: De = serde::Deserialize::deserialize(deserializer)?;
|
||||
Ok(Self {
|
||||
at_lsn: de.at_lsn,
|
||||
keys: de.keys.0,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -26,10 +26,6 @@ use utils::http::endpoint::request_span;
|
||||
use utils::http::json::json_request_or_empty_body;
|
||||
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
|
||||
|
||||
use super::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
|
||||
};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
|
||||
@@ -46,6 +42,10 @@ use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
|
||||
};
|
||||
use utils::{
|
||||
auth::SwappableJwtAuth,
|
||||
generation::Generation,
|
||||
@@ -61,7 +61,7 @@ use utils::{
|
||||
};
|
||||
|
||||
// Imports only used for testing APIs
|
||||
use super::models::ConfigureFailpointsRequest;
|
||||
use pageserver_api::models::ConfigureFailpointsRequest;
|
||||
|
||||
pub struct State {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -1422,71 +1422,11 @@ async fn timeline_collect_keyspace(
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
struct Partitioning {
|
||||
keys: crate::keyspace::KeySpace,
|
||||
|
||||
at_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl serde::Serialize for Partitioning {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut map = serializer.serialize_map(Some(2))?;
|
||||
map.serialize_key("keys")?;
|
||||
map.serialize_value(&KeySpace(&self.keys))?;
|
||||
map.serialize_key("at_lsn")?;
|
||||
map.serialize_value(&WithDisplay(&self.at_lsn))?;
|
||||
map.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct WithDisplay<'a, T>(&'a T);
|
||||
|
||||
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
serializer.collect_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
|
||||
|
||||
impl<'a> serde::Serialize for KeySpace<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeSeq;
|
||||
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
|
||||
for kr in &self.0.ranges {
|
||||
seq.serialize_element(&KeyRange(kr))?;
|
||||
}
|
||||
seq.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
|
||||
|
||||
impl<'a> serde::Serialize for KeyRange<'a> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeTuple;
|
||||
let mut t = serializer.serialize_tuple(2)?;
|
||||
t.serialize_element(&WithDisplay(&self.0.start))?;
|
||||
t.serialize_element(&WithDisplay(&self.0.end))?;
|
||||
t.end()
|
||||
}
|
||||
}
|
||||
|
||||
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
|
||||
|
||||
let check_serialization_roundtrip: bool =
|
||||
parse_query_param(&request, "check_serialization_roundtrip")?.unwrap_or(false);
|
||||
|
||||
async {
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
@@ -1496,7 +1436,20 @@ async fn timeline_collect_keyspace(
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
|
||||
let res = crate::http::models::partitioning::Partitioning { keys, at_lsn };
|
||||
if check_serialization_roundtrip {
|
||||
(|| {
|
||||
let ser = serde_json::ser::to_vec(&res).context("serialize")?;
|
||||
let de: crate::http::models::partitioning::Partitioning =
|
||||
serde_json::from_slice(&ser).context("deserialize")?;
|
||||
anyhow::ensure!(de == res, "not equal");
|
||||
info!("passed serialization rountrip check");
|
||||
Ok(())
|
||||
})()
|
||||
.context("serialization rountrip")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
}
|
||||
json_response(StatusCode::OK, res)
|
||||
}
|
||||
.instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
|
||||
.await
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::ops::Range;
|
||||
///
|
||||
/// Represents a set of Keys, in a compact form.
|
||||
///
|
||||
#[derive(Clone, Debug, Default)]
|
||||
#[derive(Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub struct KeySpace {
|
||||
/// Contiguous ranges of keys that belong to the key space. In key order,
|
||||
/// and with no overlap.
|
||||
|
||||
@@ -25,6 +25,7 @@ pub mod walingest;
|
||||
pub mod walrecord;
|
||||
pub mod walredo;
|
||||
|
||||
pub mod client;
|
||||
pub mod failpoint_support;
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
@@ -20,12 +20,14 @@ use std::io::{Error, ErrorKind};
|
||||
|
||||
impl<'a> BlockCursor<'a> {
|
||||
/// Read a blob into a new buffer.
|
||||
#[tracing::instrument(skip_all, fields(%offset), level = tracing::Level::DEBUG)]
|
||||
pub async fn read_blob(
|
||||
&self,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
tracing::debug!("reading blob");
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
@@ -141,6 +141,7 @@ impl<'a> BlockCursor<'a> {
|
||||
/// access to the contents of the page. (For the page cache, the
|
||||
/// lease object represents a lock on the buffer.)
|
||||
#[inline(always)]
|
||||
#[tracing::instrument(skip_all, level = tracing::Level::DEBUG)]
|
||||
pub async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
|
||||
@@ -181,6 +181,7 @@ impl LayerMap {
|
||||
/// NOTE: This only searches the 'historic' layers, *not* the
|
||||
/// 'open' and 'frozen' layers!
|
||||
///
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
|
||||
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
|
||||
let latest_delta = version.delta_coverage.query(key.to_i128());
|
||||
|
||||
@@ -226,6 +226,7 @@ impl Layer {
|
||||
///
|
||||
/// It is up to the caller to collect more data from the previous layer and
|
||||
/// perform WAL redo, if necessary.
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
pub(crate) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
|
||||
@@ -468,6 +468,7 @@ impl Timeline {
|
||||
/// an ancestor branch, for example, or waste a lot of cycles chasing the
|
||||
/// non-existing key.
|
||||
///
|
||||
#[instrument(skip_all, fields(%key, %lsn), level = tracing::Level::DEBUG)]
|
||||
pub async fn get(
|
||||
&self,
|
||||
key: Key,
|
||||
@@ -2044,6 +2045,7 @@ impl Timeline {
|
||||
///
|
||||
/// This function takes the current timeline's locked LayerMap as an argument,
|
||||
/// so callers can avoid potential race conditions.
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn get_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
|
||||
@@ -199,7 +199,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -431,7 +431,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
Reference in New Issue
Block a user