Compare commits

...

19 Commits

Author SHA1 Message Date
Bojan Serafimov
88fab12d44 clippy 2022-04-27 13:23:51 -04:00
Bojan Serafimov
80dd714f01 Improve docstring 2022-04-27 13:10:41 -04:00
Bojan Serafimov
a06af3c813 Fix merge conflict mistake 2022-04-27 13:08:56 -04:00
Bojan Serafimov
be86621152 Merge branch 'main' into bojan-get-page-tests 2022-04-27 13:05:27 -04:00
Bojan Serafimov
a7870d708b Cleanup 2022-04-27 12:18:13 -04:00
Bojan Serafimov
ccb5df93ef Add multiple clients 2022-04-27 09:26:56 -04:00
Bojan Serafimov
c2adb7ac2d Merge branch 'main' into bojan-get-page-tests 2022-04-14 13:59:59 -04:00
Bojan Serafimov
7482d3df70 Clippy 2022-04-14 13:57:44 -04:00
Bojan Serafimov
735145571f Upgrade clap version 2022-04-14 13:49:34 -04:00
Bojan Serafimov
1622de3fda Clippy 2022-04-14 13:42:35 -04:00
Bojan Serafimov
382e567fc0 Fmt 2022-04-14 13:36:00 -04:00
Bojan Serafimov
d2b00b7e35 Unmark fast test as slow 2022-04-14 13:14:54 -04:00
Bojan Serafimov
b92e1763ec Cleanup 2022-04-14 13:09:44 -04:00
Bojan Serafimov
da66df21f3 Cleanup 2022-04-14 11:00:52 -04:00
Bojan Serafimov
0b53968db4 Parameterize workload 2022-04-14 10:17:21 -04:00
Bojan Serafimov
7fc488ff4b Move parsing to zenbenchmark 2022-04-14 00:51:10 -04:00
Bojan Serafimov
2ca920e4cb Add psbench_bin fixture 2022-04-14 00:40:42 -04:00
Bojan Serafimov
e74ddf4391 Add pageserver option 2022-04-13 14:35:16 -04:00
Bojan Serafimov
c2814e9828 Add get_page tests 2022-04-12 13:27:18 -04:00
17 changed files with 597 additions and 59 deletions

81
Cargo.lock generated
View File

