diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 450b93d85a..c5a601b7ce 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -4,7 +4,7 @@ on: [push] jobs: regression-check: - timeout-minutes: 10 + timeout-minutes: 30 name: run regression test suite runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 29889a4583..6d668d58b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,19 +91,19 @@ dependencies = [ [[package]] name = "async-io" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9315f8f07556761c3e48fec2e6b276004acf426e6dc068b2c2251854d65ee0fd" +checksum = "fcb9af4888a70ad78ecb5efcb0ba95d66a3cf54a88b62ae81559954c7588c7a2" dependencies = [ "concurrent-queue", "fastrand", "futures-lite", "libc", "log", - "nb-connect", "once_cell", "parking", "polling", + "socket2", "vec-arena", "waker-fn", "winapi", @@ -111,9 +111,9 @@ dependencies = [ [[package]] name = "async-lock" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb" +checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b" dependencies = [ "event-listener", ] @@ -162,9 +162,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" [[package]] name = "async-trait" -version = "0.1.49" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "589652ce7ccb335d1e7ecb3be145425702b290dbcb7029bbeaae263fc1d87b48" +checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" dependencies = [ "proc-macro2", "quote", @@ -243,13 +243,12 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bindgen" -version = "0.53.3" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c72a978d268b1d70b0e963217e60fdabd9523a941457a6c42a7315d15c7e89e5" +checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d" dependencies = [ "bitflags", "cexpr", - "cfg-if 0.1.10", "clang-sys", "clap", "env_logger", @@ -346,6 +345,9 @@ name = "cc" version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -383,9 +385,9 @@ dependencies = [ [[package]] name = "clang-sys" -version = "0.29.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a" +checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c" dependencies = [ "glob", "libc", @@ -596,9 +598,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.7.1" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" dependencies = [ "atty", "humantime", @@ -922,9 +924,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.3.6" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc35c995b9d93ec174cf9a27d425c7892722101e14993cd227fdb51d70cf9589" +checksum = "4a1ce40d6fc9764887c2fdc7305c3dcc429ba11ff981c1509416afd5697e4437" 
[[package]] name = "httpdate" @@ -934,12 +936,9 @@ checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" [[package]] name = "humantime" -version = "1.3.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" -dependencies = [ - "quick-error", -] +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" @@ -980,9 +979,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" dependencies = [ "matches", "unicode-bidi", @@ -1033,6 +1032,15 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "jobserver" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "972f5ae5d1cb9c6ae417789196c803205313edde988685da5e3aae0827b9e7fd" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.50" @@ -1071,14 +1079,26 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" [[package]] name = "libloading" -version = "0.5.2" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753" +checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a" dependencies = [ - "cc", + "cfg-if 1.0.0", "winapi", ] +[[package]] +name = "librocksdb-sys" +version = "6.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9" +dependencies = [ + "bindgen", + "cc", + "glob", + "libc", +] + [[package]] name = "lock_api" version = "0.4.3" @@ -1184,16 +1204,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nb-connect" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d" -dependencies = [ - "libc", - "socket2", -] - [[package]] name = "nom" version = "5.1.2" @@ -1213,6 +1223,41 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95" +dependencies = [ + "autocfg", + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -1223,6 +1268,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.14" @@ -1319,12 +1387,14 @@ dependencies = [ "hex", "lazy_static", "log", + "parse_duration", "postgres", "postgres-protocol", "postgres-types", "postgres_ffi", "rand 0.8.3", "regex", + "rocksdb", "rust-s3", "slog", "slog-async", @@ -1373,6 +1443,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "parse_duration" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d" +dependencies = [ + "lazy_static", + "num", + "regex", +] + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -1405,18 +1486,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc174859768806e91ae575187ada95c91a29e96a98dc5d2cd9a1fed039501ba6" +checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a490329918e856ed1b083f244e3bfe2d8c4f336407e4ea9e1a9f479ff09049e5" +checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f" dependencies = [ "proc-macro2", "quote", @@ -1536,12 +1617,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - [[package]] name = "quote" version = "1.0.9" @@ -1738,6 +1813,16 @@ dependencies = [ "winreg", ] +[[package]] +name = "rocksdb" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -1975,9 +2060,9 @@ checksum = "cbce6d4507c7e4a3962091436e56e95290cb71fa302d0d270e32130b75fbff27" [[package]] name = "slab" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527" [[package]] name = "slog" @@ -2415,9 +2500,9 @@ dependencies = [ [[package]] name = "vcpkg" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb" +checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d" [[package]] name = "vec-arena" diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index e63d72ada1..fe2204b539 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -3,6 +3,7 @@ use std::io::{Read, Write}; use std::net::SocketAddr; use std::net::TcpStream; use std::os::unix::fs::PermissionsExt; +use std::path::Path; 
use std::process::Command; use std::sync::Arc; use std::time::Duration; @@ -16,7 +17,7 @@ use postgres::{Client, NoTls}; use crate::local_env::LocalEnv; use crate::storage::{PageServerNode, WalProposerNode}; -use pageserver::ZTimelineId; +use pageserver::{zenith_repo_dir, ZTimelineId}; // // ComputeControlPlane @@ -276,7 +277,9 @@ impl PostgresNode { max_replication_slots = 10\n\ hot_standby = on\n\ shared_buffers = 1MB\n\ + fsync = off\n\ max_connections = 100\n\ + wal_sender_timeout = 0\n\ wal_level = replica\n\ listen_addresses = '{address}'\n\ port = {port}\n", @@ -443,8 +446,71 @@ impl PostgresNode { } } - // TODO - pub fn pg_bench() {} + pub fn pg_regress(&self) { + self.safe_psql("postgres", "CREATE DATABASE regression"); + let data_dir = zenith_repo_dir(); + let regress_run_path = data_dir.join("regress"); + fs::create_dir_all(®ress_run_path).unwrap(); + fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); + std::env::set_current_dir(regress_run_path).unwrap(); + + let regress_build_path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); + let regress_src_path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); + + let _regress_check = Command::new(regress_build_path.join("pg_regress")) + .args(&[ + "--bindir=''", + "--use-existing", + format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(), + format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), + format!( + "--schedule={}", + regress_src_path.join("parallel_schedule").to_str().unwrap() + ) + .as_str(), + format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), + ]) + .env_clear() + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("PGPORT", self.address.port().to_string()) + .env("PGUSER", self.whoami()) + .env("PGHOST", self.address.ip().to_string()) + .status() + .expect("pg_regress failed"); + } + + pub fn pg_bench(&self, clients: u32, seconds: u32) { + let port = self.address.port().to_string(); + let clients = clients.to_string(); + let seconds = seconds.to_string(); + let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench")) + .args(&["-i", "-p", port.as_str(), "postgres"]) + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .status() + .expect("pgbench -i"); + let _pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench")) + .args(&[ + "-p", + port.as_str(), + "-T", + seconds.as_str(), + "-P", + "1", + "-c", + clients.as_str(), + "-M", + "prepared", + "postgres", + ]) + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .status() + .expect("pgbench run"); + } } impl Drop for PostgresNode { diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 8138364353..d26150fc64 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -15,6 +15,7 @@ use std::process::{Command, Stdio}; use anyhow::Result; use serde_derive::{Deserialize, Serialize}; +use pageserver::zenith_repo_dir; use pageserver::ZTimelineId; use postgres_ffi::xlog_utils; @@ -52,14 +53,6 @@ impl LocalEnv { } } -fn zenith_repo_dir() -> PathBuf { - // Find repository path - match std::env::var_os("ZENITH_REPO_DIR") { - Some(val) => PathBuf::from(val.to_str().unwrap()), - None => ".zenith".into(), - } 
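For context, a minimal sketch of how the new pg_regress()/pg_bench() helpers are meant to be driven from a test; this mirrors the integration tests further down in this patch, so all names come from there:

```rust
// Sketch only -- mirrors integration_tests/tests/test_pageserver.rs below.
let local_env = local_env::test_env("pgbench");
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);

let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_node(maintli);
node.start().unwrap();

node.pg_bench(10, 100); // 10 clients, 100 seconds
// node.pg_regress(); // or run the regression schedule instead
```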
-} - // // Initialize a new Zenith repository // @@ -145,7 +138,10 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> { .arg("--no-instructions") .env_clear() .env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap()) + .env( + "DYLD_LIBRARY_PATH", + local_env.pg_lib_dir().to_str().unwrap(), + ) .stdout(Stdio::null()) .status() .with_context(|| "failed to execute initdb")?; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index bf7e689be8..635e486e4e 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -13,7 +13,6 @@ use std::time::Duration; use postgres::{Client, NoTls}; -use crate::compute::PostgresNode; use crate::local_env::LocalEnv; use pageserver::ZTimelineId; @@ -111,6 +110,9 @@ impl TestStorageControlPlane { } pub fn stop(&self) { + for wa in self.wal_acceptors.iter() { + let _ = wa.stop(); + } self.test_done.store(true, Ordering::Relaxed); } @@ -366,43 +368,6 @@ impl Drop for WalProposerNode { } } -/////////////////////////////////////////////////////////////////////////////// - -pub fn regress_check(pg: &PostgresNode) { - pg.safe_psql("postgres", "CREATE DATABASE regression"); - - let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress"); - fs::create_dir_all(regress_run_path.clone()).unwrap(); - std::env::set_current_dir(regress_run_path).unwrap(); - - let regress_build_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); - let regress_src_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); - - let _regress_check = Command::new(regress_build_path.join("pg_regress")) - .args(&[ - "--bindir=''", - "--use-existing", - format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(), - format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), - format!( - "--schedule={}", - regress_src_path.join("parallel_schedule").to_str().unwrap() - ) - .as_str(), - format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), - ]) - .env_clear() - .env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap()) - .env("PGHOST", pg.address.ip().to_string()) - .env("PGPORT", pg.address.port().to_string()) - .env("PGUSER", pg.whoami()) - .status() - .expect("pg_regress failed"); -} - /// Read a PID file /// /// This should contain an unsigned integer, but we return it as a String diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index 57ad0c186f..79d8593315 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -51,7 +51,6 @@ fn test_redo_cases() { // Runs pg_regress on a compute node #[test] -#[ignore] fn test_regress() { let local_env = local_env::test_env("test_regress"); @@ -64,7 +63,24 @@ fn test_regress() { let node = compute_cplane.new_test_node(maintli); node.start().unwrap(); - control_plane::storage::regress_check(&node); + node.pg_regress(); +} + +// Runs pg_bench on a compute node +#[test] +fn pgbench() { + let local_env = local_env::test_env("pgbench"); + + // Start pageserver that reads WAL directly from that postgres + let storage_cplane = TestStorageControlPlane::one_page_server(&local_env); + let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); + + // start postgres + let maintli = 
storage_cplane.get_branch_timeline("main"); + let node = compute_cplane.new_test_node(maintli); + node.start().unwrap(); + + node.pg_bench(10, 100); } // Run two postgres instances on one pageserver, on different timelines diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index b161be36fe..a198f6403a 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -32,12 +32,14 @@ tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +rocksdb = "0.16.0" anyhow = "1.0" crc32c = "0.6.0" walkdir = "2" thiserror = "1.0" hex = "0.4.3" tar = "0.4.33" +parse_duration = "*" postgres_ffi = { path = "../postgres_ffi" } zenith_utils = { path = "../zenith_utils" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 495ec41b65..23e0d5e57b 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -3,12 +3,13 @@ // use log::*; -use std::fs; -use std::fs::{File, OpenOptions}; +use parse_duration::parse; +use std::fs::{self, OpenOptions}; use std::io; use std::path::PathBuf; use std::process::exit; use std::thread; +use std::time::Duration; use anyhow::{Context, Result}; use clap::{App, Arg}; @@ -16,18 +17,10 @@ use daemonize::Daemonize; use slog::Drain; -use pageserver::page_service; -use pageserver::tui; -//use pageserver::walreceiver; -use pageserver::PageServerConf; +use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf}; -fn zenith_repo_dir() -> String { - // Find repository path - match std::env::var_os("ZENITH_REPO_DIR") { - Some(val) => String::from(val.to_str().unwrap()), - None => ".zenith".into(), - } -} +const DEFAULT_GC_HORIZON: u64 = 0; //64 * 1024 * 1024; +const DEFAULT_GC_PERIOD_SEC: u64 = 1; fn main() -> Result<()> { let arg_matches = App::new("Zenith page server") @@ -53,11 +46,25 @@ fn main() -> Result<()> { .takes_value(false) .help("Run in the background"), ) + .arg( + Arg::with_name("gc_horizon") + .long("gc_horizon") + .takes_value(true) + .help("Distance from current LSN to perform all wal records cleanup"), + ) + .arg( + Arg::with_name("gc_period") + .long("gc_period") + .takes_value(true) + .help("Interval between garbage collector iterations"), + ) .get_matches(); let mut conf = PageServerConf { daemonize: false, interactive: false, + gc_horizon: DEFAULT_GC_HORIZON, + gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC), listen_addr: "127.0.0.1:5430".parse().unwrap(), }; @@ -78,6 +85,14 @@ fn main() -> Result<()> { conf.listen_addr = addr.parse()?; } + if let Some(horizon) = arg_matches.value_of("gc_horizon") { + conf.gc_horizon = horizon.parse()?; + } + + if let Some(period) = arg_matches.value_of("gc_period") { + conf.gc_period = parse(period)?; + } + start_pageserver(&conf) } @@ -139,7 +154,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { // does this for us. 
let repodir = zenith_repo_dir(); std::env::set_current_dir(&repodir)?; - info!("Changed current directory to repository in {}", &repodir); + info!("Changed current directory to repository in {:?}", &repodir); } let mut threads = Vec::new(); @@ -185,12 +200,16 @@ fn init_logging(conf: &PageServerConf) -> Result diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs +pub fn zenith_repo_dir() -> PathBuf { + // Find repository path + match std::env::var_os("ZENITH_REPO_DIR") { + Some(val) => PathBuf::from(val.to_str().unwrap()), + None => ".zenith".into(), + } +} diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8b9692aaff..21f794913d 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -8,16 +8,16 @@ use crate::restore_local_repo::restore_timeline; use crate::ZTimelineId; -use crate::{walredo, PageServerConf}; +use crate::{walredo, zenith_repo_dir, PageServerConf}; use anyhow::{bail, Context}; -use bytes::Bytes; -use core::ops::Bound::Included; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use crossbeam_channel::unbounded; use crossbeam_channel::{Receiver, Sender}; use lazy_static::lazy_static; use log::*; -use rand::Rng; -use std::collections::{BTreeMap, HashMap}; +use rocksdb; +use std::cmp::min; +use std::collections::HashMap; use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::{Arc, Condvar, Mutex}; @@ -32,6 +32,9 @@ static TIMEOUT: Duration = Duration::from_secs(60); pub struct PageCache { shared: Mutex<PageCacheShared>, + // RocksDB handle + db: rocksdb::DB, + // Channel for communicating with the WAL redo process here. pub walredo_sender: Sender<Arc<CacheEntry>>, pub walredo_receiver: Receiver<Arc<CacheEntry>>, @@ -82,16 +85,6 @@ impl AddAssign for PageCacheStats { // Shared data structure, holding page cache and related auxiliary information // struct PageCacheShared { - // The actual page cache - pagecache: BTreeMap<CacheKey, Arc<CacheEntry>>, - - // Relation n_blocks cache - // - // This hashtable should be updated together with the pagecache. Now it is - // accessed unreasonably often through the smgr_nblocks(). It is better to just - // cache it in postgres smgr and ask only on restart. - relsize_cache: HashMap<RelTag, u32>, - // What page versions do we hold in the cache? If we get GetPage with // LSN < first_valid_lsn, that's an error because we (no longer) hold that // page version. 
If we get a request > last_valid_lsn, we need to wait until @@ -135,7 +128,7 @@ pub fn get_or_restore_pagecache( match pcaches.get(&timelineid) { Some(pcache) => Ok(pcache.clone()), None => { - let pcache = init_page_cache(); + let pcache = init_page_cache(conf, timelineid); restore_timeline(conf, &pcache, timelineid)?; @@ -155,20 +148,42 @@ pub fn get_or_restore_pagecache( walredo::wal_redo_main(&conf_copy, timelineid); }) .unwrap(); - + if conf.gc_horizon != 0 { + let conf_copy = conf.clone(); + let _gc_thread = thread::Builder::new() + .name("Garbage collection thread".into()) + .spawn(move || { + gc_thread_main(&conf_copy, timelineid); + }) + .unwrap(); + } Ok(result) } } } -fn init_page_cache() -> PageCache { +fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { + info!("Garbage collection thread started {}", timelineid); + let pcache = get_pagecache(conf, timelineid).unwrap(); + pcache.do_gc(conf).unwrap(); +} + +fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB { + let path = zenith_repo_dir().join(timelineid.to_string()); + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.set_use_fsync(true); + opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + rocksdb::DB::open(&opts, &path).unwrap() +} + +fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { // Initialize the channel between the page cache and the WAL applicator let (s, r) = unbounded(); PageCache { + db: open_rocksdb(&conf, timelineid), shared: Mutex::new(PageCacheShared { - pagecache: BTreeMap::new(), - relsize_cache: HashMap::new(), first_valid_lsn: 0, last_valid_lsn: 0, last_record_lsn: 0, @@ -209,6 +224,19 @@ pub struct CacheKey { pub lsn: u64, } +impl CacheKey { + pub fn pack(&self, buf: &mut BytesMut) { + self.tag.pack(buf); + buf.put_u64(self.lsn); + } + pub fn unpack(buf: &mut BytesMut) -> CacheKey { + CacheKey { + tag: BufferTag::unpack(buf), + lsn: buf.get_u64(), + } + } +} + pub struct CacheEntry { pub key: CacheKey, @@ -228,21 +256,47 @@ pub struct CacheEntryContent { pub apply_pending: bool, } -impl CacheEntry { - fn new(key: CacheKey) -> CacheEntry { - CacheEntry { - key, - content: Mutex::new(CacheEntryContent { - page_image: None, +impl CacheEntryContent { + pub fn pack(&self, buf: &mut BytesMut) { + if let Some(image) = &self.page_image { + buf.put_u8(1); + buf.put_u16(image.len() as u16); + buf.put_slice(&image[..]); + } else if let Some(rec) = &self.wal_record { + buf.put_u8(0); + rec.pack(buf); + } + } + pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { + if buf.get_u8() == 1 { + let mut dst = vec![0u8; buf.get_u16() as usize]; + buf.copy_to_slice(&mut dst); + CacheEntryContent { + page_image: Some(Bytes::from(dst)), wal_record: None, apply_pending: false, - }), + } + } else { + CacheEntryContent { + page_image: None, + wal_record: Some(WALRecord::unpack(buf)), + apply_pending: false, + } + } + } +} + +impl CacheEntry { + fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry { + CacheEntry { + key, + content: Mutex::new(content), walredo_condvar: Condvar::new(), } } } -#[derive(Eq, PartialEq, Hash, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] pub struct RelTag { pub spcnode: u32, pub dbnode: u32, @@ -250,42 +304,195 @@ pub struct RelTag { pub forknum: u8, } -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] +impl RelTag { + pub fn pack(&self, buf: &mut BytesMut) { + buf.put_u32(self.spcnode); + buf.put_u32(self.dbnode); + 
buf.put_u32(self.relnode); + buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres + } + pub fn unpack(buf: &mut BytesMut) -> RelTag { + RelTag { + spcnode: buf.get_u32(), + dbnode: buf.get_u32(), + relnode: buf.get_u32(), + forknum: buf.get_u32() as u8, + } + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub struct BufferTag { - pub spcnode: u32, - pub dbnode: u32, - pub relnode: u32, - pub forknum: u8, + pub rel: RelTag, pub blknum: u32, } +impl BufferTag { + pub fn pack(&self, buf: &mut BytesMut) { + self.rel.pack(buf); + buf.put_u32(self.blknum); + } + pub fn unpack(buf: &mut BytesMut) -> BufferTag { + BufferTag { + rel: RelTag::unpack(buf), + blknum: buf.get_u32(), + } + } +} + #[derive(Clone)] pub struct WALRecord { pub lsn: u64, // LSN at the *end* of the record pub will_init: bool, + pub truncate: bool, pub rec: Bytes, // Remember the offset of main_data in rec, // so that we don't have to parse the record again. // If record has no main_data, this offset equals rec.len(). - pub main_data_offset: usize, + pub main_data_offset: u32, +} + +impl WALRecord { + pub fn pack(&self, buf: &mut BytesMut) { + buf.put_u64(self.lsn); + buf.put_u8(self.will_init as u8); + buf.put_u8(self.truncate as u8); + buf.put_u32(self.main_data_offset); + buf.put_u32(self.rec.len() as u32); + buf.put_slice(&self.rec[..]); + } + pub fn unpack(buf: &mut BytesMut) -> WALRecord { + let lsn = buf.get_u64(); + let will_init = buf.get_u8() != 0; + let truncate = buf.get_u8() != 0; + let main_data_offset = buf.get_u32(); + let mut dst = vec![0u8; buf.get_u32() as usize]; + buf.copy_to_slice(&mut dst); + WALRecord { + lsn, + will_init, + truncate, + rec: Bytes::from(dst), + main_data_offset, + } + } } // Public interface functions impl PageCache { - // - // GetPage@LSN - // - // Returns an 8k page image - // - pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> { - self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> { + let mut minbuf = BytesMut::new(); + let mut maxbuf = BytesMut::new(); + let cf = self + .db + .cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME) + .unwrap(); + loop { + thread::sleep(conf.gc_period); + let last_lsn = self.get_last_valid_lsn(); + if last_lsn > conf.gc_horizon { + let horizon = last_lsn - conf.gc_horizon; + let mut maxkey = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: u32::MAX, + dbnode: u32::MAX, + relnode: u32::MAX, + forknum: u8::MAX, + }, + blknum: u32::MAX, + }, + lsn: u64::MAX, + }; + loop { + maxbuf.clear(); + maxkey.pack(&mut maxbuf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &maxbuf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, v)) = iter.next() { + minbuf.clear(); + minbuf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut minbuf); + minbuf.clear(); + minbuf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut minbuf); - // Look up cache entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. 
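Everything in these pack() methods is written big-endian (bytes' put_u32/put_u64), so under RocksDB's default bytewise comparator the packed keys sort exactly like the logical (rel, blknum, lsn) tuples; the Reverse iterators in do_gc and get_page_at_lsn depend on that property. A standalone sanity check:

```rust
// Standalone check (not part of the patch): big-endian packing makes
// bytewise key order match the logical (blknum, lsn) order.
use bytes::{BufMut, BytesMut};

fn packed(blknum: u32, lsn: u64) -> Vec<u8> {
    let mut buf = BytesMut::new();
    buf.put_u32(blknum); // put_u32/put_u64 write big-endian
    buf.put_u64(lsn);
    buf.to_vec()
}

fn main() {
    assert!(packed(1, 0x10) < packed(1, 0x20)); // newer LSN sorts later
    assert!(packed(1, u64::MAX) < packed(2, 0)); // next block sorts after all LSNs
}
```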
- let minkey = CacheKey { tag, lsn: 0 }; - let maxkey = CacheKey { tag, lsn }; + // Construct boundaries for old records cleanup + maxkey.tag = key.tag; + let last_lsn = key.lsn; + maxkey.lsn = min(horizon, last_lsn); // do not remove last version + let mut minkey = maxkey.clone(); + minkey.lsn = 0; + + // reconstruct most recent page version + if content.wal_record.is_some() { + // force reconstruction of most recent page version + self.reconstruct_page(key, content)?; + } + + maxbuf.clear(); + maxkey.pack(&mut maxbuf); + + if last_lsn > horizon { + // locate most recent record before horizon + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &maxbuf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, v)) = iter.next() { + minbuf.clear(); + minbuf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut minbuf); + if content.wal_record.is_some() { + minbuf.clear(); + minbuf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut minbuf); + self.reconstruct_page(key, content)?; + } + } + } + // remove records prior to horizon + minbuf.clear(); + minkey.pack(&mut minbuf); + self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?; + + maxkey = minkey; + } + } + } + } + + fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> { + let entry_rc = Arc::new(CacheEntry::new(key.clone(), content)); + + let mut entry_content = entry_rc.content.lock().unwrap(); + entry_content.apply_pending = true; + + let s = &self.walredo_sender; + s.send(entry_rc.clone())?; + + while entry_content.apply_pending { + entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); + } + // We should now have a page image. If we don't, it means that WAL redo + // failed to reconstruct it. WAL redo should've logged that error already. + let page_img = match &entry_content.page_image { + Some(p) => p.clone(), + None => { + error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); + bail!("could not apply WAL to reconstruct page image"); + } + }; + self.put_page_image(key.tag, key.lsn, page_img.clone()); + Ok(page_img) + } + + async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> { let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire); if walreceiver_works { self.seqwait_lsn @@ -310,81 +517,63 @@ impl PageCache { ); } - let entry_rc: Arc<CacheEntry>; - { - let shared = self.shared.lock().unwrap(); + let shared = self.shared.lock().unwrap(); - if walreceiver_works { - assert!(lsn <= shared.last_valid_lsn); - } - - if lsn < shared.first_valid_lsn { - bail!( - "LSN {:X}/{:X} has already been removed", - lsn >> 32, - lsn & 0xffff_ffff - ); - } - - let pagecache = &shared.pagecache; - - let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); - - let entry_opt = entries.next_back(); - - if entry_opt.is_none() { - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - return Ok(Bytes::from_static(&ZERO_PAGE)); - /* return Err("could not find page image")?; */ - } - let (_key, entry) = entry_opt.unwrap(); - entry_rc = entry.clone(); - - // Now that we have a reference to the cache entry, drop the lock on the map. - // It's important to do this before waiting on the condition variable below, - // and better to do it as soon as possible to maximize concurrency. + if walreceiver_works { + assert!(lsn <= shared.last_valid_lsn); } + Ok(()) + } - // Lock the cache entry and dig the page image out of it. 
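reconstruct_page() blocks on walredo_condvar until the WAL redo thread clears apply_pending. Reduced to its core, this is the standard mutex-plus-condvar handshake; a standalone sketch, not the patch's types:

```rust
// Standalone sketch of the apply_pending handshake in reconstruct_page().
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let entry = Arc::new((Mutex::new(true), Condvar::new())); // (apply_pending, condvar)
    let redo = Arc::clone(&entry);
    thread::spawn(move || {
        let (pending, cvar) = &*redo;
        *pending.lock().unwrap() = false; // WAL redo finished
        cvar.notify_all();
    });

    let (pending, cvar) = &*entry;
    let mut guard = pending.lock().unwrap();
    while *guard {
        guard = cvar.wait(guard).unwrap(); // same wait loop as reconstruct_page
    }
}
```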
+ // + // GetPage@LSN + // + // Returns an 8k page image + // + pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> { + self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + + self.wait_lsn(lsn).await?; + + // Look up cache entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. + let minkey = CacheKey { tag, lsn: 0 }; + let maxkey = CacheKey { tag, lsn }; + + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); + + let mut readopts = rocksdb::ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + buf.clear(); + maxkey.pack(&mut buf); + let mut iter = self.db.iterator_opt( + rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), + readopts, + ); + let entry_opt = iter.next(); + + if entry_opt.is_none() { + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + return Ok(Bytes::from_static(&ZERO_PAGE)); + /* return Err("could not find page image")?; */ + } + let (k, v) = entry_opt.unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); let page_img: Bytes; - { - let mut entry_content = entry_rc.content.lock().unwrap(); - - if let Some(img) = &entry_content.page_image { - assert!(!entry_content.apply_pending); - page_img = img.clone(); - } else if entry_content.wal_record.is_some() { - // - // If this page needs to be reconstructed by applying some WAL, - // send a request to the WAL redo thread. - // - if !entry_content.apply_pending { - assert!(!entry_content.apply_pending); - entry_content.apply_pending = true; - - let s = &self.walredo_sender; - s.send(entry_rc.clone())?; - } - - while entry_content.apply_pending { - entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); - } - - // We should now have a page image. If we don't, it means that WAL redo - // failed to reconstruct it. WAL redo should've logged that error already. - page_img = match &entry_content.page_image { - Some(p) => p.clone(), - None => { - error!( - "could not apply WAL to reconstruct page image for GetPage@LSN request" - ); - bail!("could not apply WAL to reconstruct page image"); - } - }; - } else { - // No base image, and no WAL record. Huh? - bail!("no page image or WAL record for requested page"); - } + if let Some(img) = &content.page_image { + page_img = img.clone(); + } else if content.wal_record.is_some() { + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + page_img = self.reconstruct_page(key, content)?; + } else { + // No base image, and no WAL record. Huh? + bail!("no page image or WAL record for requested page"); } // FIXME: assumes little-endian. Only used for the debugging log though @@ -394,10 +583,10 @@ impl PageCache { "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", page_lsn_hi, page_lsn_lo, - tag.spcnode, - tag.dbnode, - tag.relnode, - tag.forknum, + tag.rel.spcnode, + tag.rel.dbnode, + tag.rel.relnode, + tag.rel.forknum, tag.blknum ); @@ -412,38 +601,42 @@ impl PageCache { // over it. // pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) { - // Scan the BTreeMap backwards, starting from the given entry. 
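The read path above leans on rocksdb's IteratorMode::From(key, Direction::Reverse), which positions the iterator on the last key at or before the seek key; that is how the newest page version at or below the requested LSN is found. A throwaway demo of that semantic, with hypothetical keys and DB path:

```rust
// Throwaway demo (hypothetical keys/path, not part of the patch).
use rocksdb::{Direction, IteratorMode, Options, DB};

fn main() {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/zenith-iter-demo").unwrap();
    db.put(b"page@0100", b"v1").unwrap();
    db.put(b"page@0200", b"v2").unwrap();

    // Seek between the two versions and walk backwards:
    let mut iter = db.iterator(IteratorMode::From(b"page@0150", Direction::Reverse));
    let (k, v) = iter.next().unwrap();
    assert_eq!(&k[..], &b"page@0100"[..]); // newest entry at or before the seek key
    assert_eq!(&v[..], &b"v1"[..]);
}
```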
- let shared = self.shared.lock().unwrap(); - let pagecache = &shared.pagecache; - let minkey = CacheKey { - tag: entry.key.tag, + tag: BufferTag { + rel: entry.key.tag.rel, + blknum: 0, + }, lsn: 0, }; - - let maxkey = CacheKey { - tag: entry.key.tag, - lsn: entry.key.lsn, - }; - let entries = pagecache.range((Included(&minkey), Included(&maxkey))); - // the last entry in the range should be the CacheEntry we were given - //let _last_entry = entries.next_back(); - //assert!(last_entry == entry); + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); + + let mut readopts = rocksdb::ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + buf.clear(); + entry.key.pack(&mut buf); + let iter = self.db.iterator_opt( + rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), + readopts, + ); let mut base_img: Option<Bytes> = None; let mut records: Vec<WALRecord> = Vec::new(); // Scan backwards, collecting the WAL records, until we hit an // old page image. - for (_key, e) in entries.rev() { - let e = e.content.lock().unwrap(); - - if let Some(img) = &e.page_image { + for (_k, v) in iter { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(img) = &content.page_image { // We have a base image. No need to dig deeper into the list of // records base_img = Some(img.clone()); break; - } else if let Some(rec) = &e.wal_record { + } else if let Some(rec) = &content.wal_record { records.push(rec.clone()); // If this WAL record initializes the page, no need to dig deeper. @@ -466,57 +659,76 @@ impl PageCache { let lsn = rec.lsn; let key = CacheKey { tag, lsn }; - let entry = CacheEntry::new(key.clone()); - entry.content.lock().unwrap().wal_record = Some(rec); - - let mut shared = self.shared.lock().unwrap(); - - let rel_tag = RelTag { - spcnode: tag.spcnode, - dbnode: tag.dbnode, - relnode: tag.relnode, - forknum: tag.forknum, + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, }; - let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0); - if tag.blknum >= *rel_entry { - *rel_entry = tag.blknum + 1; - } + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + let _res = self.db.put(&key_buf[..], &val_buf[..]); //trace!("put_wal_record lsn: {}", lsn); - let oldentry = shared.pagecache.insert(key, Arc::new(entry)); self.num_entries.fetch_add(1, Ordering::Relaxed); - - if oldentry.is_some() { - error!( - "overwriting WAL record with LSN {:X}/{:X} in page cache", - lsn >> 32, - lsn & 0xffffffff - ); - } - self.num_wal_records.fetch_add(1, Ordering::Relaxed); } + // + // Adds a relation-wide WAL record (like truncate) to the page cache, + // associating it with all pages starting at the specified block number + // + pub async fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { + let mut key = CacheKey { tag, lsn: rec.lsn }; + let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).await?; + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, + }; + // set new relation size + trace!("Truncate relation {:?}", tag); + let mut key_buf = BytesMut::new(); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + for blknum in tag.blknum..old_rel_size { + key_buf.clear(); + key.tag.blknum = blknum; + key.pack(&mut key_buf); + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], 
&val_buf[..]); + } + let n = (old_rel_size - tag.blknum) as u64; + self.num_entries.fetch_add(n, Ordering::Relaxed); + self.num_wal_records.fetch_add(n, Ordering::Relaxed); + Ok(()) + } + // // Memorize a full image of a page version // pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { let key = CacheKey { tag, lsn }; + let content = CacheEntryContent { + page_image: Some(img), + wal_record: None, + apply_pending: false, + }; - let entry = CacheEntry::new(key.clone()); - entry.content.lock().unwrap().page_image = Some(img); + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); - let mut shared = self.shared.lock().unwrap(); - let pagecache = &mut shared.pagecache; - - let oldentry = pagecache.insert(key, Arc::new(entry)); - self.num_entries.fetch_add(1, Ordering::Relaxed); - assert!(oldentry.is_none()); + trace!("put_page_image lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); - self.num_page_images.fetch_add(1, Ordering::Relaxed); } @@ -602,83 +814,78 @@ impl PageCache { shared.last_record_lsn } - // - // Simple test function for the WAL redo code: - // - // 1. Pick a page from the page cache at random. - // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) - // - // - pub async fn _test_get_page_at_lsn(&self) { - // for quick testing of the get_page_at_lsn() funcion. - // - // Get a random page from the page cache. Apply all its WAL, by requesting - // that page at the highest lsn. + pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> { + if lsn != u64::MAX { + self.wait_lsn(lsn).await?; + } - let mut tag: Option = None; + let mut key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); - { - let shared = self.shared.lock().unwrap(); - let pagecache = &shared.pagecache; - - if pagecache.is_empty() { - info!("page cache is empty"); - return; - } - - // Find nth entry in the map, where n is picked at random - let n = rand::thread_rng().gen_range(0..pagecache.len()); - let mut i = 0; - for (key, _e) in pagecache.iter() { - if i == n { - tag = Some(key.tag); - break; + loop { + buf.clear(); + key.pack(&mut buf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(rec) = &content.wal_record { + if rec.truncate { + if tag.blknum > 0 { + key.tag.blknum = tag.blknum - 1; + continue; + } + break; + } + } + let relsize = tag.blknum + 1; + return Ok(relsize); } - i += 1; - } - } - - info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); - match self - .get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) - .await - { - Ok(_img) => { - // This prints out the whole page image. - //println!("{:X?}", img); - } - Err(error) => { - error!("GetPage@LSN failed: {}", error); } + break; } + Ok(0) } - /// Remember a relation's size in blocks. - /// - /// If 'to' is larger than the previously remembered size, the remembered size is increased to 'to'. - /// But if it's smaller, there is no change. 
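Worth spelling out: relsize_get() derives the relation size from the key space itself. It seeks to (rel, blknum = u32::MAX, lsn) and walks backwards; the first key still belonging to rel carries the highest block number, so size = blknum + 1, and a truncate record restarts the scan below the truncation point. The same idea with a sorted map standing in for RocksDB:

```rust
// Model of the relsize_get() scan (sorted map standing in for RocksDB).
use std::collections::BTreeMap;

fn main() {
    // (blknum, lsn) -> page version
    let mut pages: BTreeMap<(u32, u64), &str> = BTreeMap::new();
    pages.insert((0, 10), "img");
    pages.insert((7, 20), "img"); // highest block written so far

    let ((blknum, _lsn), _) = pages.range(..=(u32::MAX, u64::MAX)).next_back().unwrap();
    assert_eq!(blknum + 1, 8); // relation size in blocks
}
```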
- pub fn relsize_inc(&self, rel: &RelTag, to: u32) { - // FIXME: Shouldn't relation size also be tracked with an LSN? - // If a replica is lagging behind, it needs to get the size as it was on - // the replica's current replay LSN. - let mut shared = self.shared.lock().unwrap(); - let entry = shared.relsize_cache.entry(*rel).or_insert(0); + pub async fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<bool> { + self.wait_lsn(lsn).await?; - if to >= *entry { - *entry = to; + let key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, _v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + return Ok(true); + } } - } - - pub fn relsize_get(&self, rel: &RelTag) -> u32 { - let mut shared = self.shared.lock().unwrap(); - let entry = shared.relsize_cache.entry(*rel).or_insert(0); - *entry - } - - pub fn relsize_exist(&self, rel: &RelTag) -> bool { - let shared = self.shared.lock().unwrap(); - let relsize_cache = &shared.relsize_cache; - relsize_cache.contains_key(rel) + Ok(false) } pub fn get_stats(&self) -> PageCacheStats { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index db81333747..37790b5561 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -18,6 +18,7 @@ use std::io; use std::str::FromStr; use std::sync::Arc; use std::thread; +use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime; @@ -50,12 +51,8 @@ enum FeMessage { // All that messages are actually CopyData from libpq point of view. // ZenithExistsRequest(ZenithRequest), - ZenithTruncRequest(ZenithRequest), - ZenithUnlinkRequest(ZenithRequest), ZenithNblocksRequest(ZenithRequest), ZenithReadRequest(ZenithRequest), - ZenithCreateRequest(ZenithRequest), - ZenithExtendRequest(ZenithRequest), } #[derive(Debug)] @@ -383,12 +380,8 @@ impl FeMessage { // serialization. 
match smgr_tag { 0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))), - 1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))), - 2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))), - 3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))), - 4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))), - 5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))), - 6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))), + 1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))), + 2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))), _ => Err(io::Error::new( io::ErrorKind::InvalidInput, format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf), @@ -431,6 +424,7 @@ pub fn thread_main(conf: &PageServerConf) { loop { let (socket, peer_addr) = listener.accept().await.unwrap(); debug!("accepted connection from {}", peer_addr); + socket.set_nodelay(true).unwrap(); let mut conn_handler = Connection::new(conf.clone(), socket, &runtime_ref); task::spawn(async move { @@ -625,7 +619,7 @@ impl Connection { let mut unnamed_query_string = Bytes::new(); loop { let msg = self.read_message().await?; - info!("got message {:?}", msg); + trace!("got message {:?}", msg); match msg { Some(FeMessage::StartupMessage(m)) => { trace!("got message {:?}", m); @@ -788,7 +782,7 @@ impl Connection { let message = self.read_message().await?; if let Some(m) = &message { - info!("query({:?}): {:?}", timelineid, m); + trace!("query({:?}): {:?}", timelineid, m); }; if message.is_none() { @@ -805,7 +799,7 @@ impl Connection { forknum: req.forknum, }; - let exist = pcache.relsize_exist(&tag); + let exist = pcache.relsize_exist(&tag, req.lsn).await.unwrap_or(false); self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { ok: exist, @@ -813,20 +807,6 @@ impl Connection { })) .await? } - Some(FeMessage::ZenithTruncRequest(_)) => { - self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { - ok: true, - n_blocks: 0, - })) - .await? - } - Some(FeMessage::ZenithUnlinkRequest(_)) => { - self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { - ok: true, - n_blocks: 0, - })) - .await? - } Some(FeMessage::ZenithNblocksRequest(req)) => { let tag = page_cache::RelTag { spcnode: req.spcnode, @@ -835,7 +815,7 @@ impl Connection { forknum: req.forknum, }; - let n_blocks = pcache.relsize_get(&tag); + let n_blocks = pcache.relsize_get(&tag, req.lsn).await.unwrap_or(0); self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { ok: true, @@ -845,10 +825,12 @@ impl Connection { } Some(FeMessage::ZenithReadRequest(req)) => { let buf_tag = page_cache::BufferTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, + rel: page_cache::RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }, blknum: req.blkno, }; @@ -871,38 +853,6 @@ impl Connection { self.write_message(&msg).await? } - Some(FeMessage::ZenithCreateRequest(req)) => { - let tag = page_cache::RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - - pcache.relsize_inc(&tag, 0); - - self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { - ok: true, - n_blocks: 0, - })) - .await? 
- } - Some(FeMessage::ZenithExtendRequest(req)) => { - let tag = page_cache::RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - - pcache.relsize_inc(&tag, req.blkno + 1); - - self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { - ok: true, - n_blocks: 0, - })) - .await? - } _ => {} } } @@ -942,10 +892,7 @@ impl Connection { let joinres = f_tar.await; if let Err(joinreserr) = joinres { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - joinreserr, - )); + return Err(io::Error::new(io::ErrorKind::InvalidData, joinreserr)); } joinres.unwrap() }; @@ -982,7 +929,7 @@ impl Connection { // FIXME: I'm getting an error from the tokio copyout driver without this. // I think it happens when the CommandComplete, CloseComplete and ReadyForQuery // are sent in the same TCP packet as the CopyDone. I don't understand why. - thread::sleep(std::time::Duration::from_secs(1)); + thread::sleep(Duration::from_secs(1)); Ok(()) } diff --git a/pageserver/src/pg_constants.rs b/pageserver/src/pg_constants.rs index 010a24a49a..cd23ef6b1c 100644 --- a/pageserver/src/pg_constants.rs +++ b/pageserver/src/pg_constants.rs @@ -44,10 +44,13 @@ pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4; // From pg_control.h and rmgrlist.h pub const XLOG_SWITCH: u8 = 0x40; +pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; pub const RM_XLOG_ID: u8 = 0; pub const RM_XACT_ID: u8 = 1; +pub const RM_SMGR_ID: u8 = 2; pub const RM_CLOG_ID: u8 = 3; // pub const RM_MULTIXACT_ID:u8 = 6; // from xlogreader.h pub const XLR_INFO_MASK: u8 = 0x0F; +pub const XLR_RMGR_INFO_MASK: u8 = 0xF0; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 1477233a6d..c9cb050b8d 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -29,6 +29,7 @@ use bytes::Bytes; use crate::page_cache; use crate::page_cache::BufferTag; use crate::page_cache::PageCache; +use crate::page_cache::RelTag; use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; @@ -202,11 +203,13 @@ fn restore_relfile( let r = file.read_exact(&mut buf); match r { Ok(_) => { - let tag = page_cache::BufferTag { - spcnode: spcoid, - dbnode: dboid, - relnode, - forknum: forknum as u8, + let tag = BufferTag { + rel: RelTag { + spcnode: spcoid, + dbnode: dboid, + relnode: relnode, + forknum: forknum as u8, + }, blknum, }; pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf)); @@ -233,14 +236,6 @@ fn restore_relfile( blknum += 1; } - let tag = page_cache::RelTag { - spcnode: spcoid, - dbnode: dboid, - relnode, - forknum: forknum as u8, - }; - pcache.relsize_inc(&tag, blknum); - Ok(()) } @@ -307,18 +302,21 @@ fn restore_wal( // so having multiple copies of it doesn't cost that much) for blk in decoded.blocks.iter() { let tag = BufferTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, + rel: RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }, blknum: blk.blkno, }; let rec = page_cache::WALRecord { lsn, will_init: blk.will_init || blk.apply_image, + truncate: false, rec: recdata.clone(), - main_data_offset: decoded.main_data_offset, + main_data_offset: decoded.main_data_offset as u32, }; pcache.put_wal_record(tag, rec); diff --git a/pageserver/src/restore_s3.rs b/pageserver/src/restore_s3.rs index d3cc86e4e0..6b9d7d1ad8 100644 --- 
a/pageserver/src/restore_s3.rs +++ b/pageserver/src/restore_s3.rs @@ -306,10 +306,12 @@ async fn slurp_base_file( while bytes.remaining() >= 8192 { let tag = page_cache::BufferTag { - spcnode: parsed.spcnode, - dbnode: parsed.dbnode, - relnode: parsed.relnode, - forknum: parsed.forknum as u8, + rel: page_cache::RelTag { + spcnode: parsed.spcnode, + dbnode: parsed.dbnode, + relnode: parsed.relnode, + forknum: parsed.forknum as u8, + }, blknum, }; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 7bd7320691..15dca57786 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -328,6 +328,8 @@ impl DecodedBkpBlock { const SizeOfXLogRecord: u32 = 24; pub struct DecodedWALRecord { + pub xl_info: u8, + pub xl_rmid: u8, pub record: Bytes, // raw XLogRecord pub blocks: Vec<DecodedBkpBlock>, @@ -353,11 +355,40 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool { xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID } -#[derive(Clone, Copy)] +pub type Oid = u32; +pub type BlockNumber = u32; + +pub const MAIN_FORKNUM: u8 = 0; +pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; + +#[repr(C)] +#[derive(Debug, Clone, Copy)] pub struct RelFileNode { - pub spcnode: u32, - pub dbnode: u32, - pub relnode: u32, + pub spcnode: Oid, /* tablespace */ + pub dbnode: Oid, /* database */ + pub relnode: Oid, /* relation */ +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlSmgrTruncate { + pub blkno: BlockNumber, + pub rnode: RelFileNode, + pub flags: u32, +} + +pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate { + let mut buf = decoded.record.clone(); + buf.advance((SizeOfXLogRecord + 2) as usize); + XlSmgrTruncate { + blkno: buf.get_u32_le(), + rnode: RelFileNode { + spcnode: buf.get_u32_le(), /* tablespace */ + dbnode: buf.get_u32_le(), /* database */ + relnode: buf.get_u32_le(), /* relation */ + }, + flags: buf.get_u32_le(), + } } // @@ -379,13 +410,13 @@ pub struct RelFileNode { // block data // ... // main data -pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { +pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let mut rnode_spcnode: u32 = 0; let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; let mut got_rnode = false; - let mut buf = rec.clone(); + let mut buf = record.clone(); // 1. Parse XLogRecord struct @@ -649,7 +680,9 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord { } DecodedWALRecord { - record: rec, + xl_info, + xl_rmid, + record, blocks, main_data_offset, } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 0145ae77c4..ba517b2138 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -7,8 +7,9 @@ //! 
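decode_truncate_record() assumes the xl_smgr_truncate payload sits right after the 24-byte XLogRecord header plus a 2-byte short main-data header, with all fields little-endian. A round-trip sketch of that assumed layout, with made-up values:

```rust
// Sketch of the record layout decode_truncate_record() assumes
// (header sizes per the constants above; values are made up).
use bytes::{Buf, BufMut, BytesMut};

fn main() {
    let mut rec = BytesMut::new();
    rec.put_slice(&[0u8; 24 + 2]); // XLogRecord header + short main-data header
    rec.put_u32_le(42); // blkno
    rec.put_u32_le(1663); // rnode.spcnode
    rec.put_u32_le(12345); // rnode.dbnode
    rec.put_u32_le(16384); // rnode.relnode
    rec.put_u32_le(0x0001); // flags = SMGR_TRUNCATE_HEAP

    let mut buf = rec.freeze();
    buf.advance(24 + 2); // skip the headers, as the decoder does
    assert_eq!(buf.get_u32_le(), 42); // blkno decodes first
}
```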
use crate::page_cache; -use crate::page_cache::BufferTag; -use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; +use crate::page_cache::{BufferTag, RelTag}; +use crate::pg_constants; +use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; use anyhow::Error; @@ -223,22 +224,51 @@ async fn walreceiver_main( // so having multiple copies of it doesn't cost that much) for blk in decoded.blocks.iter() { let tag = BufferTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, + rel: RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }, blknum: blk.blkno, }; let rec = page_cache::WALRecord { lsn, will_init: blk.will_init || blk.apply_image, + truncate: false, rec: recdata.clone(), - main_data_offset: decoded.main_data_offset, + main_data_offset: decoded.main_data_offset as u32, }; pcache.put_wal_record(tag, rec); } + // include truncate wal record in all pages + if decoded.xl_rmid == pg_constants::RM_SMGR_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == pg_constants::XLOG_SMGR_TRUNCATE + { + let truncate = decode_truncate_record(&decoded); + if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 { + let tag = BufferTag { + rel: RelTag { + spcnode: truncate.rnode.spcnode, + dbnode: truncate.rnode.dbnode, + relnode: truncate.rnode.relnode, + forknum: MAIN_FORKNUM, + }, + blknum: truncate.blkno, + }; + let rec = page_cache::WALRecord { + lsn: lsn, + will_init: false, + truncate: true, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + pcache.put_rel_wal_record(tag, rec).await?; + } + } // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN pcache.advance_last_record_lsn(lsn); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index bf0159af4d..0fc9ae51e7 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -18,7 +18,10 @@ use log::*; use std::assert; use std::cell::RefCell; use std::fs; +use std::fs::OpenOptions; +use std::io::prelude::*; use std::io::Error; +use std::path::PathBuf; use std::process::Stdio; use std::sync::Arc; use std::time::Duration; @@ -66,21 +69,28 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) { let _guard = runtime.enter(); process = WalRedoProcess::launch(&datadir, &runtime).unwrap(); } + info!("WAL redo postgres started"); // Pretty arbitrarily, reuse the same Postgres process for 100 requests. // After that, kill it and start a new one. This is mostly to avoid // using up all shared buffers in Postgres's shared buffer cache; we don't // want to write any pages to disk in the WAL redo process. - for _i in 1..100 { + for _i in 1..100000 { let request = walredo_channel_receiver.recv().unwrap(); let result = handle_apply_request(&pcache, &process, &runtime, request); if result.is_err() { - // On error, kill the process. + // Something went wrong with handling the request. It's not clear + // if the request was faulty, and the next request would succeed + // again, or if the 'postgres' process went haywire. To be safe, + // kill the 'postgres' process so that we will start from a clean + // slate, with a new process, for the next request. break; } } + // Time to kill the 'postgres' process. A new one will be launched on next + // iteration of the loop. 
info!("killing WAL redo postgres process"); let _ = runtime.block_on(process.stdin.get_mut().shutdown()); let mut child = process.child; @@ -153,6 +163,7 @@ fn handle_apply_request( let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref()); let mut entry = entry_rc.content.lock().unwrap(); + assert!(entry.apply_pending); entry.apply_pending = false; let nrecords = records.len(); @@ -160,7 +171,7 @@ fn handle_apply_request( let start = Instant::now(); let apply_result: Result; - if tag.forknum == pg_constants::PG_XACT_FORKNUM as u8 { + if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { //TODO use base image if any static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let zero_page_bytes: &[u8] = &ZERO_PAGE; @@ -215,9 +226,6 @@ fn handle_apply_request( result = Err(e); } else { entry.page_image = Some(apply_result.unwrap()); - pcache - .num_page_images - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); result = Ok(()); } @@ -258,8 +266,14 @@ impl WalRedoProcess { std::str::from_utf8(&initdb.stdout).unwrap(), std::str::from_utf8(&initdb.stderr).unwrap() ); + } else { + // Limit shared cache for wal-redo-postres + let mut config = OpenOptions::new() + .append(true) + .open(PathBuf::from(&datadir).join("postgresql.conf"))?; + config.write(b"shared_buffers=128kB\n")?; + config.write(b"fsync=off\n")?; } - // Start postgres itself let mut child = Command::new("postgres") .arg("--wal-redo") @@ -290,7 +304,7 @@ impl WalRedoProcess { if res.unwrap() == 0 { break; } - debug!("wal-redo-postgres: {}", line.trim()); + error!("wal-redo-postgres: {}", line.trim()); line.clear(); } Ok::<(), Error>(()) @@ -390,11 +404,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { buf.put_u8(b'B'); buf.put_u32(len as u32); - buf.put_u32(tag.spcnode); - buf.put_u32(tag.dbnode); - buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum as u32); - buf.put_u32(tag.blknum); + tag.pack(&mut buf); assert!(buf.len() == 1 + len); @@ -409,11 +419,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { buf.put_u8(b'P'); buf.put_u32(len as u32); - buf.put_u32(tag.spcnode); - buf.put_u32(tag.dbnode); - buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum as u32); - buf.put_u32(tag.blknum); + tag.pack(&mut buf); buf.put(base_img); assert!(buf.len() == 1 + len); @@ -441,11 +447,7 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes { buf.put_u8(b'G'); buf.put_u32(len as u32); - buf.put_u32(tag.spcnode); - buf.put_u32(tag.dbnode); - buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum as u32); - buf.put_u32(tag.blknum); + tag.pack(&mut buf); assert!(buf.len() == 1 + len); diff --git a/postgres_ffi/Cargo.toml b/postgres_ffi/Cargo.toml index e843a28385..82d498b32c 100644 --- a/postgres_ffi/Cargo.toml +++ b/postgres_ffi/Cargo.toml @@ -17,4 +17,4 @@ hex = "0.4.3" log = "0.4.14" [build-dependencies] -bindgen = "0.53.1" +bindgen = "0.57" diff --git a/vendor/postgres b/vendor/postgres index 610033bb35..daec929ec3 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 610033bb353e2074e69fb742a43f4828403fe1e1 +Subproject commit daec929ec3f357f1af19b33fa6862acaa2fcf34d diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs index 91805dc19e..bd94b8b350 100644 --- a/zenith_utils/src/seqwait.rs +++ b/zenith_utils/src/seqwait.rs @@ -196,5 +196,4 @@ mod tests { // because the waiter already dropped its Receiver. seq.advance(99); } - }