Merge pull request #54 from zenithdb/rocksdb_pageserver

Rocksdb pageserver
Konstantin Knizhnik
2021-04-22 14:08:00 +03:00
committed by GitHub
20 changed files with 875 additions and 493 deletions
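
In outline, this PR replaces the page server's in-memory BTreeMap page cache with a RocksDB store: every page version is written under the big-endian encoding of its (BufferTag, LSN) key, and GetPage@LSN becomes a reverse seek to the newest version at or below the requested LSN. A minimal sketch of that lookup, assuming the CacheKey and CacheEntryContent types introduced in page_cache.rs below:

let mut key_buf = BytesMut::new();
CacheKey { tag, lsn }.pack(&mut key_buf);
let mut iter = db.iterator(rocksdb::IteratorMode::From(
    &key_buf[..],
    rocksdb::Direction::Reverse,
));
if let Some((_k, v)) = iter.next() {
    let mut val_buf = BytesMut::from(&v[..]);
    let content = CacheEntryContent::unpack(&mut val_buf);
    // content holds either a full page image, or a WAL record that the
    // walredo process replays to reconstruct the page
}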

View File

@@ -4,7 +4,7 @@ on: [push]
jobs:
regression-check:
timeout-minutes: 10
timeout-minutes: 30
name: run regression test suite
runs-on: ubuntu-latest

Cargo.lock (generated, 185 lines changed)
View File

@@ -91,19 +91,19 @@ dependencies = [
[[package]]
name = "async-io"
version = "1.3.1"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9315f8f07556761c3e48fec2e6b276004acf426e6dc068b2c2251854d65ee0fd"
checksum = "fcb9af4888a70ad78ecb5efcb0ba95d66a3cf54a88b62ae81559954c7588c7a2"
dependencies = [
"concurrent-queue",
"fastrand",
"futures-lite",
"libc",
"log",
"nb-connect",
"once_cell",
"parking",
"polling",
"socket2",
"vec-arena",
"waker-fn",
"winapi",
@@ -111,9 +111,9 @@ dependencies = [
[[package]]
name = "async-lock"
version = "2.3.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb"
checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b"
dependencies = [
"event-listener",
]
@@ -162,9 +162,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "async-trait"
version = "0.1.49"
version = "0.1.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589652ce7ccb335d1e7ecb3be145425702b290dbcb7029bbeaae263fc1d87b48"
checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722"
dependencies = [
"proc-macro2",
"quote",
@@ -243,13 +243,12 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bindgen"
version = "0.53.3"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c72a978d268b1d70b0e963217e60fdabd9523a941457a6c42a7315d15c7e89e5"
checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d"
dependencies = [
"bitflags",
"cexpr",
"cfg-if 0.1.10",
"clang-sys",
"clap",
"env_logger",
@@ -346,6 +345,9 @@ name = "cc"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -383,9 +385,9 @@ dependencies = [
[[package]]
name = "clang-sys"
version = "0.29.3"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a"
checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c"
dependencies = [
"glob",
"libc",
@@ -596,9 +598,9 @@ dependencies = [
[[package]]
name = "env_logger"
version = "0.7.1"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f"
dependencies = [
"atty",
"humantime",
@@ -922,9 +924,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.3.6"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc35c995b9d93ec174cf9a27d425c7892722101e14993cd227fdb51d70cf9589"
checksum = "4a1ce40d6fc9764887c2fdc7305c3dcc429ba11ff981c1509416afd5697e4437"
[[package]]
name = "httpdate"
@@ -934,12 +936,9 @@ checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
[[package]]
name = "humantime"
version = "1.3.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
@@ -980,9 +979,9 @@ dependencies = [
[[package]]
name = "idna"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21"
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
dependencies = [
"matches",
"unicode-bidi",
@@ -1033,6 +1032,15 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "jobserver"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "972f5ae5d1cb9c6ae417789196c803205313edde988685da5e3aae0827b9e7fd"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.50"
@@ -1071,14 +1079,26 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41"
[[package]]
name = "libloading"
version = "0.5.2"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753"
checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a"
dependencies = [
"cc",
"cfg-if 1.0.0",
"winapi",
]
[[package]]
name = "librocksdb-sys"
version = "6.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9"
dependencies = [
"bindgen",
"cc",
"glob",
"libc",
]
[[package]]
name = "lock_api"
version = "0.4.3"
@@ -1184,16 +1204,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nb-connect"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d"
dependencies = [
"libc",
"socket2",
]
[[package]]
name = "nom"
version = "5.1.2"
@@ -1213,6 +1223,41 @@ dependencies = [
"winapi",
]
[[package]]
name = "num"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
dependencies = [
"num-bigint",
"num-complex",
"num-integer",
"num-iter",
"num-rational",
"num-traits",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-complex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@@ -1223,6 +1268,29 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-iter"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
@@ -1319,12 +1387,14 @@ dependencies = [
"hex",
"lazy_static",
"log",
"parse_duration",
"postgres",
"postgres-protocol",
"postgres-types",
"postgres_ffi",
"rand 0.8.3",
"regex",
"rocksdb",
"rust-s3",
"slog",
"slog-async",
@@ -1373,6 +1443,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "parse_duration"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
dependencies = [
"lazy_static",
"num",
"regex",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
@@ -1405,18 +1486,18 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.0.6"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc174859768806e91ae575187ada95c91a29e96a98dc5d2cd9a1fed039501ba6"
checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.6"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a490329918e856ed1b083f244e3bfe2d8c4f336407e4ea9e1a9f479ff09049e5"
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
dependencies = [
"proc-macro2",
"quote",
@@ -1536,12 +1617,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.9"
@@ -1738,6 +1813,16 @@ dependencies = [
"winreg",
]
[[package]]
name = "rocksdb"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3"
dependencies = [
"libc",
"librocksdb-sys",
]
[[package]]
name = "rust-argon2"
version = "0.8.3"
@@ -1975,9 +2060,9 @@ checksum = "cbce6d4507c7e4a3962091436e56e95290cb71fa302d0d270e32130b75fbff27"
[[package]]
name = "slab"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527"
[[package]]
name = "slog"
@@ -2415,9 +2500,9 @@ dependencies = [
[[package]]
name = "vcpkg"
version = "0.2.11"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb"
checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d"
[[package]]
name = "vec-arena"

View File

@@ -3,6 +3,7 @@ use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::TcpStream;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
@@ -16,7 +17,7 @@ use postgres::{Client, NoTls};
use crate::local_env::LocalEnv;
use crate::storage::{PageServerNode, WalProposerNode};
use pageserver::ZTimelineId;
use pageserver::{zenith_repo_dir, ZTimelineId};
//
// ComputeControlPlane
@@ -276,7 +277,9 @@ impl PostgresNode {
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
fsync = off\n\
max_connections = 100\n\
wal_sender_timeout = 0\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
@@ -443,8 +446,71 @@ impl PostgresNode {
}
}
// TODO
pub fn pg_bench() {}
pub fn pg_regress(&self) {
self.safe_psql("postgres", "CREATE DATABASE regression");
let data_dir = zenith_repo_dir();
let regress_run_path = data_dir.join("regress");
fs::create_dir_all(&regress_run_path).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGPORT", self.address.port().to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
}
pub fn pg_bench(&self, clients: u32, seconds: u32) {
let port = self.address.port().to_string();
let clients = clients.to_string();
let seconds = seconds.to_string();
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&["-i", "-p", port.as_str(), "postgres"])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench -i");
let _pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&[
"-p",
port.as_str(),
"-T",
seconds.as_str(),
"-P",
"1",
"-c",
clients.as_str(),
"-M",
"prepared",
"postgres",
])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench run");
}
}
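A hedged sketch of how these helpers are driven; `node` stands for a PostgresNode as set up by the integration tests further down in this diff:

// Run the regression suite, then pgbench with 10 clients for 100 seconds.
node.start().unwrap();
node.pg_regress();
node.pg_bench(10, 100);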
impl Drop for PostgresNode {

View File

@@ -15,6 +15,7 @@ use std::process::{Command, Stdio};
use anyhow::Result;
use serde_derive::{Deserialize, Serialize};
use pageserver::zenith_repo_dir;
use pageserver::ZTimelineId;
use postgres_ffi::xlog_utils;
@@ -52,14 +53,6 @@ impl LocalEnv {
}
}
fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
//
// Initialize a new Zenith repository
//
@@ -145,7 +138,10 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> {
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
.env(
"DYLD_LIBRARY_PATH",
local_env.pg_lib_dir().to_str().unwrap(),
)
.stdout(Stdio::null())
.status()
.with_context(|| "failed to execute initdb")?;

View File

@@ -13,7 +13,6 @@ use std::time::Duration;
use postgres::{Client, NoTls};
use crate::compute::PostgresNode;
use crate::local_env::LocalEnv;
use pageserver::ZTimelineId;
@@ -111,6 +110,9 @@ impl TestStorageControlPlane {
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
let _ = wa.stop();
}
self.test_done.store(true, Ordering::Relaxed);
}
@@ -366,43 +368,6 @@ impl Drop for WalProposerNode {
}
}
///////////////////////////////////////////////////////////////////////////////
pub fn regress_check(pg: &PostgresNode) {
pg.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress");
fs::create_dir_all(regress_run_path.clone()).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
.env("PGHOST", pg.address.ip().to_string())
.env("PGPORT", pg.address.port().to_string())
.env("PGUSER", pg.whoami())
.status()
.expect("pg_regress failed");
}
/// Read a PID file
///
/// This should contain an unsigned integer, but we return it as a String

View File

@@ -51,7 +51,6 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node
#[test]
#[ignore]
fn test_regress() {
let local_env = local_env::test_env("test_regress");
@@ -64,7 +63,24 @@ fn test_regress() {
let node = compute_cplane.new_test_node(maintli);
node.start().unwrap();
control_plane::storage::regress_check(&node);
node.pg_regress();
}
// Runs pg_bench on a compute node
#[test]
fn pgbench() {
let local_env = local_env::test_env("pgbench");
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_node(maintli);
node.start().unwrap();
node.pg_bench(10, 100);
}
// Run two postgres instances on one pageserver, on different timelines

View File

@@ -32,12 +32,14 @@ tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
rocksdb = "0.16.0"
anyhow = "1.0"
crc32c = "0.6.0"
walkdir = "2"
thiserror = "1.0"
hex = "0.4.3"
tar = "0.4.33"
parse_duration = "*"
postgres_ffi = { path = "../postgres_ffi" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -3,12 +3,13 @@
//
use log::*;
use std::fs;
use std::fs::{File, OpenOptions};
use parse_duration::parse;
use std::fs::{self, OpenOptions};
use std::io;
use std::path::PathBuf;
use std::process::exit;
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -16,18 +17,10 @@ use daemonize::Daemonize;
use slog::Drain;
use pageserver::page_service;
use pageserver::tui;
//use pageserver::walreceiver;
use pageserver::PageServerConf;
use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf};
fn zenith_repo_dir() -> String {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => String::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
const DEFAULT_GC_HORIZON: u64 = 0; //64 * 1024 * 1024;
const DEFAULT_GC_PERIOD_SEC: u64 = 1;
fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")
@@ -53,11 +46,25 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Run in the background"),
)
.arg(
Arg::with_name("gc_horizon")
.long("gc_horizon")
.takes_value(true)
.help("Distance from current LSN to perform all wal records cleanup"),
)
.arg(
Arg::with_name("gc_period")
.long("gc_period")
.takes_value(true)
.help("Interval between garbage collector iterations"),
)
.get_matches();
let mut conf = PageServerConf {
daemonize: false,
interactive: false,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC),
listen_addr: "127.0.0.1:5430".parse().unwrap(),
};
@@ -78,6 +85,14 @@ fn main() -> Result<()> {
conf.listen_addr = addr.parse()?;
}
if let Some(horizon) = arg_matches.value_of("gc_horizon") {
conf.gc_horizon = horizon.parse()?;
}
if let Some(period) = arg_matches.value_of("gc_period") {
conf.gc_period = parse(period)?;
}
start_pageserver(&conf)
}
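The gc_horizon flag is parsed as a plain u64 byte count, while gc_period goes through parse_duration::parse, which accepts human-readable durations. A small sketch of the accepted inputs; the exact formats are an assumption based on the parse_duration crate's documentation, not something this diff states:

use std::time::Duration;
use parse_duration::parse;

// Hypothetical examples of gc_period values:
assert_eq!(parse("10s").unwrap(), Duration::from_secs(10));
assert_eq!(parse("5 minutes").unwrap(), Duration::from_secs(300));
// gc_horizon is a byte count, e.g. --gc_horizon=67108864 for 64 MiB:
let horizon: u64 = "67108864".parse().unwrap();
assert_eq!(horizon, 64 * 1024 * 1024);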
@@ -139,7 +154,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
// does this for us.
let repodir = zenith_repo_dir();
std::env::set_current_dir(&repodir)?;
info!("Changed current directory to repository in {}", &repodir);
info!("Changed current directory to repository in {:?}", &repodir);
}
let mut threads = Vec::new();
@@ -185,12 +200,16 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
if conf.interactive {
Ok(tui::init_logging())
} else if conf.daemonize {
let log = zenith_repo_dir() + "/pageserver.log";
let log_file = File::create(&log).map_err(|err| {
// We failed to initialize logging, so we can't log this message with error!
eprintln!("Could not create log file {:?}: {}", log, err);
err
})?;
let log = zenith_repo_dir().join("pageserver.log");
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log)
.map_err(|err| {
// We failed to initialize logging, so we can't log this message with error!
eprintln!("Could not create log file {:?}: {}", log, err);
err
})?;
let decorator = slog_term::PlainSyncDecorator::new(log_file);
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = slog::Filter::new(drain, |record: &slog::Record| {

View File

@@ -1,6 +1,8 @@
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
pub mod basebackup;
pub mod page_cache;
@@ -19,6 +21,8 @@ pub struct PageServerConf {
pub daemonize: bool,
pub interactive: bool,
pub listen_addr: SocketAddr,
pub gc_horizon: u64,
pub gc_period: Duration,
}
/// Zenith Timeline ID is a 128-bit random ID.
@@ -81,3 +85,11 @@ impl fmt::Display for ZTimelineId {
f.write_str(&hex::encode(self.0))
}
}
pub fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
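A short sketch of the resolution order, assuming only the function above:

use std::path::PathBuf;

// With ZENITH_REPO_DIR set, that path wins; otherwise the relative
// default ".zenith" is used.
std::env::set_var("ZENITH_REPO_DIR", "/var/lib/zenith");
assert_eq!(zenith_repo_dir(), PathBuf::from("/var/lib/zenith"));
std::env::remove_var("ZENITH_REPO_DIR");
assert_eq!(zenith_repo_dir(), PathBuf::from(".zenith"));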

View File

@@ -8,16 +8,16 @@
use crate::restore_local_repo::restore_timeline;
use crate::ZTimelineId;
use crate::{walredo, PageServerConf};
use crate::{walredo, zenith_repo_dir, PageServerConf};
use anyhow::{bail, Context};
use bytes::Bytes;
use core::ops::Bound::Included;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use std::collections::{BTreeMap, HashMap};
use rocksdb;
use std::cmp::min;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::{Arc, Condvar, Mutex};
@@ -32,6 +32,9 @@ static TIMEOUT: Duration = Duration::from_secs(60);
pub struct PageCache {
shared: Mutex<PageCacheShared>,
// RocksDB handle
db: rocksdb::DB,
// Channel for communicating with the WAL redo process here.
pub walredo_sender: Sender<Arc<CacheEntry>>,
pub walredo_receiver: Receiver<Arc<CacheEntry>>,
@@ -82,16 +85,6 @@ impl AddAssign for PageCacheStats {
// Shared data structure, holding page cache and related auxiliary information
//
struct PageCacheShared {
// The actual page cache
pagecache: BTreeMap<CacheKey, Arc<CacheEntry>>,
// Relation n_blocks cache
//
// This hashtable should be updated together with the pagecache. Now it is
// accessed unreasonably often through the smgr_nblocks(). It is better to just
// cache it in postgres smgr and ask only on restart.
relsize_cache: HashMap<RelTag, u32>,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
@@ -135,7 +128,7 @@ pub fn get_or_restore_pagecache(
match pcaches.get(&timelineid) {
Some(pcache) => Ok(pcache.clone()),
None => {
let pcache = init_page_cache();
let pcache = init_page_cache(conf, timelineid);
restore_timeline(conf, &pcache, timelineid)?;
@@ -155,20 +148,42 @@ pub fn get_or_restore_pagecache(
walredo::wal_redo_main(&conf_copy, timelineid);
})
.unwrap();
if conf.gc_horizon != 0 {
let conf_copy = conf.clone();
let _gc_thread = thread::Builder::new()
.name("Garbage collection thread".into())
.spawn(move || {
gc_thread_main(&conf_copy, timelineid);
})
.unwrap();
}
Ok(result)
}
}
}
fn init_page_cache() -> PageCache {
fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("Garbage collection thread started {}", timelineid);
let pcache = get_pagecache(conf, timelineid).unwrap();
pcache.do_gc(conf).unwrap();
}
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
let path = zenith_repo_dir().join(timelineid.to_string());
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
rocksdb::DB::open(&opts, &path).unwrap()
}
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
// Initialize the channel between the page cache and the WAL applicator
let (s, r) = unbounded();
PageCache {
db: open_rocksdb(&conf, timelineid),
shared: Mutex::new(PageCacheShared {
pagecache: BTreeMap::new(),
relsize_cache: HashMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
@@ -209,6 +224,19 @@ pub struct CacheKey {
pub lsn: u64,
}
impl CacheKey {
pub fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn);
}
pub fn unpack(buf: &mut BytesMut) -> CacheKey {
CacheKey {
tag: BufferTag::unpack(buf),
lsn: buf.get_u64(),
}
}
}
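This encoding is what makes the RocksDB scheme work: put_u32/put_u64 write big-endian, and RocksDB compares keys bytewise, so packed keys sort exactly like (rel, blknum, lsn) tuples. A small sketch of the property that get_page_at_lsn() and do_gc() rely on:

let tag = BufferTag {
    rel: RelTag { spcnode: 1, dbnode: 2, relnode: 3, forknum: 0 },
    blknum: 42,
};
let (mut a, mut b) = (BytesMut::new(), BytesMut::new());
CacheKey { tag, lsn: 0x0100 }.pack(&mut a);
CacheKey { tag, lsn: 0x0200 }.pack(&mut b);
// Byte order agrees with LSN order, so a reverse seek from (tag, lsn)
// lands on the newest version at or below lsn.
assert!(a[..] < b[..]);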
pub struct CacheEntry {
pub key: CacheKey,
@@ -228,21 +256,47 @@ pub struct CacheEntryContent {
pub apply_pending: bool,
}
impl CacheEntry {
fn new(key: CacheKey) -> CacheEntry {
CacheEntry {
key,
content: Mutex::new(CacheEntryContent {
page_image: None,
impl CacheEntryContent {
pub fn pack(&self, buf: &mut BytesMut) {
if let Some(image) = &self.page_image {
buf.put_u8(1);
buf.put_u16(image.len() as u16);
buf.put_slice(&image[..]);
} else if let Some(rec) = &self.wal_record {
buf.put_u8(0);
rec.pack(buf);
}
}
pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent {
if buf.get_u8() == 1 {
let mut dst = vec![0u8; buf.get_u16() as usize];
buf.copy_to_slice(&mut dst);
CacheEntryContent {
page_image: Some(Bytes::from(dst)),
wal_record: None,
apply_pending: false,
}),
}
} else {
CacheEntryContent {
page_image: None,
wal_record: Some(WALRecord::unpack(buf)),
apply_pending: false,
}
}
}
}
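A round-trip sketch of the value encoding above (a leading tag byte of 1 means a full page image with a u16 length, 0 means a packed WALRecord):

let content = CacheEntryContent {
    page_image: Some(Bytes::from(vec![0u8; 8192])),
    wal_record: None,
    apply_pending: false,
};
let mut buf = BytesMut::new();
content.pack(&mut buf);
assert_eq!(buf.len(), 1 + 2 + 8192); // tag byte + u16 length + page
let decoded = CacheEntryContent::unpack(&mut buf);
assert_eq!(decoded.page_image.unwrap().len(), 8192);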
impl CacheEntry {
fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
CacheEntry {
key,
content: Mutex::new(content),
walredo_condvar: Condvar::new(),
}
}
}
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
pub struct RelTag {
pub spcnode: u32,
pub dbnode: u32,
@@ -250,42 +304,195 @@ pub struct RelTag {
pub forknum: u8,
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
impl RelTag {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u32(self.spcnode);
buf.put_u32(self.dbnode);
buf.put_u32(self.relnode);
buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
}
pub fn unpack(buf: &mut BytesMut) -> RelTag {
RelTag {
spcnode: buf.get_u32(),
dbnode: buf.get_u32(),
relnode: buf.get_u32(),
forknum: buf.get_u32() as u8,
}
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct BufferTag {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u8,
pub rel: RelTag,
pub blknum: u32,
}
impl BufferTag {
pub fn pack(&self, buf: &mut BytesMut) {
self.rel.pack(buf);
buf.put_u32(self.blknum);
}
pub fn unpack(buf: &mut BytesMut) -> BufferTag {
BufferTag {
rel: RelTag::unpack(buf),
blknum: buf.get_u32(),
}
}
}
#[derive(Clone)]
pub struct WALRecord {
pub lsn: u64, // LSN at the *end* of the record
pub will_init: bool,
pub truncate: bool,
pub rec: Bytes,
// Remember the offset of main_data in rec,
// so that we don't have to parse the record again.
// If record has no main_data, this offset equals rec.len().
pub main_data_offset: usize,
pub main_data_offset: u32,
}
impl WALRecord {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u64(self.lsn);
buf.put_u8(self.will_init as u8);
buf.put_u8(self.truncate as u8);
buf.put_u32(self.main_data_offset);
buf.put_u32(self.rec.len() as u32);
buf.put_slice(&self.rec[..]);
}
pub fn unpack(buf: &mut BytesMut) -> WALRecord {
let lsn = buf.get_u64();
let will_init = buf.get_u8() != 0;
let truncate = buf.get_u8() != 0;
let main_data_offset = buf.get_u32();
let mut dst = vec![0u8; buf.get_u32() as usize];
buf.copy_to_slice(&mut dst);
WALRecord {
lsn,
will_init,
truncate,
rec: Bytes::from(dst),
main_data_offset,
}
}
}
// Public interface functions
impl PageCache {
//
// GetPage@LSN
//
// Returns an 8k page image
//
pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut minbuf = BytesMut::new();
let mut maxbuf = BytesMut::new();
let cf = self
.db
.cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
.unwrap();
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
if last_lsn > conf.gc_horizon {
let horizon = last_lsn - conf.gc_horizon;
let mut maxkey = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
dbnode: u32::MAX,
relnode: u32::MAX,
forknum: u8::MAX,
},
blknum: u32::MAX,
},
lsn: u64::MAX,
};
loop {
maxbuf.clear();
maxkey.pack(&mut maxbuf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&maxbuf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag, lsn: 0 };
let maxkey = CacheKey { tag, lsn };
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = 0;
// reconstruct most recent page version
if content.wal_record.is_some() {
// force reconstruction of most recent page version
self.reconstruct_page(key, content)?;
}
maxbuf.clear();
maxkey.pack(&mut maxbuf);
if last_lsn > horizon {
// locate most recent record before horizon
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&maxbuf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
if content.wal_record.is_some() {
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
self.reconstruct_page(key, content)?;
}
}
}
// remove records prior to horizon
minbuf.clear();
minkey.pack(&mut minbuf);
self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?;
maxkey = minkey;
}
}
}
}
}
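A worked example of the horizon arithmetic, as a sketch in comments:

// With last_valid_lsn = 0x5000_0000 and gc_horizon = 0x1000_0000, the
// horizon is 0x4000_0000. For each page, the newest version at or below
// the horizon is forced to a full image via reconstruct_page(), and all
// older versions are dropped in one delete_range_cf() call; versions
// newer than the horizon are left untouched.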
fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> {
let entry_rc = Arc::new(CacheEntry::new(key.clone(), content));
let mut entry_content = entry_rc.content.lock().unwrap();
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
let page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
bail!("could not apply WAL to reconstruct page image");
}
};
self.put_page_image(key.tag, key.lsn, page_img.clone());
Ok(page_img)
}
async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
if walreceiver_works {
self.seqwait_lsn
@@ -310,81 +517,63 @@ impl PageCache {
);
}
let entry_rc: Arc<CacheEntry>;
{
let shared = self.shared.lock().unwrap();
let shared = self.shared.lock().unwrap();
if walreceiver_works {
assert!(lsn <= shared.last_valid_lsn);
}
if lsn < shared.first_valid_lsn {
bail!(
"LSN {:X}/{:X} has already been removed",
lsn >> 32,
lsn & 0xffff_ffff
);
}
let pagecache = &shared.pagecache;
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let entry_opt = entries.next_back();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (_key, entry) = entry_opt.unwrap();
entry_rc = entry.clone();
// Now that we have a reference to the cache entry, drop the lock on the map.
// It's important to do this before waiting on the condition variable below,
// and better to do it as soon as possible to maximize concurrency.
if walreceiver_works {
assert!(lsn <= shared.last_valid_lsn);
}
Ok(())
}
// Lock the cache entry and dig the page image out of it.
//
// GetPage@LSN
//
// Returns an 8k page image
//
pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
self.wait_lsn(lsn).await?;
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag, lsn: 0 };
let maxkey = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let entry_opt = iter.next();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (k, v) = entry_opt.unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let page_img: Bytes;
{
let mut entry_content = entry_rc.content.lock().unwrap();
if let Some(img) = &entry_content.page_image {
assert!(!entry_content.apply_pending);
page_img = img.clone();
} else if entry_content.wal_record.is_some() {
//
// If this page needs to be reconstructed by applying some WAL,
// send a request to the WAL redo thread.
//
if !entry_content.apply_pending {
assert!(!entry_content.apply_pending);
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
}
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!(
"could not apply WAL to reconstruct page image for GetPage@LSN request"
);
bail!("could not apply WAL to reconstruct page image");
}
};
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
page_img = self.reconstruct_page(key, content)?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though
@@ -394,10 +583,10 @@ impl PageCache {
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.spcnode,
tag.dbnode,
tag.relnode,
tag.forknum,
tag.rel.spcnode,
tag.rel.dbnode,
tag.rel.relnode,
tag.rel.forknum,
tag.blknum
);
@@ -412,38 +601,42 @@ impl PageCache {
// over it.
//
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) {
// Scan the BTreeMap backwards, starting from the given entry.
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
let minkey = CacheKey {
tag: entry.key.tag,
tag: BufferTag {
rel: entry.key.tag.rel,
blknum: 0,
},
lsn: 0,
};
let maxkey = CacheKey {
tag: entry.key.tag,
lsn: entry.key.lsn,
};
let entries = pagecache.range((Included(&minkey), Included(&maxkey)));
// the last entry in the range should be the CacheEntry we were given
//let _last_entry = entries.next_back();
//assert!(last_entry == entry);
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
buf.clear();
entry.key.pack(&mut buf);
let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_key, e) in entries.rev() {
let e = e.content.lock().unwrap();
if let Some(img) = &e.page_image {
for (_k, v) in iter {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(img) = &content.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &e.wal_record {
} else if let Some(rec) = &content.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
@@ -466,57 +659,76 @@ impl PageCache {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec);
let mut shared = self.shared.lock().unwrap();
let rel_tag = RelTag {
spcnode: tag.spcnode,
dbnode: tag.dbnode,
relnode: tag.relnode,
forknum: tag.forknum,
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0);
if tag.blknum >= *rel_entry {
*rel_entry = tag.blknum + 1;
}
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//trace!("put_wal_record lsn: {}", lsn);
let oldentry = shared.pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
if oldentry.is_some() {
error!(
"overwriting WAL record with LSN {:X}/{:X} in page cache",
lsn >> 32,
lsn & 0xffffffff
);
}
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
// Adds a relation-wide WAL record (like truncate) to the page cache,
// associating it with all pages started with specified block number
//
pub async fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
let mut key = CacheKey { tag, lsn: rec.lsn };
let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).await?;
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
// set new relation size
trace!("Truncate relation {:?}", tag);
let mut key_buf = BytesMut::new();
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
for blknum in tag.blknum..old_rel_size {
key_buf.clear();
key.tag.blknum = blknum;
key.pack(&mut key_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
}
let n = (old_rel_size - tag.blknum) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
Ok(())
}
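A worked example of the fan-out, in comments:

// Shrinking a 10-block relation to 4 blocks (tag.blknum == 4,
// old_rel_size == 10) stores the same truncate WALRecord under block
// numbers 4 through 9. A later relsize_get() scanning backwards then
// hits a truncate record at the old tail and keeps probing lower block
// numbers until it finds the real post-truncate size.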
//
// Memorize a full image of a page version
//
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: Some(img),
wal_record: None,
apply_pending: false,
};
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().page_image = Some(img);
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let mut shared = self.shared.lock().unwrap();
let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
assert!(oldentry.is_none());
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
@@ -602,83 +814,78 @@ impl PageCache {
shared.last_record_lsn
}
//
// Simple test function for the WAL redo code:
//
// 1. Pick a page from the page cache at random.
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub async fn _test_get_page_at_lsn(&self) {
// for quick testing of the get_page_at_lsn() function.
//
// Get a random page from the page cache. Apply all its WAL, by requesting
// that page at the highest lsn.
pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
if lsn != u64::MAX {
self.wait_lsn(lsn).await?;
}
let mut tag: Option<BufferTag> = None;
let mut key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
{
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
if pagecache.is_empty() {
info!("page cache is empty");
return;
}
// Find nth entry in the map, where n is picked at random
let n = rand::thread_rng().gen_range(0..pagecache.len());
let mut i = 0;
for (key, _e) in pagecache.iter() {
if i == n {
tag = Some(key.tag);
break;
loop {
buf.clear();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(rec) = &content.wal_record {
if rec.truncate {
if tag.blknum > 0 {
key.tag.blknum = tag.blknum - 1;
continue;
}
break;
}
}
let relsize = tag.blknum + 1;
return Ok(relsize);
}
i += 1;
}
}
info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match self
.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee)
.await
{
Ok(_img) => {
// This prints out the whole page image.
//println!("{:X?}", img);
}
Err(error) => {
error!("GetPage@LSN failed: {}", error);
}
break;
}
Ok(0)
}
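A sketch of the backwards scan, in comments:

// For relation R whose highest stored block is 6, seeking in reverse from
// the probe key (R, blknum = u32::MAX, lsn) lands on that block-6 entry,
// so the size is 6 + 1 = 7. If the newest record for block 6 is a
// truncate, the scan restarts from block 5 and keeps stepping down past
// blocks removed by the truncation.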
/// Remember a relation's size in blocks.
///
/// If 'to' is larger than the previously remembered size, the remembered size is increased to 'to'.
/// But if it's smaller, there is no change.
pub fn relsize_inc(&self, rel: &RelTag, to: u32) {
// FIXME: Shouldn't relation size also be tracked with an LSN?
// If a replica is lagging behind, it needs to get the size as it was on
// the replica's current replay LSN.
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
pub async fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<bool> {
self.wait_lsn(lsn).await?;
if to >= *entry {
*entry = to;
let key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, _v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
return Ok(true);
}
}
}
pub fn relsize_get(&self, rel: &RelTag) -> u32 {
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
*entry
}
pub fn relsize_exist(&self, rel: &RelTag) -> bool {
let shared = self.shared.lock().unwrap();
let relsize_cache = &shared.relsize_cache;
relsize_cache.contains_key(rel)
Ok(false)
}
pub fn get_stats(&self) -> PageCacheStats {

View File

@@ -18,6 +18,7 @@ use std::io;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
@@ -50,12 +51,8 @@ enum FeMessage {
// All that messages are actually CopyData from libpq point of view.
//
ZenithExistsRequest(ZenithRequest),
ZenithTruncRequest(ZenithRequest),
ZenithUnlinkRequest(ZenithRequest),
ZenithNblocksRequest(ZenithRequest),
ZenithReadRequest(ZenithRequest),
ZenithCreateRequest(ZenithRequest),
ZenithExtendRequest(ZenithRequest),
}
#[derive(Debug)]
@@ -383,12 +380,8 @@ impl FeMessage {
// serialization.
match smgr_tag {
0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))),
1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))),
2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))),
3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))),
6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))),
1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf),
@@ -431,6 +424,7 @@ pub fn thread_main(conf: &PageServerConf) {
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
let mut conn_handler = Connection::new(conf.clone(), socket, &runtime_ref);
task::spawn(async move {
@@ -625,7 +619,7 @@ impl Connection {
let mut unnamed_query_string = Bytes::new();
loop {
let msg = self.read_message().await?;
info!("got message {:?}", msg);
trace!("got message {:?}", msg);
match msg {
Some(FeMessage::StartupMessage(m)) => {
trace!("got message {:?}", m);
@@ -788,7 +782,7 @@ impl Connection {
let message = self.read_message().await?;
if let Some(m) = &message {
info!("query({:?}): {:?}", timelineid, m);
trace!("query({:?}): {:?}", timelineid, m);
};
if message.is_none() {
@@ -805,7 +799,7 @@ impl Connection {
forknum: req.forknum,
};
let exist = pcache.relsize_exist(&tag);
let exist = pcache.relsize_exist(&tag, req.lsn).await.unwrap_or(false);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: exist,
@@ -813,20 +807,6 @@ impl Connection {
}))
.await?
}
Some(FeMessage::ZenithTruncRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithUnlinkRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithNblocksRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
@@ -835,7 +815,7 @@ impl Connection {
forknum: req.forknum,
};
let n_blocks = pcache.relsize_get(&tag);
let n_blocks = pcache.relsize_get(&tag, req.lsn).await.unwrap_or(0);
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
@@ -845,10 +825,12 @@ impl Connection {
}
Some(FeMessage::ZenithReadRequest(req)) => {
let buf_tag = page_cache::BufferTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
rel: page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
},
blknum: req.blkno,
};
@@ -871,38 +853,6 @@ impl Connection {
self.write_message(&msg).await?
}
Some(FeMessage::ZenithCreateRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
pcache.relsize_inc(&tag, 0);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithExtendRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
pcache.relsize_inc(&tag, req.blkno + 1);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
_ => {}
}
}
@@ -942,10 +892,7 @@ impl Connection {
let joinres = f_tar.await;
if let Err(joinreserr) = joinres {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
joinreserr,
));
return Err(io::Error::new(io::ErrorKind::InvalidData, joinreserr));
}
joinres.unwrap()
};
@@ -982,7 +929,7 @@ impl Connection {
// FIXME: I'm getting an error from the tokio copyout driver without this.
// I think it happens when the CommandComplete, CloseComplete and ReadyForQuery
// are sent in the same TCP packet as the CopyDone. I don't understand why.
thread::sleep(std::time::Duration::from_secs(1));
thread::sleep(Duration::from_secs(1));
Ok(())
}

View File

@@ -44,10 +44,13 @@ pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4;
// From pg_control.h and rmgrlist.h
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
pub const RM_CLOG_ID: u8 = 3;
// pub const RM_MULTIXACT_ID:u8 = 6;
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;

View File

@@ -29,6 +29,7 @@ use bytes::Bytes;
use crate::page_cache;
use crate::page_cache::BufferTag;
use crate::page_cache::PageCache;
use crate::page_cache::RelTag;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -202,11 +203,13 @@ fn restore_relfile(
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
let tag = page_cache::BufferTag {
spcnode: spcoid,
dbnode: dboid,
relnode,
forknum: forknum as u8,
let tag = BufferTag {
rel: RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode: relnode,
forknum: forknum as u8,
},
blknum,
};
pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
@@ -233,14 +236,6 @@ fn restore_relfile(
blknum += 1;
}
let tag = page_cache::RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode,
forknum: forknum as u8,
};
pcache.relsize_inc(&tag, blknum);
Ok(())
}
@@ -307,18 +302,21 @@ fn restore_wal(
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = page_cache::WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset,
main_data_offset: decoded.main_data_offset as u32,
};
pcache.put_wal_record(tag, rec);

View File

@@ -306,10 +306,12 @@ async fn slurp_base_file(
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
rel: page_cache::RelTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
},
blknum,
};

View File

@@ -328,6 +328,8 @@ impl DecodedBkpBlock {
const SizeOfXLogRecord: u32 = 24;
pub struct DecodedWALRecord {
pub xl_info: u8,
pub xl_rmid: u8,
pub record: Bytes, // raw XLogRecord
pub blocks: Vec<DecodedBkpBlock>,
@@ -353,11 +355,40 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID
}
#[derive(Clone, Copy)]
pub type Oid = u32;
pub type BlockNumber = u32;
pub const MAIN_FORKNUM: u8 = 0;
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct RelFileNode {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub spcnode: Oid, /* tablespace */
pub dbnode: Oid, /* database */
pub relnode: Oid, /* relation */
}
#[repr(C)]
#[derive(Debug)]
pub struct XlSmgrTruncate {
pub blkno: BlockNumber,
pub rnode: RelFileNode,
pub flags: u32,
}
pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
let mut buf = decoded.record.clone();
buf.advance((SizeOfXLogRecord + 2) as usize);
XlSmgrTruncate {
blkno: buf.get_u32_le(),
rnode: RelFileNode {
spcnode: buf.get_u32_le(), /* tablespace */
dbnode: buf.get_u32_le(), /* database */
relnode: buf.get_u32_le(), /* relation */
},
flags: buf.get_u32_le(),
}
}
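The advance(SizeOfXLogRecord + 2) presumably skips the fixed 24-byte XLogRecord header plus the two-byte short main-data header that precedes the payload in records without block references; treat the layout below as an assumption from the PostgreSQL WAL format rather than something this diff states:

// Assumed record layout for XLOG_SMGR_TRUNCATE:
//   [XLogRecord header: 24 bytes]
//   [XLR_BLOCK_ID_DATA_SHORT: 1 byte][main-data length: 1 byte]
//   [xl_smgr_truncate: blkno, rnode, flags -- all little-endian]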
//
@@ -379,13 +410,13 @@ pub struct RelFileNode {
// block data
// ...
// main data
pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
let mut got_rnode = false;
let mut buf = rec.clone();
let mut buf = record.clone();
// 1. Parse XLogRecord struct
@@ -649,7 +680,9 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
}
DecodedWALRecord {
record: rec,
xl_info,
xl_rmid,
record,
blocks,
main_data_offset,
}

View File

@@ -7,8 +7,9 @@
//!
use crate::page_cache;
use crate::page_cache::BufferTag;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::page_cache::{BufferTag, RelTag};
use crate::pg_constants;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::Error;
@@ -223,22 +224,51 @@ async fn walreceiver_main(
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = page_cache::WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset,
main_data_offset: decoded.main_data_offset as u32,
};
pcache.put_wal_record(tag, rec);
}
// include truncate wal record in all pages
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = decode_truncate_record(&decoded);
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
let tag = BufferTag {
rel: RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: MAIN_FORKNUM,
},
blknum: truncate.blkno,
};
let rec = page_cache::WALRecord {
lsn: lsn,
will_init: false,
truncate: true,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
pcache.put_rel_wal_record(tag, rec).await?;
}
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
pcache.advance_last_record_lsn(lsn);

View File

@@ -18,7 +18,10 @@ use log::*;
use std::assert;
use std::cell::RefCell;
use std::fs;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::Error;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
@@ -66,21 +69,28 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) {
let _guard = runtime.enter();
process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
}
info!("WAL redo postgres started");
// Pretty arbitrarily, reuse the same Postgres process for 100,000 requests.
// After that, kill it and start a new one. This is mostly to avoid
// using up all shared buffers in Postgres's shared buffer cache; we don't
// want to write any pages to disk in the WAL redo process.
for _i in 1..100 {
for _i in 1..100000 {
let request = walredo_channel_receiver.recv().unwrap();
let result = handle_apply_request(&pcache, &process, &runtime, request);
if result.is_err() {
// On error, kill the process.
// Something went wrong with handling the request. It's not clear
// if the request was faulty, and the next request would succeed
// again, or if the 'postgres' process went haywire. To be safe,
// kill the 'postgres' process so that we will start from a clean
// slate, with a new process, for the next request.
break;
}
}
// Time to kill the 'postgres' process. A new one will be launched on next
// iteration of the loop.
info!("killing WAL redo postgres process");
let _ = runtime.block_on(process.stdin.get_mut().shutdown());
let mut child = process.child;
@@ -153,6 +163,7 @@ fn handle_apply_request(
let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
let mut entry = entry_rc.content.lock().unwrap();
assert!(entry.apply_pending);
entry.apply_pending = false;
let nrecords = records.len();
@@ -160,7 +171,7 @@ fn handle_apply_request(
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if tag.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
//TODO use base image if any
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let zero_page_bytes: &[u8] = &ZERO_PAGE;
@@ -215,9 +226,6 @@ fn handle_apply_request(
result = Err(e);
} else {
entry.page_image = Some(apply_result.unwrap());
pcache
.num_page_images
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
result = Ok(());
}
@@ -258,8 +266,14 @@ impl WalRedoProcess {
std::str::from_utf8(&initdb.stdout).unwrap(),
std::str::from_utf8(&initdb.stderr).unwrap()
);
} else {
// Limit shared cache for wal-redo-postgres
let mut config = OpenOptions::new()
.append(true)
.open(PathBuf::from(&datadir).join("postgresql.conf"))?;
config.write(b"shared_buffers=128kB\n")?;
config.write(b"fsync=off\n")?;
}
// Start postgres itself
let mut child = Command::new("postgres")
.arg("--wal-redo")
@@ -290,7 +304,7 @@ impl WalRedoProcess {
if res.unwrap() == 0 {
break;
}
debug!("wal-redo-postgres: {}", line.trim());
error!("wal-redo-postgres: {}", line.trim());
line.clear();
}
Ok::<(), Error>(())
@@ -390,11 +404,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
buf.put_u8(b'B');
buf.put_u32(len as u32);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
tag.pack(&mut buf);
assert!(buf.len() == 1 + len);
@@ -409,11 +419,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
buf.put_u8(b'P');
buf.put_u32(len as u32);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
tag.pack(&mut buf);
buf.put(base_img);
assert!(buf.len() == 1 + len);
@@ -441,11 +447,7 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes {
buf.put_u8(b'G');
buf.put_u32(len as u32);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
tag.pack(&mut buf);
assert!(buf.len() == 1 + len);
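These three replacements do not change the wire format, a point worth spelling out in comments:

// BufferTag::pack emits exactly the five u32 fields the old code wrote by
// hand -- spcnode, dbnode, relnode, forknum (widened to u32 by
// RelTag::pack), blknum -- so the wal-redo protocol messages stay
// byte-identical.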

View File

@@ -17,4 +17,4 @@ hex = "0.4.3"
log = "0.4.14"
[build-dependencies]
bindgen = "0.53.1"
bindgen = "0.57"

View File

@@ -196,5 +196,4 @@ mod tests {
// because the waiter already dropped its Receiver.
seq.advance(99);
}
}