mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-05 15:20:39 +00:00
Compare commits
19 Commits
skyzh/uplo
...
bojan-get-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88fab12d44 | ||
|
|
80dd714f01 | ||
|
|
a06af3c813 | ||
|
|
be86621152 | ||
|
|
a7870d708b | ||
|
|
ccb5df93ef | ||
|
|
c2adb7ac2d | ||
|
|
7482d3df70 | ||
|
|
735145571f | ||
|
|
1622de3fda | ||
|
|
382e567fc0 | ||
|
|
d2b00b7e35 | ||
|
|
b92e1763ec | ||
|
|
da66df21f3 | ||
|
|
0b53968db4 | ||
|
|
7fc488ff4b | ||
|
|
2ca920e4cb | ||
|
|
e74ddf4391 | ||
|
|
c2814e9828 |
81
Cargo.lock
generated
81
Cargo.lock
generated
@@ -307,17 +307,41 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "3.0.14"
|
||||
version = "3.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b63edc3f163b3c71ec8aa23f9bd6070f77edbf3d1d198b164afa90ff00e4ec62"
|
||||
checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"bitflags",
|
||||
"clap_derive",
|
||||
"clap_lex",
|
||||
"indexmap",
|
||||
"os_str_bytes",
|
||||
"lazy_static",
|
||||
"strsim 0.10.0",
|
||||
"termcolor",
|
||||
"textwrap 0.14.2",
|
||||
"textwrap 0.15.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_derive"
|
||||
version = "3.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1"
|
||||
dependencies = [
|
||||
"heck 0.4.0",
|
||||
"proc-macro-error",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap_lex"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
|
||||
dependencies = [
|
||||
"os_str_bytes",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -336,7 +360,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"clap 3.0.14",
|
||||
"clap 3.1.12",
|
||||
"env_logger",
|
||||
"hyper",
|
||||
"libc",
|
||||
@@ -990,6 +1014,12 @@ dependencies = [
|
||||
"unicode-segmentation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heck"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.1.19"
|
||||
@@ -1603,9 +1633,6 @@ name = "os_str_bytes"
|
||||
version = "6.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver"
|
||||
@@ -1616,7 +1643,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap 3.0.14",
|
||||
"clap 3.1.12",
|
||||
"const_format",
|
||||
"crc32c",
|
||||
"crossbeam-utils",
|
||||
@@ -1911,6 +1938,30 @@ version = "0.2.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||
dependencies = [
|
||||
"proc-macro-error-attr",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error-attr"
|
||||
version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-hack"
|
||||
version = "0.5.19"
|
||||
@@ -1957,7 +2008,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck",
|
||||
"heck 0.3.3",
|
||||
"itertools",
|
||||
"lazy_static",
|
||||
"log",
|
||||
@@ -2001,7 +2052,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"base64",
|
||||
"bytes",
|
||||
"clap 3.0.14",
|
||||
"clap 3.1.12",
|
||||
"futures",
|
||||
"hashbrown",
|
||||
"hex",
|
||||
@@ -2431,7 +2482,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"clap 3.0.14",
|
||||
"clap 3.1.12",
|
||||
"const_format",
|
||||
"crc32c",
|
||||
"daemonize",
|
||||
@@ -2826,9 +2877,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "textwrap"
|
||||
version = "0.14.2"
|
||||
version = "0.15.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80"
|
||||
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
@@ -3629,7 +3680,7 @@ name = "zenith"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap 3.0.14",
|
||||
"clap 3.1.12",
|
||||
"control_plane",
|
||||
"pageserver",
|
||||
"postgres",
|
||||
|
||||
@@ -163,7 +163,7 @@ fn main() -> Result<()> {
|
||||
|
||||
// Env variable is set by `cargo`
|
||||
let version: Option<&str> = option_env!("CARGO_PKG_VERSION");
|
||||
let matches = clap::App::new("zenith_ctl")
|
||||
let matches = clap::Command::new("zenith_ctl")
|
||||
.version(version.unwrap_or("unknown"))
|
||||
.arg(
|
||||
Arg::new("connstr")
|
||||
|
||||
@@ -18,7 +18,7 @@ hex = "0.4.3"
|
||||
hyper = "0.14"
|
||||
itertools = "0.10.3"
|
||||
lazy_static = "1.4.0"
|
||||
clap = "3.0"
|
||||
clap = { version = "3.1.8", features = ["derive"] }
|
||||
daemonize = "0.4.1"
|
||||
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! A handy tool for debugging, that's all.
|
||||
use anyhow::Result;
|
||||
use clap::{App, Arg};
|
||||
use clap::{Arg, Command};
|
||||
use pageserver::layered_repository::dump_layerfile_from_path;
|
||||
use pageserver::page_cache;
|
||||
use pageserver::virtual_file;
|
||||
@@ -10,7 +10,7 @@ use std::path::PathBuf;
|
||||
use utils::GIT_VERSION;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let arg_matches = App::new("Zenith dump_layerfile utility")
|
||||
let arg_matches = Command::new("Zenith dump_layerfile utility")
|
||||
.about("Dump contents of one layer file, for debugging")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
|
||||
@@ -5,7 +5,7 @@ use tracing::*;
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
|
||||
use clap::{App, Arg};
|
||||
use clap::{Arg, Command};
|
||||
use daemonize::Daemonize;
|
||||
|
||||
use pageserver::{
|
||||
@@ -35,7 +35,7 @@ fn version() -> String {
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
metrics::set_common_metrics_prefix("pageserver");
|
||||
let arg_matches = App::new("Zenith page server")
|
||||
let arg_matches = Command::new("Zenith page server")
|
||||
.about("Materializes WAL stream to pages and serves them to the postgres")
|
||||
.version(&*version())
|
||||
.arg(
|
||||
@@ -189,6 +189,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
// Initialize logger
|
||||
let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
|
||||
|
||||
// Initialize wal metadata logger, if necessary
|
||||
pageserver::wal_metadata::init(conf).expect("wal_metadata init failed");
|
||||
|
||||
info!("version: {}", GIT_VERSION);
|
||||
|
||||
// TODO: Check that it looks like a valid repository before going further
|
||||
|
||||
265
pageserver/src/bin/psbench.rs
Normal file
265
pageserver/src/bin/psbench.rs
Normal file
@@ -0,0 +1,265 @@
|
||||
//! Pageserver benchmark tool
|
||||
//!
|
||||
//! This tool connects directly to a pageserver, issues queries and measures performance.
|
||||
//!
|
||||
//! Ideally the tool would be able to stream WAL into the pageserver, and (possibly
|
||||
//! simultaneously) make read requests. Currently wal streaming is not implemented,
|
||||
//! so this tool assumes the pageserver is prepopulated with some data, and only
|
||||
//! issues read queries. It also currently assumes that the pageserver writes out some
|
||||
//! metadata describing the write access pattern on the workload that was performed on it.
|
||||
//! See the python tests that use psbench for usage example.
|
||||
//!
|
||||
//! This tool runs a variety of workloads. See the PsbenchTest enum below, or run the tool
|
||||
//! with --help to see the available workloads.
|
||||
//!
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use clap::{Parser, Subcommand};
|
||||
use futures::future;
|
||||
use pageserver::wal_metadata::{Page, WalEntryMetadata};
|
||||
use postgres_ffi::pg_constants::BLCKSZ;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use std::fs::File;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Instant;
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
io::{BufRead, BufReader, Cursor},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use utils::zid::{ZTenantId, ZTimelineId};
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
/// Client for the pageserver's pagestream API
|
||||
struct PagestreamApi {
|
||||
stream: TcpStream,
|
||||
}
|
||||
|
||||
/// Good enough implementation for these tests
|
||||
impl PagestreamApi {
|
||||
async fn connect(tenant: &ZTenantId, timeline: &ZTimelineId) -> Result<PagestreamApi> {
|
||||
let mut stream = TcpStream::connect("localhost:15000").await?;
|
||||
|
||||
// Connect to pageserver
|
||||
// TODO read host, port, dbname, user from command line
|
||||
let (client, conn) = tokio_postgres::Config::new()
|
||||
.host("127.0.0.1")
|
||||
.port(15000)
|
||||
.dbname("postgres")
|
||||
.user("zenith_admin")
|
||||
.connect_raw(&mut stream, tokio_postgres::NoTls)
|
||||
.await?;
|
||||
|
||||
// Enter pagestream protocol
|
||||
let init_query = format!("pagestream {} {}", tenant, timeline);
|
||||
tokio::select! {
|
||||
_ = conn => panic!("connection closed during pagestream initialization"),
|
||||
_ = client.query(init_query.as_str(), &[]) => (),
|
||||
};
|
||||
|
||||
Ok(PagestreamApi { stream })
|
||||
}
|
||||
|
||||
async fn get_page(&mut self, lsn: &Lsn, page: &Page, latest: bool) -> anyhow::Result<Vec<u8>> {
|
||||
let latest: u8 = if latest { 1 } else { 0 };
|
||||
let msg = {
|
||||
let query = {
|
||||
let mut query = BytesMut::new();
|
||||
query.put_u8(2); // Specifies get_page query
|
||||
query.put_u8(latest);
|
||||
query.put_u64(lsn.0);
|
||||
page.write(&mut query).await?;
|
||||
query.freeze()
|
||||
};
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
let copy_msg = BeMessage::CopyData(&query);
|
||||
BeMessage::write(&mut buf, &copy_msg)?;
|
||||
buf.freeze()
|
||||
};
|
||||
|
||||
self.stream.write_all(&msg).await?;
|
||||
|
||||
let response = match FeMessage::read_fut(&mut self.stream).await? {
|
||||
Some(FeMessage::CopyData(page)) => page,
|
||||
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||
};
|
||||
|
||||
let page = {
|
||||
let mut cursor = Cursor::new(response);
|
||||
let tag = AsyncReadExt::read_u8(&mut cursor).await?;
|
||||
|
||||
match tag {
|
||||
102 => {
|
||||
let mut page = Vec::<u8>::new();
|
||||
cursor.read_to_end(&mut page).await?;
|
||||
if page.len() != (BLCKSZ as usize) {
|
||||
panic!("Expected 8kb page, got: {:?}", page.len());
|
||||
}
|
||||
page
|
||||
}
|
||||
103 => {
|
||||
let mut bytes = Vec::<u8>::new();
|
||||
cursor.read_to_end(&mut bytes).await?;
|
||||
let message = String::from_utf8(bytes)?;
|
||||
panic!("Got error message: {}", message);
|
||||
}
|
||||
_ => panic!("Unhandled tag {:?}", tag),
|
||||
}
|
||||
};
|
||||
|
||||
Ok(page)
|
||||
}
|
||||
}
|
||||
|
||||
/// Parsed wal_metadata file with additional derived
|
||||
/// statistics for convenience.
|
||||
#[derive(Clone)]
|
||||
struct Metadata {
|
||||
// Parsed from metadata file
|
||||
wal_metadata: Vec<WalEntryMetadata>,
|
||||
|
||||
// Derived from wal_metadata
|
||||
total_wal_size: usize,
|
||||
affected_pages: HashSet<Page>,
|
||||
latest_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
/// Construct metadata object from wal_metadata file emitted by pageserver
|
||||
fn build(wal_metadata_path: &Path) -> Result<Metadata> {
|
||||
let wal_metadata_file = File::open(wal_metadata_path).expect("error opening wal_metadata");
|
||||
let wal_metadata: Vec<WalEntryMetadata> = BufReader::new(wal_metadata_file)
|
||||
.lines()
|
||||
.map(|result| result.expect("error reading from file"))
|
||||
.map(|line| serde_json::from_str(&line).expect("corrupt metadata file"))
|
||||
.collect();
|
||||
|
||||
let total_wal_size: usize = wal_metadata.iter().map(|m| m.size).sum();
|
||||
let affected_pages: HashSet<_> = wal_metadata
|
||||
.iter()
|
||||
.flat_map(|m| m.affected_pages.clone())
|
||||
.collect();
|
||||
let latest_lsn = wal_metadata.iter().map(|m| m.lsn).max().unwrap();
|
||||
|
||||
Ok(Metadata {
|
||||
wal_metadata,
|
||||
total_wal_size,
|
||||
affected_pages,
|
||||
latest_lsn,
|
||||
})
|
||||
}
|
||||
|
||||
/// Print results in a format readable by benchmark_fixture.py
|
||||
fn report_latency(&self, latencies: &[Duration]) -> Result<()> {
|
||||
let mut latencies: Vec<&Duration> = latencies.iter().collect();
|
||||
latencies.sort();
|
||||
|
||||
println!("test_param num_pages {}", self.affected_pages.len());
|
||||
println!("test_param num_wal_entries {}", self.wal_metadata.len());
|
||||
println!("test_param total_wal_size {} bytes", self.total_wal_size);
|
||||
println!(
|
||||
"lower_is_better fastest {:?} microseconds",
|
||||
latencies.first().unwrap().as_micros()
|
||||
);
|
||||
println!(
|
||||
"lower_is_better median {:?} microseconds",
|
||||
latencies[latencies.len() / 2].as_micros()
|
||||
);
|
||||
println!(
|
||||
"lower_is_better average {:.2} microseconds",
|
||||
(latencies.iter().map(|l| l.as_micros()).sum::<u128>() as f64)
|
||||
/ (latencies.len() as f64)
|
||||
);
|
||||
println!(
|
||||
"lower_is_better p99 {:?} microseconds",
|
||||
latencies[latencies.len() - 1 - latencies.len() / 100].as_micros()
|
||||
);
|
||||
println!(
|
||||
"lower_is_better slowest {:?} microseconds",
|
||||
latencies.last().unwrap().as_micros()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Sequentially get the latest version of each page and report latencies
|
||||
async fn test_latest_pages(
|
||||
tenant: &ZTenantId,
|
||||
timeline: &ZTimelineId,
|
||||
metadata: &Metadata,
|
||||
) -> Result<Vec<Duration>> {
|
||||
let mut api = PagestreamApi::connect(tenant, timeline).await.unwrap();
|
||||
let mut latencies: Vec<Duration> = vec![];
|
||||
let mut page_order: Vec<&Page> = metadata.affected_pages.iter().collect();
|
||||
page_order.shuffle(&mut thread_rng());
|
||||
for page in page_order {
|
||||
let start = Instant::now();
|
||||
let _page_bytes = api.get_page(&metadata.latest_lsn, page, true).await?;
|
||||
let duration = start.elapsed();
|
||||
|
||||
latencies.push(duration);
|
||||
}
|
||||
Ok(latencies)
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug, Clone)]
|
||||
struct Args {
|
||||
// TODO maybe one metadata file per timeline?
|
||||
wal_metadata_path: PathBuf,
|
||||
|
||||
// TODO get these from wal metadata
|
||||
tenant: ZTenantId,
|
||||
timeline: ZTimelineId,
|
||||
|
||||
// TODO change to `clients_per_timeline`
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: usize,
|
||||
|
||||
#[clap(subcommand)]
|
||||
test: PsbenchTest,
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug, Clone)]
|
||||
enum PsbenchTest {
|
||||
/// Query the latest version of each page, in a random sequential order.
|
||||
/// If multiple clients are used, all clients will independently query
|
||||
/// every page in a different random order.
|
||||
GetLatestPages,
|
||||
// TODO add more tests:
|
||||
// - Query with realistic read pattern
|
||||
// - Query every page after every change of that page
|
||||
// - Query all pages at given point in time
|
||||
// - etc.
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let args = Args::parse();
|
||||
let metadata = Metadata::build(&args.wal_metadata_path).unwrap();
|
||||
|
||||
let latencies: Vec<Duration> = match args.test {
|
||||
PsbenchTest::GetLatestPages => {
|
||||
// TODO explicitly spawn a thread for each?
|
||||
future::join_all(
|
||||
(0..args.num_clients)
|
||||
.map(|_| test_latest_pages(&args.tenant, &args.timeline, &metadata)),
|
||||
)
|
||||
.await
|
||||
.into_iter()
|
||||
.flat_map(|v| v.unwrap())
|
||||
.collect()
|
||||
}
|
||||
};
|
||||
|
||||
println!("test_param num_clients {}", args.num_clients);
|
||||
metadata.report_latency(&latencies).unwrap();
|
||||
Ok(())
|
||||
}
|
||||
@@ -2,14 +2,14 @@
|
||||
//!
|
||||
//! A handy tool for debugging, that's all.
|
||||
use anyhow::Result;
|
||||
use clap::{App, Arg};
|
||||
use clap::{Arg, Command};
|
||||
use pageserver::layered_repository::metadata::TimelineMetadata;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use utils::{lsn::Lsn, GIT_VERSION};
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let arg_matches = App::new("Zenith update metadata utility")
|
||||
let arg_matches = Command::new("Zenith update metadata utility")
|
||||
.about("Dump or update metadata file")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
|
||||
@@ -119,6 +119,7 @@ pub struct PageServerConf {
|
||||
pub auth_validation_public_key_path: Option<PathBuf>,
|
||||
pub remote_storage_config: Option<RemoteStorageConfig>,
|
||||
|
||||
pub emit_wal_metadata: bool,
|
||||
pub profiling: ProfilingConfig,
|
||||
pub default_tenant_conf: TenantConf,
|
||||
}
|
||||
@@ -184,6 +185,7 @@ struct PageServerConfigBuilder {
|
||||
|
||||
id: BuilderValue<ZNodeId>,
|
||||
|
||||
emit_wal_metadata: BuilderValue<bool>,
|
||||
profiling: BuilderValue<ProfilingConfig>,
|
||||
}
|
||||
|
||||
@@ -209,6 +211,7 @@ impl Default for PageServerConfigBuilder {
|
||||
auth_validation_public_key_path: Set(None),
|
||||
remote_storage_config: Set(None),
|
||||
id: NotSet,
|
||||
emit_wal_metadata: Set(false),
|
||||
profiling: Set(ProfilingConfig::Disabled),
|
||||
}
|
||||
}
|
||||
@@ -270,6 +273,10 @@ impl PageServerConfigBuilder {
|
||||
self.id = BuilderValue::Set(node_id)
|
||||
}
|
||||
|
||||
pub fn emit_wal_metadata(&mut self, value: bool) {
|
||||
self.emit_wal_metadata = BuilderValue::Set(value)
|
||||
}
|
||||
|
||||
pub fn profiling(&mut self, profiling: ProfilingConfig) {
|
||||
self.profiling = BuilderValue::Set(profiling)
|
||||
}
|
||||
@@ -307,6 +314,9 @@ impl PageServerConfigBuilder {
|
||||
.remote_storage_config
|
||||
.ok_or(anyhow!("missing remote_storage_config"))?,
|
||||
id: self.id.ok_or(anyhow!("missing id"))?,
|
||||
emit_wal_metadata: self
|
||||
.emit_wal_metadata
|
||||
.ok_or(anyhow!("emit_wal_metadata not specifiec"))?,
|
||||
profiling: self.profiling.ok_or(anyhow!("missing profiling"))?,
|
||||
// TenantConf is handled separately
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
@@ -443,6 +453,7 @@ impl PageServerConf {
|
||||
t_conf = Self::parse_toml_tenant_conf(item)?;
|
||||
}
|
||||
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
|
||||
"emit_wal_metadata" => builder.emit_wal_metadata(true),
|
||||
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
@@ -605,6 +616,7 @@ impl PageServerConf {
|
||||
auth_type: AuthType::Trust,
|
||||
auth_validation_public_key_path: None,
|
||||
remote_storage_config: None,
|
||||
emit_wal_metadata: false,
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::dummy_conf(),
|
||||
}
|
||||
@@ -720,6 +732,7 @@ id = 10
|
||||
auth_type: AuthType::Trust,
|
||||
auth_validation_public_key_path: None,
|
||||
remote_storage_config: None,
|
||||
emit_wal_metadata: false,
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
},
|
||||
@@ -759,6 +772,7 @@ id = 10
|
||||
auth_type: AuthType::Trust,
|
||||
auth_validation_public_key_path: None,
|
||||
remote_storage_config: None,
|
||||
emit_wal_metadata: false,
|
||||
profiling: ProfilingConfig::Disabled,
|
||||
default_tenant_conf: TenantConf::default(),
|
||||
},
|
||||
|
||||
@@ -17,6 +17,7 @@ pub mod tenant_threads;
|
||||
pub mod thread_mgr;
|
||||
pub mod timelines;
|
||||
pub mod virtual_file;
|
||||
pub mod wal_metadata;
|
||||
pub mod walingest;
|
||||
pub mod walreceiver;
|
||||
pub mod walrecord;
|
||||
|
||||
101
pageserver/src/wal_metadata.rs
Normal file
101
pageserver/src/wal_metadata.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
//!
|
||||
//! Utils for logging wal metadata. Useful for tests only, and too expensive to run in prod.
|
||||
//!
|
||||
//! Ideally we'd get this wal metadata using pg_waldump from the compute pg_wal directory,
|
||||
//! but pg_waldump doesn't provide all the metadata we need. We could write a rust program
|
||||
//! to analyze pg wal, but we'd need to port some c code for decoding wal files. This module
|
||||
//! is a temporary hack that allows us to print the metadata that the pageserver decodes
|
||||
//! using postgres_ffi::waldecoder.
|
||||
//!
|
||||
//! Logging wal metadata could add significant write overhead to the pageserver. Tests that
|
||||
//! rely on this should either spin up a dedicated pageserver for wal metadata logging, or
|
||||
//! only measure read performance.
|
||||
//!
|
||||
use anyhow::Result;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use once_cell::sync::OnceCell;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{config::PageServerConf, walrecord::DecodedBkpBlock};
|
||||
|
||||
// TODO make a directory instead, and write one file per (tenant, timeline).
|
||||
pub static WAL_METADATA_FILE: OnceCell<File> = OnceCell::new();
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, Serialize, Deserialize)]
|
||||
pub struct Page {
|
||||
spcnode: u32,
|
||||
dbnode: u32,
|
||||
relnode: u32,
|
||||
forknum: u8,
|
||||
blkno: u32,
|
||||
}
|
||||
|
||||
impl Page {
|
||||
pub async fn read<Reader>(buf: &mut Reader) -> Result<Page>
|
||||
where
|
||||
Reader: tokio::io::AsyncRead + Unpin,
|
||||
{
|
||||
let spcnode = buf.read_u32().await?;
|
||||
let dbnode = buf.read_u32().await?;
|
||||
let relnode = buf.read_u32().await?;
|
||||
let forknum = buf.read_u8().await?;
|
||||
let blkno = buf.read_u32().await?;
|
||||
Ok(Page {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode,
|
||||
forknum,
|
||||
blkno,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn write(&self, buf: &mut BytesMut) -> Result<()> {
|
||||
buf.put_u32(self.spcnode);
|
||||
buf.put_u32(self.dbnode);
|
||||
buf.put_u32(self.relnode);
|
||||
buf.put_u8(self.forknum);
|
||||
buf.put_u32(self.blkno);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&DecodedBkpBlock> for Page {
|
||||
fn from(blk: &DecodedBkpBlock) -> Self {
|
||||
Page {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum,
|
||||
blkno: blk.blkno,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct WalEntryMetadata {
|
||||
pub lsn: Lsn,
|
||||
pub size: usize,
|
||||
pub affected_pages: Vec<Page>,
|
||||
}
|
||||
|
||||
pub fn init(conf: &'static PageServerConf) -> Result<()> {
|
||||
if conf.emit_wal_metadata {
|
||||
let wal_metadata_file_dir = conf.workdir.join("wal_metadata.log");
|
||||
WAL_METADATA_FILE
|
||||
.set(File::create(wal_metadata_file_dir)?)
|
||||
.expect("wal_metadata file is already created");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write(wal_meta: WalEntryMetadata) -> Result<()> {
|
||||
if let Some(mut file) = WAL_METADATA_FILE.get() {
|
||||
let mut line = serde_json::to_string(&wal_meta)?;
|
||||
line.push('\n');
|
||||
std::io::prelude::Write::write_all(&mut file, line.as_bytes())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -82,6 +82,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
) -> Result<()> {
|
||||
let mut modification = timeline.begin_modification(lsn);
|
||||
|
||||
let recdata_len = recdata.len();
|
||||
let mut decoded = decode_wal_record(recdata);
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
@@ -249,6 +250,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
|
||||
}
|
||||
|
||||
// Emit wal entry metadata, if configured to do so
|
||||
crate::wal_metadata::write(crate::wal_metadata::WalEntryMetadata {
|
||||
lsn,
|
||||
size: recdata_len,
|
||||
affected_pages: decoded.blocks.iter().map(|blk| blk.into()).collect(),
|
||||
})?;
|
||||
|
||||
// If checkpoint data was updated, store the new version in the repository
|
||||
if self.checkpoint_modified {
|
||||
let new_checkpoint_bytes = self.checkpoint.encode();
|
||||
|
||||
@@ -25,7 +25,7 @@ mod sasl;
|
||||
mod scram;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use clap::{App, Arg};
|
||||
use clap::{Arg, Command};
|
||||
use config::ProxyConfig;
|
||||
use futures::FutureExt;
|
||||
use std::future::Future;
|
||||
@@ -44,7 +44,7 @@ async fn flatten_err(
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
metrics::set_common_metrics_prefix("zenith_proxy");
|
||||
let arg_matches = App::new("Zenith proxy/router")
|
||||
let arg_matches = Command::new("Zenith proxy/router")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
Arg::new("proxy")
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// Main entry point for the safekeeper executable
|
||||
//
|
||||
use anyhow::{bail, Context, Result};
|
||||
use clap::{App, Arg};
|
||||
use clap::{Arg, Command};
|
||||
use const_format::formatcp;
|
||||
use daemonize::Daemonize;
|
||||
use fs2::FileExt;
|
||||
@@ -30,7 +30,7 @@ const ID_FILE_NAME: &str = "safekeeper.id";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
metrics::set_common_metrics_prefix("safekeeper");
|
||||
let arg_matches = App::new("Zenith safekeeper")
|
||||
let arg_matches = Command::new("Zenith safekeeper")
|
||||
.about("Store WAL stream to local file system and push it to WAL receivers")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
|
||||
@@ -232,6 +232,16 @@ class ZenithBenchmarker:
|
||||
'',
|
||||
MetricReport.TEST_PARAM)
|
||||
|
||||
def record_psbench_result(self, prefix, psbench_output):
|
||||
"""Record results from pageserver benchmarker."""
|
||||
for line in psbench_output.split("\n"):
|
||||
tokens = line.split(" ")
|
||||
report = tokens[0]
|
||||
name = tokens[1]
|
||||
value = tokens[2]
|
||||
unit = tokens[3] if len(tokens) > 3 else ""
|
||||
self.record(f"{prefix}_{name}", value, unit, report=report)
|
||||
|
||||
def get_io_writes(self, pageserver) -> int:
|
||||
"""
|
||||
Fetch the "cumulative # of bytes written" metric from the pageserver
|
||||
|
||||
@@ -1290,6 +1290,31 @@ def pg_bin(test_output_dir: str) -> PgBin:
|
||||
return PgBin(test_output_dir)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PsbenchBin:
|
||||
"""A helper class for running the pageserver benchmarker tool."""
|
||||
wal_metadata_path: str
|
||||
|
||||
def test_latest_pages(self, tenant_hex: str, timeline: str, num_clients=None) -> str:
|
||||
num_clients = num_clients or 1
|
||||
psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
|
||||
args = [
|
||||
psbench_binpath,
|
||||
self.wal_metadata_path,
|
||||
tenant_hex,
|
||||
timeline,
|
||||
f"--num-clients={num_clients}",
|
||||
"get-latest-pages",
|
||||
]
|
||||
return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def psbench_bin(test_output_dir):
|
||||
wal_metadata_path = os.path.join(test_output_dir, "repo", "wal_metadata.log")
|
||||
return PsbenchBin(wal_metadata_path)
|
||||
|
||||
|
||||
class VanillaPostgres(PgProtocol):
|
||||
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
|
||||
super().__init__(host='localhost', port=port, dbname='postgres')
|
||||
|
||||
60
test_runner/performance/test_pageserver.py
Normal file
60
test_runner/performance/test_pageserver.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.zenith_fixtures import ZenithEnv, PgBin, ZenithEnvBuilder, DEFAULT_BRANCH_NAME, PsbenchBin
|
||||
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
|
||||
|
||||
|
||||
# TODO split this into separate tests maybe
|
||||
@pytest.mark.parametrize("workload",
|
||||
[
|
||||
pytest.param("hot-page", marks=pytest.mark.slow),
|
||||
pytest.param("pgbench"),
|
||||
pytest.param("pgbench-big", marks=pytest.mark.slow),
|
||||
pytest.param("pgbench-long", marks=pytest.mark.slow),
|
||||
])
|
||||
def test_get_page(zenith_env_builder: ZenithEnvBuilder,
|
||||
zenbenchmark: ZenithBenchmarker,
|
||||
pg_bin: PgBin,
|
||||
psbench_bin: PsbenchBin,
|
||||
workload: str):
|
||||
zenith_env_builder.pageserver_config_override = "emit_wal_metadata=true"
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_pageserver", DEFAULT_BRANCH_NAME)
|
||||
pg = env.postgres.create_start('test_pageserver')
|
||||
tenant_hex = env.initial_tenant.hex
|
||||
timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]
|
||||
|
||||
# Long-lived cursor, useful for flushing
|
||||
psconn = env.pageserver.connect()
|
||||
pscur = psconn.cursor()
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
if workload == "hot-page":
|
||||
cur.execute('create table t (i integer);')
|
||||
cur.execute('insert into t values (0);')
|
||||
for i in range(100000):
|
||||
cur.execute(f'update t set i = {i};')
|
||||
elif workload == "pgbench":
|
||||
pg_bin.run_capture(['pgbench', '-s5', '-i', pg.connstr()])
|
||||
pg_bin.run_capture(['pgbench', '-c1', '-t5000', pg.connstr()])
|
||||
elif workload == "pgbench-big":
|
||||
pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
|
||||
pg_bin.run_capture(['pgbench', '-c1', '-t100000', pg.connstr()])
|
||||
elif workload == "pgbench-long":
|
||||
pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
|
||||
pg_bin.run_capture(['pgbench', '-c1', '-t1000000', pg.connstr()])
|
||||
|
||||
pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0")
|
||||
|
||||
output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=1)
|
||||
zenbenchmark.record_psbench_result("1_client", output)
|
||||
|
||||
output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=8)
|
||||
zenbenchmark.record_psbench_result("8_clients", output)
|
||||
|
||||
|
||||
# TODO test concurrent get_page requests from 8 different timelines
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{App, AppSettings, Arg, ArgMatches};
|
||||
use clap::{Arg, ArgMatches, Command};
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
@@ -125,11 +125,11 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.required(false);
|
||||
|
||||
let matches = App::new("Zenith CLI")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
let matches = Command::new("Zenith CLI")
|
||||
.arg_required_else_help(true)
|
||||
.version(GIT_VERSION)
|
||||
.subcommand(
|
||||
App::new("init")
|
||||
Command::new("init")
|
||||
.about("Initialize a new Zenith repository")
|
||||
.arg(pageserver_config_args.clone())
|
||||
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
|
||||
@@ -141,12 +141,12 @@ fn main() -> Result<()> {
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
App::new("timeline")
|
||||
Command::new("timeline")
|
||||
.about("Manage timelines")
|
||||
.subcommand(App::new("list")
|
||||
.subcommand(Command::new("list")
|
||||
.about("List all timelines, available to this pageserver")
|
||||
.arg(tenant_id_arg.clone()))
|
||||
.subcommand(App::new("branch")
|
||||
.subcommand(Command::new("branch")
|
||||
.about("Create a new timeline, using another timeline as a base, copying its data")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
@@ -154,60 +154,60 @@ fn main() -> Result<()> {
|
||||
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
|
||||
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn").takes_value(true)
|
||||
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
|
||||
.subcommand(App::new("create")
|
||||
.subcommand(Command::new("create")
|
||||
.about("Create a new blank timeline")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg.clone()))
|
||||
).subcommand(
|
||||
App::new("tenant")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
Command::new("tenant")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage tenants")
|
||||
.subcommand(App::new("list"))
|
||||
.subcommand(App::new("create")
|
||||
.subcommand(Command::new("list"))
|
||||
.subcommand(Command::new("create")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
|
||||
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
|
||||
)
|
||||
.subcommand(App::new("config")
|
||||
.subcommand(Command::new("config")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
App::new("pageserver")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
Command::new("pageserver")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage pageserver")
|
||||
.subcommand(App::new("status"))
|
||||
.subcommand(App::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
|
||||
.subcommand(App::new("stop").about("Stop local pageserver")
|
||||
.subcommand(Command::new("status"))
|
||||
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("stop").about("Stop local pageserver")
|
||||
.arg(stop_mode_arg.clone()))
|
||||
.subcommand(App::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
|
||||
.subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
|
||||
)
|
||||
.subcommand(
|
||||
App::new("safekeeper")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
Command::new("safekeeper")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage safekeepers")
|
||||
.subcommand(App::new("start")
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start local safekeeper")
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
)
|
||||
.subcommand(App::new("stop")
|
||||
.subcommand(Command::new("stop")
|
||||
.about("Stop local safekeeper")
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
.subcommand(App::new("restart")
|
||||
.subcommand(Command::new("restart")
|
||||
.about("Restart local safekeeper")
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
App::new("pg")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
Command::new("pg")
|
||||
.arg_required_else_help(true)
|
||||
.about("Manage postgres instances")
|
||||
.subcommand(App::new("list").arg(tenant_id_arg.clone()))
|
||||
.subcommand(App::new("create")
|
||||
.subcommand(Command::new("list").arg(tenant_id_arg.clone()))
|
||||
.subcommand(Command::new("create")
|
||||
.about("Create a postgres compute node")
|
||||
.arg(pg_node_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
@@ -220,7 +220,7 @@ fn main() -> Result<()> {
|
||||
.long("config-only")
|
||||
.required(false)
|
||||
))
|
||||
.subcommand(App::new("start")
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
|
||||
.arg(pg_node_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
@@ -229,7 +229,7 @@ fn main() -> Result<()> {
|
||||
.arg(lsn_arg.clone())
|
||||
.arg(port_arg.clone()))
|
||||
.subcommand(
|
||||
App::new("stop")
|
||||
Command::new("stop")
|
||||
.arg(pg_node_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(
|
||||
@@ -242,12 +242,12 @@ fn main() -> Result<()> {
|
||||
|
||||
)
|
||||
.subcommand(
|
||||
App::new("start")
|
||||
Command::new("start")
|
||||
.about("Start page server and safekeepers")
|
||||
.arg(pageserver_config_args)
|
||||
)
|
||||
.subcommand(
|
||||
App::new("stop")
|
||||
Command::new("stop")
|
||||
.about("Stop page server and safekeepers")
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user