@@ -307,17 +307,41 @@ dependencies = [
[[package]]
name = "clap"
version = "3.0.14"
version = "3.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b63edc3f163b3c71ec8aa23f9bd6070f77edbf3d1d198b164afa90ff00e4ec62"
checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"os_str_bytes",
"lazy_static",
"strsim 0.10.0",
"termcolor",
"textwrap 0.14.2",
"textwrap 0.15.0",
]
[[package]]
name = "clap_derive"
version = "3.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3aab4734e083b809aaf5794e14e756d1c798d2c69c7f7de7a09a2f5214993c1"
dependencies = [
"heck 0.4.0",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
dependencies = [
"os_str_bytes",
]
[[package]]
@@ -336,7 +360,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap 3.0.14",
"clap 3.1.12",
"env_logger",
"hyper",
"libc",
@@ -990,6 +1014,12 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -1603,9 +1633,6 @@ name = "os_str_bytes"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
dependencies = [
"memchr",
]
[[package]]
name = "pageserver"
@@ -1616,7 +1643,7 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clap 3.0.14",
"clap 3.1.12",
"const_format",
"crc32c",
"crossbeam-utils",
@@ -1911,6 +1938,30 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -1957,7 +2008,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [
"bytes",
"heck",
"heck 0.3.3",
"itertools",
"lazy_static",
"log",
@@ -2001,7 +2052,7 @@ dependencies = [
"async-trait",
"base64",
"bytes",
"clap 3.0.14",
"clap 3.1.12",
"futures",
"hashbrown",
"hex",
@@ -2431,7 +2482,7 @@ dependencies = [
"anyhow",
"byteorder",
"bytes",
"clap 3.0.14",
"clap 3.1.12",
"const_format",
"crc32c",
"daemonize",
@@ -2826,9 +2877,9 @@ dependencies = [
[[package]]
name = "textwrap"
version = "0.14.2"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
@@ -3629,7 +3680,7 @@ name = "zenith"
version = "0.1.0"
dependencies = [
"anyhow",
"clap 3.0.14",
"clap 3.1.12",
"control_plane",
"pageserver",
"postgres",

View File

@@ -163,7 +163,7 @@ fn main() -> Result<()> {
// Env variable is set by `cargo`
let version: Option<&str> = option_env!("CARGO_PKG_VERSION");
let matches = clap::App::new("zenith_ctl")
let matches = clap::Command::new("zenith_ctl")
.version(version.unwrap_or("unknown"))
.arg(
Arg::new("connstr")

View File

@@ -18,7 +18,7 @@ hex = "0.4.3"
hyper = "0.14"
itertools = "0.10.3"
lazy_static = "1.4.0"
clap = "3.0"
clap = { version = "3.1.8", features = ["derive"] }
daemonize = "0.4.1"
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-util = { version = "0.7", features = ["io"] }

View File

@@ -2,7 +2,7 @@
//!
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use clap::{Arg, Command};
use pageserver::layered_repository::dump_layerfile_from_path;
use pageserver::page_cache;
use pageserver::virtual_file;
@@ -10,7 +10,7 @@ use std::path::PathBuf;
use utils::GIT_VERSION;
fn main() -> Result<()> {
let arg_matches = App::new("Zenith dump_layerfile utility")
let arg_matches = Command::new("Zenith dump_layerfile utility")
.about("Dump contents of one layer file, for debugging")
.version(GIT_VERSION)
.arg(

View File

@@ -5,7 +5,7 @@ use tracing::*;
use anyhow::{bail, Context, Result};
use clap::{App, Arg};
use clap::{Arg, Command};
use daemonize::Daemonize;
use pageserver::{
@@ -35,7 +35,7 @@ fn version() -> String {
fn main() -> anyhow::Result<()> {
metrics::set_common_metrics_prefix("pageserver");
let arg_matches = App::new("Zenith page server")
let arg_matches = Command::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
.version(&*version())
.arg(
@@ -189,6 +189,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// Initialize logger
let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
// Initialize wal metadata logger, if necessary
pageserver::wal_metadata::init(conf).expect("wal_metadata init failed");
info!("version: {}", GIT_VERSION);
// TODO: Check that it looks like a valid repository before going further

View File

@@ -0,0 +1,265 @@
//! Pageserver benchmark tool
//!
//! This tool connects directly to a pageserver, issues queries and measures performance.
//!
//! Ideally the tool would be able to stream WAL into the pageserver, and (possibly
//! simultaneously) make read requests. Currently wal streaming is not implemented,
//! so this tool assumes the pageserver is prepopulated with some data, and only
//! issues read queries. It also currently assumes that the pageserver writes out some
//! metadata describing the write access pattern on the workload that was performed on it.
//! See the python tests that use psbench for usage example.
//!
//! This tool runs a variety of workloads. See the PsbenchTest enum below, or run the tool
//! with --help to see the available workloads.
//!
use bytes::{BufMut, BytesMut};
use clap::{Parser, Subcommand};
use futures::future;
use pageserver::wal_metadata::{Page, WalEntryMetadata};
use postgres_ffi::pg_constants::BLCKSZ;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::time::Instant;
use std::{
collections::HashSet,
io::{BufRead, BufReader, Cursor},
time::Duration,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use utils::zid::{ZTenantId, ZTimelineId};
use utils::{
lsn::Lsn,
pq_proto::{BeMessage, FeMessage},
};
use anyhow::Result;
/// Client for the pageserver's pagestream API
struct PagestreamApi {
    // Raw TCP connection. After `connect` it speaks the pagestream
    // (CopyData-framed) protocol rather than normal postgres queries.
    stream: TcpStream,
}

/// Good enough implementation for these tests
impl PagestreamApi {
    /// Open a TCP connection to the pageserver and switch it into
    /// pagestream mode for the given tenant/timeline.
    ///
    /// NOTE(review): the address is hard-coded in two places below
    /// (localhost:15000 and 127.0.0.1:15000) — keep them in sync until
    /// the TODO to read them from the command line is done.
    async fn connect(tenant: &ZTenantId, timeline: &ZTimelineId) -> Result<PagestreamApi> {
        let mut stream = TcpStream::connect("localhost:15000").await?;

        // Connect to pageserver
        // TODO read host, port, dbname, user from command line
        let (client, conn) = tokio_postgres::Config::new()
            .host("127.0.0.1")
            .port(15000)
            .dbname("postgres")
            .user("zenith_admin")
            .connect_raw(&mut stream, tokio_postgres::NoTls)
            .await?;

        // Enter pagestream protocol
        let init_query = format!("pagestream {} {}", tenant, timeline);
        tokio::select! {
            _ = conn => panic!("connection closed during pagestream initialization"),
            _ = client.query(init_query.as_str(), &[]) => (),
        };

        // From here on the stream is used directly (see get_page); the
        // tokio_postgres client/conn are dropped.
        Ok(PagestreamApi { stream })
    }

    /// Request one page image at `lsn` and return its bytes (exactly BLCKSZ).
    ///
    /// `latest` is sent to the server as a single 0/1 byte — presumably it
    /// selects the newest page version at or before `lsn`; TODO confirm
    /// against the pageserver's pagestream handler.
    ///
    /// Panics on any unexpected or error response — acceptable for a
    /// benchmark tool, where a failed request invalidates the run anyway.
    async fn get_page(&mut self, lsn: &Lsn, page: &Page, latest: bool) -> anyhow::Result<Vec<u8>> {
        let latest: u8 = if latest { 1 } else { 0 };

        // Build the request: a get_page query (tag byte 2) wrapped in a
        // CopyData message.
        let msg = {
            let query = {
                let mut query = BytesMut::new();
                query.put_u8(2); // Specifies get_page query
                query.put_u8(latest);
                query.put_u64(lsn.0);
                page.write(&mut query).await?;
                query.freeze()
            };

            let mut buf = BytesMut::new();
            let copy_msg = BeMessage::CopyData(&query);
            BeMessage::write(&mut buf, &copy_msg)?;
            buf.freeze()
        };
        self.stream.write_all(&msg).await?;

        // The response is also CopyData-framed.
        let response = match FeMessage::read_fut(&mut self.stream).await? {
            Some(FeMessage::CopyData(page)) => page,
            r => panic!("Expected CopyData message, got: {:?}", r),
        };

        // First payload byte is a tag: 102 carries a page image, 103 an
        // error string (any other value is unexpected here).
        let page = {
            let mut cursor = Cursor::new(response);
            let tag = AsyncReadExt::read_u8(&mut cursor).await?;
            match tag {
                102 => {
                    let mut page = Vec::<u8>::new();
                    cursor.read_to_end(&mut page).await?;
                    if page.len() != (BLCKSZ as usize) {
                        panic!("Expected 8kb page, got: {:?}", page.len());
                    }
                    page
                }
                103 => {
                    let mut bytes = Vec::<u8>::new();
                    cursor.read_to_end(&mut bytes).await?;
                    let message = String::from_utf8(bytes)?;
                    panic!("Got error message: {}", message);
                }
                _ => panic!("Unhandled tag {:?}", tag),
            }
        };
        Ok(page)
    }
}
/// Parsed wal_metadata file with additional derived
/// statistics for convenience.
#[derive(Clone)]
struct Metadata {
// Parsed from metadata file
wal_metadata: Vec<WalEntryMetadata>,
// Derived from wal_metadata
total_wal_size: usize,
affected_pages: HashSet<Page>,
latest_lsn: Lsn,
}
impl Metadata {
/// Construct metadata object from wal_metadata file emitted by pageserver
fn build(wal_metadata_path: &Path) -> Result<Metadata> {
let wal_metadata_file = File::open(wal_metadata_path).expect("error opening wal_metadata");
let wal_metadata: Vec<WalEntryMetadata> = BufReader::new(wal_metadata_file)
.lines()
.map(|result| result.expect("error reading from file"))
.map(|line| serde_json::from_str(&line).expect("corrupt metadata file"))
.collect();
let total_wal_size: usize = wal_metadata.iter().map(|m| m.size).sum();
let affected_pages: HashSet<_> = wal_metadata
.iter()
.flat_map(|m| m.affected_pages.clone())
.collect();
let latest_lsn = wal_metadata.iter().map(|m| m.lsn).max().unwrap();
Ok(Metadata {
wal_metadata,
total_wal_size,
affected_pages,
latest_lsn,
})
}
/// Print results in a format readable by benchmark_fixture.py
fn report_latency(&self, latencies: &[Duration]) -> Result<()> {
let mut latencies: Vec<&Duration> = latencies.iter().collect();
latencies.sort();
println!("test_param num_pages {}", self.affected_pages.len());
println!("test_param num_wal_entries {}", self.wal_metadata.len());
println!("test_param total_wal_size {} bytes", self.total_wal_size);
println!(
"lower_is_better fastest {:?} microseconds",
latencies.first().unwrap().as_micros()
);
println!(
"lower_is_better median {:?} microseconds",
latencies[latencies.len() / 2].as_micros()
);
println!(
"lower_is_better average {:.2} microseconds",
(latencies.iter().map(|l| l.as_micros()).sum::<u128>() as f64)
/ (latencies.len() as f64)
);
println!(
"lower_is_better p99 {:?} microseconds",
latencies[latencies.len() - 1 - latencies.len() / 100].as_micros()
);
println!(
"lower_is_better slowest {:?} microseconds",
latencies.last().unwrap().as_micros()
);
Ok(())
}
}
/// Sequentially get the latest version of each page and report latencies
///
/// Each client visits the affected pages in its own random order.
async fn test_latest_pages(
    tenant: &ZTenantId,
    timeline: &ZTimelineId,
    metadata: &Metadata,
) -> Result<Vec<Duration>> {
    // Fix: propagate connection failures with `?` instead of `.unwrap()` —
    // this function already returns Result, so there is no reason to panic here.
    let mut api = PagestreamApi::connect(tenant, timeline).await?;

    let mut latencies: Vec<Duration> = vec![];
    let mut page_order: Vec<&Page> = metadata.affected_pages.iter().collect();
    page_order.shuffle(&mut thread_rng());

    for page in page_order {
        // Time one full get_page round-trip.
        let start = Instant::now();
        let _page_bytes = api.get_page(&metadata.latest_lsn, page, true).await?;
        let duration = start.elapsed();
        latencies.push(duration);
    }
    Ok(latencies)
}
// Command-line arguments. Plain `//` comments are used on purpose here:
// `///` doc comments on a clap derive struct become part of the generated
// --help text, which would change the tool's CLI output.
#[derive(Parser, Debug, Clone)]
struct Args {
    // Path to the wal_metadata file emitted by the pageserver.
    // TODO maybe one metadata file per timeline?
    wal_metadata_path: PathBuf,

    // Tenant/timeline to issue get_page requests against.
    // TODO get these from wal metadata
    tenant: ZTenantId,
    timeline: ZTimelineId,

    // Number of concurrent pagestream connections to open.
    // TODO change to `clients_per_timeline`
    #[clap(long, default_value = "1")]
    num_clients: usize,

    // Which workload to run; see PsbenchTest.
    #[clap(subcommand)]
    test: PsbenchTest,
}
// Available workloads; each variant becomes a CLI subcommand. The `///`
// comments on variants are intentional — clap surfaces them as subcommand
// help text — so only add `//` comments elsewhere.
#[derive(Subcommand, Debug, Clone)]
enum PsbenchTest {
    /// Query the latest version of each page, in a random sequential order.
    /// If multiple clients are used, all clients will independently query
    /// every page in a different random order.
    GetLatestPages,
    // TODO add more tests:
    // - Query with realistic read pattern
    // - Query every page after every change of that page
    // - Query all pages at given point in time
    // - etc.
}
/// Entry point: parse args, run the selected workload with the requested
/// number of concurrent clients, and print results for benchmark_fixture.py.
#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();
    let metadata = Metadata::build(&args.wal_metadata_path).unwrap();

    // Collect per-request latencies from every client.
    let latencies: Vec<Duration> = match args.test {
        PsbenchTest::GetLatestPages => {
            // TODO explicitly spawn a thread for each?
            let clients: Vec<_> = (0..args.num_clients)
                .map(|_| test_latest_pages(&args.tenant, &args.timeline, &metadata))
                .collect();
            let mut combined = Vec::new();
            for client_result in future::join_all(clients).await {
                combined.extend(client_result.unwrap());
            }
            combined
        }
    };

    println!("test_param num_clients {}", args.num_clients);
    metadata.report_latency(&latencies).unwrap();
    Ok(())
}

View File

@@ -2,14 +2,14 @@
//!
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use clap::{Arg, Command};
use pageserver::layered_repository::metadata::TimelineMetadata;
use std::path::PathBuf;
use std::str::FromStr;
use utils::{lsn::Lsn, GIT_VERSION};
fn main() -> Result<()> {
let arg_matches = App::new("Zenith update metadata utility")
let arg_matches = Command::new("Zenith update metadata utility")
.about("Dump or update metadata file")
.version(GIT_VERSION)
.arg(

View File

@@ -119,6 +119,7 @@ pub struct PageServerConf {
pub auth_validation_public_key_path: Option<PathBuf>,
pub remote_storage_config: Option<RemoteStorageConfig>,
pub emit_wal_metadata: bool,
pub profiling: ProfilingConfig,
pub default_tenant_conf: TenantConf,
}
@@ -184,6 +185,7 @@ struct PageServerConfigBuilder {
id: BuilderValue<ZNodeId>,
emit_wal_metadata: BuilderValue<bool>,
profiling: BuilderValue<ProfilingConfig>,
}
@@ -209,6 +211,7 @@ impl Default for PageServerConfigBuilder {
auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None),
id: NotSet,
emit_wal_metadata: Set(false),
profiling: Set(ProfilingConfig::Disabled),
}
}
@@ -270,6 +273,10 @@ impl PageServerConfigBuilder {
self.id = BuilderValue::Set(node_id)
}
pub fn emit_wal_metadata(&mut self, value: bool) {
self.emit_wal_metadata = BuilderValue::Set(value)
}
pub fn profiling(&mut self, profiling: ProfilingConfig) {
self.profiling = BuilderValue::Set(profiling)
}
@@ -307,6 +314,9 @@ impl PageServerConfigBuilder {
.remote_storage_config
.ok_or(anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow!("missing id"))?,
emit_wal_metadata: self
.emit_wal_metadata
.ok_or(anyhow!("emit_wal_metadata not specifiec"))?,
profiling: self.profiling.ok_or(anyhow!("missing profiling"))?,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
@@ -443,6 +453,7 @@ impl PageServerConf {
t_conf = Self::parse_toml_tenant_conf(item)?;
}
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
"emit_wal_metadata" => builder.emit_wal_metadata(true),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
@@ -605,6 +616,7 @@ impl PageServerConf {
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,
emit_wal_metadata: false,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::dummy_conf(),
}
@@ -720,6 +732,7 @@ id = 10
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,
emit_wal_metadata: false,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
},
@@ -759,6 +772,7 @@ id = 10
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
remote_storage_config: None,
emit_wal_metadata: false,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
},

View File

@@ -17,6 +17,7 @@ pub mod tenant_threads;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;
pub mod wal_metadata;
pub mod walingest;
pub mod walreceiver;
pub mod walrecord;

View File

@@ -0,0 +1,101 @@
//!
//! Utils for logging wal metadata. Useful for tests only, and too expensive to run in prod.
//!
//! Ideally we'd get this wal metadata using pg_waldump from the compute pg_wal directory,
//! but pg_waldump doesn't provide all the metadata we need. We could write a rust program
//! to analyze pg wal, but we'd need to port some c code for decoding wal files. This module
//! is a temporary hack that allows us to print the metadata that the pageserver decodes
//! using postgres_ffi::waldecoder.
//!
//! Logging wal metadata could add significant write overhead to the pageserver. Tests that
//! rely on this should either spin up a dedicated pageserver for wal metadata logging, or
//! only measure read performance.
//!
use anyhow::Result;
use bytes::{BufMut, BytesMut};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::fs::File;
use tokio::io::AsyncReadExt;
use utils::lsn::Lsn;
use crate::{config::PageServerConf, walrecord::DecodedBkpBlock};
// TODO make a directory instead, and write one file per (tenant, timeline).
pub static WAL_METADATA_FILE: OnceCell<File> = OnceCell::new();
/// Identifier of a single relation block — the usual postgres tuple of
/// (tablespace, database, relation, fork, block number), as mapped from
/// `DecodedBkpBlock` below.
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub struct Page {
    spcnode: u32,
    dbnode: u32,
    relnode: u32,
    forknum: u8,
    blkno: u32,
}

impl Page {
    /// Deserialize a Page from a binary stream: five big-endian fields in
    /// declaration order (u32, u32, u32, u8, u32). Must mirror `write`.
    pub async fn read<Reader>(buf: &mut Reader) -> Result<Page>
    where
        Reader: tokio::io::AsyncRead + Unpin,
    {
        let spcnode = buf.read_u32().await?;
        let dbnode = buf.read_u32().await?;
        let relnode = buf.read_u32().await?;
        let forknum = buf.read_u8().await?;
        let blkno = buf.read_u32().await?;
        Ok(Page {
            spcnode,
            dbnode,
            relnode,
            forknum,
            blkno,
        })
    }

    /// Serialize this Page into `buf`; exact inverse of `read`.
    pub async fn write(&self, buf: &mut BytesMut) -> Result<()> {
        buf.put_u32(self.spcnode);
        buf.put_u32(self.dbnode);
        buf.put_u32(self.relnode);
        buf.put_u8(self.forknum);
        buf.put_u32(self.blkno);
        Ok(())
    }
}

/// Build a Page key from a block reference decoded out of a WAL record.
impl From<&DecodedBkpBlock> for Page {
    fn from(blk: &DecodedBkpBlock) -> Self {
        Page {
            spcnode: blk.rnode_spcnode,
            dbnode: blk.rnode_dbnode,
            relnode: blk.rnode_relnode,
            forknum: blk.forknum,
            blkno: blk.blkno,
        }
    }
}
/// Metadata about one decoded WAL record: its LSN, its size in bytes, and
/// the pages it modified. Written out as one JSON object per line (see
/// `write` below); consumed by the psbench tool.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WalEntryMetadata {
    pub lsn: Lsn,
    // Size of the raw WAL record in bytes.
    pub size: usize,
    pub affected_pages: Vec<Page>,
}
/// Create the wal_metadata log file when enabled in the pageserver config;
/// otherwise do nothing. Subsequent `write` calls are no-ops unless this
/// ran with `emit_wal_metadata = true`.
///
/// Panics if called a second time after the file was already created.
pub fn init(conf: &'static PageServerConf) -> Result<()> {
    // Guard clause: metadata logging is opt-in and off by default.
    if !conf.emit_wal_metadata {
        return Ok(());
    }
    let path = conf.workdir.join("wal_metadata.log");
    let file = File::create(path)?;
    WAL_METADATA_FILE
        .set(file)
        .expect("wal_metadata file is already created");
    Ok(())
}
/// Append one WAL entry's metadata to the log file as a JSON line.
///
/// Silently does nothing if `init` never opened the file (i.e. when
/// emit_wal_metadata is disabled) — callers can invoke this unconditionally.
pub fn write(wal_meta: WalEntryMetadata) -> Result<()> {
    // `get()` yields Option<&File>; `mut file` binds a mutable `&File`.
    // `Write` is implemented for `&File`, so `&mut file` (a `&mut &File`)
    // lets us write without exclusive access to the OnceCell contents.
    if let Some(mut file) = WAL_METADATA_FILE.get() {
        let mut line = serde_json::to_string(&wal_meta)?;
        line.push('\n');
        std::io::prelude::Write::write_all(&mut file, line.as_bytes())?;
    }
    Ok(())
}

View File

@@ -82,6 +82,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
) -> Result<()> {
let mut modification = timeline.begin_modification(lsn);
let recdata_len = recdata.len();
let mut decoded = decode_wal_record(recdata);
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -249,6 +250,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
}
// Emit wal entry metadata, if configured to do so
crate::wal_metadata::write(crate::wal_metadata::WalEntryMetadata {
lsn,
size: recdata_len,
affected_pages: decoded.blocks.iter().map(|blk| blk.into()).collect(),
})?;
// If checkpoint data was updated, store the new version in the repository
if self.checkpoint_modified {
let new_checkpoint_bytes = self.checkpoint.encode();

View File

@@ -25,7 +25,7 @@ mod sasl;
mod scram;
use anyhow::{bail, Context};
use clap::{App, Arg};
use clap::{Arg, Command};
use config::ProxyConfig;
use futures::FutureExt;
use std::future::Future;
@@ -44,7 +44,7 @@ async fn flatten_err(
#[tokio::main]
async fn main() -> anyhow::Result<()> {
metrics::set_common_metrics_prefix("zenith_proxy");
let arg_matches = App::new("Zenith proxy/router")
let arg_matches = Command::new("Zenith proxy/router")
.version(GIT_VERSION)
.arg(
Arg::new("proxy")

View File

@@ -2,7 +2,7 @@
// Main entry point for the safekeeper executable
//
use anyhow::{bail, Context, Result};
use clap::{App, Arg};
use clap::{Arg, Command};
use const_format::formatcp;
use daemonize::Daemonize;
use fs2::FileExt;
@@ -30,7 +30,7 @@ const ID_FILE_NAME: &str = "safekeeper.id";
fn main() -> Result<()> {
metrics::set_common_metrics_prefix("safekeeper");
let arg_matches = App::new("Zenith safekeeper")
let arg_matches = Command::new("Zenith safekeeper")
.about("Store WAL stream to local file system and push it to WAL receivers")
.version(GIT_VERSION)
.arg(

View File

@@ -232,6 +232,16 @@ class ZenithBenchmarker:
'',
MetricReport.TEST_PARAM)
def record_psbench_result(self, prefix, psbench_output):
    """Record results from pageserver benchmarker.

    Each line of ``psbench_output`` is expected to look like
    ``<report> <name> <value> [<unit>]`` (the format printed by psbench's
    report_latency). Blank or malformed lines are skipped instead of
    raising IndexError — splitting an empty line yields [''], which the
    previous code indexed unconditionally.
    """
    for line in psbench_output.split("\n"):
        tokens = line.split(" ")
        # Need at least report, name and value to record anything.
        if len(tokens) < 3:
            continue
        report = tokens[0]
        name = tokens[1]
        value = tokens[2]
        unit = tokens[3] if len(tokens) > 3 else ""
        self.record(f"{prefix}_{name}", value, unit, report=report)
def get_io_writes(self, pageserver) -> int:
"""
Fetch the "cumulative # of bytes written" metric from the pageserver

View File

@@ -1290,6 +1290,31 @@ def pg_bin(test_output_dir: str) -> PgBin:
return PgBin(test_output_dir)
@dataclass
class PsbenchBin:
    """A helper class for running the pageserver benchmarker tool."""
    # Path to the wal_metadata.log file the pageserver emits when started
    # with emit_wal_metadata=true.
    wal_metadata_path: str

    def test_latest_pages(self, tenant_hex: str, timeline: str, num_clients=None) -> str:
        """Run psbench's get-latest-pages workload and return its stdout.

        NOTE(review): the subprocess result is not checked — a failing
        psbench run silently yields an empty string here, and stderr is
        discarded. Consider check=True and surfacing stderr.
        """
        num_clients = num_clients or 1
        psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
        args = [
            psbench_binpath,
            self.wal_metadata_path,
            tenant_hex,
            timeline,
            f"--num-clients={num_clients}",
            "get-latest-pages",
        ]
        return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()
@pytest.fixture(scope='function')
def psbench_bin(test_output_dir):
    """Fixture wrapping PsbenchBin, pointed at the pageserver's metadata log.

    The pageserver writes wal_metadata.log into its repo workdir when it is
    started with emit_wal_metadata=true; the path is computed here even if
    the file does not exist yet.
    """
    wal_metadata_path = os.path.join(test_output_dir, "repo", "wal_metadata.log")
    return PsbenchBin(wal_metadata_path)
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
super().__init__(host='localhost', port=port, dbname='postgres')

View File

@@ -0,0 +1,60 @@
from contextlib import closing
import pytest
from fixtures.zenith_fixtures import ZenithEnv, PgBin, ZenithEnvBuilder, DEFAULT_BRANCH_NAME, PsbenchBin
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
# TODO split this into separate tests maybe
@pytest.mark.parametrize("workload",
                         [
                             pytest.param("hot-page", marks=pytest.mark.slow),
                             pytest.param("pgbench"),
                             pytest.param("pgbench-big", marks=pytest.mark.slow),
                             pytest.param("pgbench-long", marks=pytest.mark.slow),
                         ])
def test_get_page(zenith_env_builder: ZenithEnvBuilder,
                  zenbenchmark: ZenithBenchmarker,
                  pg_bin: PgBin,
                  psbench_bin: PsbenchBin,
                  workload: str):
    """Benchmark pageserver get_page latency.

    Runs a write workload against a compute postgres, then uses psbench to
    query the latest version of every touched page with 1 and 8 clients,
    recording the latency reports under distinct prefixes.
    """
    # Tell the pageserver to log wal metadata; psbench reads it back later
    # via the psbench_bin fixture.
    zenith_env_builder.pageserver_config_override = "emit_wal_metadata=true"
    env = zenith_env_builder.init_start()
    env.zenith_cli.create_branch("test_pageserver", DEFAULT_BRANCH_NAME)
    pg = env.postgres.create_start('test_pageserver')

    # NOTE(review): tenant_hex is unused below (env.initial_tenant.hex is
    # used directly instead).
    tenant_hex = env.initial_tenant.hex
    timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]

    # Long-lived cursor, useful for flushing
    psconn = env.pageserver.connect()
    pscur = psconn.cursor()

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            if workload == "hot-page":
                # Many updates concentrated on a single page.
                cur.execute('create table t (i integer);')
                cur.execute('insert into t values (0);')
                for i in range(100000):
                    cur.execute(f'update t set i = {i};')
            elif workload == "pgbench":
                pg_bin.run_capture(['pgbench', '-s5', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t5000', pg.connstr()])
            elif workload == "pgbench-big":
                pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t100000', pg.connstr()])
            elif workload == "pgbench-long":
                pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t1000000', pg.connstr()])

    # Checkpoint via the pageserver's management connection before measuring.
    pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0")

    # Measure with a single client, then with 8 concurrent clients.
    output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=1)
    zenbenchmark.record_psbench_result("1_client", output)
    output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=8)
    zenbenchmark.record_psbench_result("8_clients", output)

    # TODO test concurrent get_page requests from 8 different timelines

View File

@@ -1,5 +1,5 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{App, AppSettings, Arg, ArgMatches};
use clap::{Arg, ArgMatches, Command};
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
use control_plane::local_env::LocalEnv;
@@ -125,11 +125,11 @@ fn main() -> Result<()> {
.takes_value(true)
.required(false);
let matches = App::new("Zenith CLI")
.setting(AppSettings::ArgRequiredElseHelp)
let matches = Command::new("Zenith CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
.subcommand(
App::new("init")
Command::new("init")
.about("Initialize a new Zenith repository")
.arg(pageserver_config_args.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
@@ -141,12 +141,12 @@ fn main() -> Result<()> {
)
)
.subcommand(
App::new("timeline")
Command::new("timeline")
.about("Manage timelines")
.subcommand(App::new("list")
.subcommand(Command::new("list")
.about("List all timelines, available to this pageserver")
.arg(tenant_id_arg.clone()))
.subcommand(App::new("branch")
.subcommand(Command::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone())
@@ -154,60 +154,60 @@ fn main() -> Result<()> {
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn").takes_value(true)
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
.subcommand(App::new("create")
.subcommand(Command::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone()))
).subcommand(
App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp)
Command::new("tenant")
.arg_required_else_help(true)
.about("Manage tenants")
.subcommand(App::new("list"))
.subcommand(App::new("create")
.subcommand(Command::new("list"))
.subcommand(Command::new("create")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
.subcommand(App::new("config")
.subcommand(Command::new("config")
.arg(tenant_id_arg.clone())
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
)
.subcommand(
App::new("pageserver")
.setting(AppSettings::ArgRequiredElseHelp)
Command::new("pageserver")
.arg_required_else_help(true)
.about("Manage pageserver")
.subcommand(App::new("status"))
.subcommand(App::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.subcommand(App::new("stop").about("Stop local pageserver")
.subcommand(Command::new("status"))
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.subcommand(Command::new("stop").about("Stop local pageserver")
.arg(stop_mode_arg.clone()))
.subcommand(App::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
.subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
)
.subcommand(
App::new("safekeeper")
.setting(AppSettings::ArgRequiredElseHelp)
Command::new("safekeeper")
.arg_required_else_help(true)
.about("Manage safekeepers")
.subcommand(App::new("start")
.subcommand(Command::new("start")
.about("Start local safekeeper")
.arg(safekeeper_id_arg.clone())
)
.subcommand(App::new("stop")
.subcommand(Command::new("stop")
.about("Stop local safekeeper")
.arg(safekeeper_id_arg.clone())
.arg(stop_mode_arg.clone())
)
.subcommand(App::new("restart")
.subcommand(Command::new("restart")
.about("Restart local safekeeper")
.arg(safekeeper_id_arg.clone())
.arg(stop_mode_arg.clone())
)
)
.subcommand(
App::new("pg")
.setting(AppSettings::ArgRequiredElseHelp)
Command::new("pg")
.arg_required_else_help(true)
.about("Manage postgres instances")
.subcommand(App::new("list").arg(tenant_id_arg.clone()))
.subcommand(App::new("create")
.subcommand(Command::new("list").arg(tenant_id_arg.clone()))
.subcommand(Command::new("create")
.about("Create a postgres compute node")
.arg(pg_node_arg.clone())
.arg(branch_name_arg.clone())
@@ -220,7 +220,7 @@ fn main() -> Result<()> {
.long("config-only")
.required(false)
))
.subcommand(App::new("start")
.subcommand(Command::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
.arg(tenant_id_arg.clone())
@@ -229,7 +229,7 @@ fn main() -> Result<()> {
.arg(lsn_arg.clone())
.arg(port_arg.clone()))
.subcommand(
App::new("stop")
Command::new("stop")
.arg(pg_node_arg.clone())
.arg(tenant_id_arg.clone())
.arg(
@@ -242,12 +242,12 @@ fn main() -> Result<()> {
)
.subcommand(
App::new("start")
Command::new("start")
.about("Start page server and safekeepers")
.arg(pageserver_config_args)
)
.subcommand(
App::new("stop")
Command::new("stop")
.about("Stop page server and safekeepers")
.arg(stop_mode_arg.clone())
)