Compare commits

..

1 Commits

Author SHA1 Message Date
anastasia
babd2339cc [issue #56] Fix race at postgres instance + walreceiver start. Uses postgres/vendor issue_56 branch.
TODO: rebase on main
2021-04-22 15:51:44 +03:00
35 changed files with 791 additions and 1361 deletions

View File

@@ -4,7 +4,7 @@ on: [push]
jobs:
regression-check:
timeout-minutes: 30
timeout-minutes: 10
name: run regression test suite
runs-on: ubuntu-latest

192
Cargo.lock generated
View File

@@ -91,19 +91,19 @@ dependencies = [
[[package]]
name = "async-io"
version = "1.4.0"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb9af4888a70ad78ecb5efcb0ba95d66a3cf54a88b62ae81559954c7588c7a2"
checksum = "9315f8f07556761c3e48fec2e6b276004acf426e6dc068b2c2251854d65ee0fd"
dependencies = [
"concurrent-queue",
"fastrand",
"futures-lite",
"libc",
"log",
"nb-connect",
"once_cell",
"parking",
"polling",
"socket2",
"vec-arena",
"waker-fn",
"winapi",
@@ -111,9 +111,9 @@ dependencies = [
[[package]]
name = "async-lock"
version = "2.4.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b"
checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb"
dependencies = [
"event-listener",
]
@@ -162,9 +162,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
[[package]]
name = "async-trait"
version = "0.1.50"
version = "0.1.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722"
checksum = "589652ce7ccb335d1e7ecb3be145425702b290dbcb7029bbeaae263fc1d87b48"
dependencies = [
"proc-macro2",
"quote",
@@ -243,12 +243,13 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bindgen"
version = "0.57.0"
version = "0.53.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d"
checksum = "c72a978d268b1d70b0e963217e60fdabd9523a941457a6c42a7315d15c7e89e5"
dependencies = [
"bitflags",
"cexpr",
"cfg-if 0.1.10",
"clang-sys",
"clap",
"env_logger",
@@ -345,9 +346,6 @@ name = "cc"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
dependencies = [
"jobserver",
]
[[package]]
name = "cexpr"
@@ -385,9 +383,9 @@ dependencies = [
[[package]]
name = "clang-sys"
version = "1.2.0"
version = "0.29.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c"
checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a"
dependencies = [
"glob",
"libc",
@@ -598,9 +596,9 @@ dependencies = [
[[package]]
name = "env_logger"
version = "0.8.3"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [
"atty",
"humantime",
@@ -924,9 +922,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.4.0"
version = "1.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1ce40d6fc9764887c2fdc7305c3dcc429ba11ff981c1509416afd5697e4437"
checksum = "bc35c995b9d93ec174cf9a27d425c7892722101e14993cd227fdb51d70cf9589"
[[package]]
name = "httpdate"
@@ -936,9 +934,12 @@ checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
[[package]]
name = "humantime"
version = "2.1.0"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
[[package]]
name = "hyper"
@@ -979,9 +980,9 @@ dependencies = [
[[package]]
name = "idna"
version = "0.2.3"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21"
dependencies = [
"matches",
"unicode-bidi",
@@ -1032,15 +1033,6 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "jobserver"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "972f5ae5d1cb9c6ae417789196c803205313edde988685da5e3aae0827b9e7fd"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.50"
@@ -1079,24 +1071,12 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41"
[[package]]
name = "libloading"
version = "0.7.0"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a"
checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753"
dependencies = [
"cfg-if 1.0.0",
"winapi",
]
[[package]]
name = "librocksdb-sys"
version = "6.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9"
dependencies = [
"bindgen",
"cc",
"glob",
"libc",
"winapi",
]
[[package]]
@@ -1204,6 +1184,16 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nb-connect"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d"
dependencies = [
"libc",
"socket2",
]
[[package]]
name = "nom"
version = "5.1.2"
@@ -1223,41 +1213,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "num"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
dependencies = [
"num-bigint",
"num-complex",
"num-integer",
"num-iter",
"num-rational",
"num-traits",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-complex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@@ -1268,29 +1223,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-iter"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
@@ -1387,14 +1319,12 @@ dependencies = [
"hex",
"lazy_static",
"log",
"parse_duration",
"postgres",
"postgres-protocol",
"postgres-types",
"postgres_ffi",
"rand 0.8.3",
"regex",
"rocksdb",
"rust-s3",
"slog",
"slog-async",
@@ -1409,7 +1339,6 @@ dependencies = [
"tokio-stream",
"tui",
"walkdir",
"zenith_utils",
]
[[package]]
@@ -1443,17 +1372,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "parse_duration"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
dependencies = [
"lazy_static",
"num",
"regex",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
@@ -1486,18 +1404,18 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.0.7"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4"
checksum = "bc174859768806e91ae575187ada95c91a29e96a98dc5d2cd9a1fed039501ba6"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.7"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
checksum = "a490329918e856ed1b083f244e3bfe2d8c4f336407e4ea9e1a9f479ff09049e5"
dependencies = [
"proc-macro2",
"quote",
@@ -1586,7 +1504,6 @@ dependencies = [
"chrono",
"crc32c",
"hex",
"log",
"rand 0.8.3",
]
@@ -1617,6 +1534,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.9"
@@ -1813,16 +1736,6 @@ dependencies = [
"winreg",
]
[[package]]
name = "rocksdb"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3"
dependencies = [
"libc",
"librocksdb-sys",
]
[[package]]
name = "rust-argon2"
version = "0.8.3"
@@ -2060,9 +1973,9 @@ checksum = "cbce6d4507c7e4a3962091436e56e95290cb71fa302d0d270e32130b75fbff27"
[[package]]
name = "slab"
version = "0.4.3"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "slog"
@@ -2500,9 +2413,9 @@ dependencies = [
[[package]]
name = "vcpkg"
version = "0.2.12"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d"
checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb"
[[package]]
name = "vec-arena"
@@ -2555,7 +2468,6 @@ dependencies = [
"pageserver",
"postgres",
"postgres-protocol",
"postgres_ffi",
"regex",
"slog",
"slog-async",
@@ -2760,7 +2672,3 @@ dependencies = [
[[package]]
name = "zenith_utils"
version = "0.1.0"
dependencies = [
"thiserror",
"tokio",
]

View File

@@ -1,9 +1,8 @@
use std::fs::{self, File, OpenOptions};
use std::fs::{self, OpenOptions};
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::TcpStream;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
@@ -12,12 +11,13 @@ use std::{collections::BTreeMap, path::PathBuf};
use anyhow::{Context, Result};
use lazy_static::lazy_static;
use regex::Regex;
use tar;
use postgres::{Client, NoTls};
use crate::local_env::LocalEnv;
use crate::storage::{PageServerNode, WalProposerNode};
use pageserver::{zenith_repo_dir, ZTimelineId};
use pageserver::ZTimelineId;
//
// ComputeControlPlane
@@ -190,11 +190,11 @@ impl PostgresNode {
);
let port: u16 = CONF_PORT_RE
.captures(config.as_str())
.ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 1"))?
.ok_or(anyhow::Error::msg(err_msg.clone() + " 1"))?
.iter()
.last()
.ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 2"))?
.ok_or_else(|| anyhow::Error::msg(err_msg.clone() + " 3"))?
.ok_or(anyhow::Error::msg(err_msg.clone() + " 2"))?
.ok_or(anyhow::Error::msg(err_msg.clone() + " 3"))?
.as_str()
.parse()
.with_context(|| err_msg)?;
@@ -277,9 +277,7 @@ impl PostgresNode {
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
fsync = off\n\
max_connections = 100\n\
wal_sender_timeout = 0\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
@@ -292,7 +290,7 @@ impl PostgresNode {
// slot or something proper, to prevent the compute node
// from removing WAL that hasn't been streamed to the safekeeper or
// page server yet. But this will do for now.
self.append_conf("postgresql.conf", "wal_keep_size='10TB'\n");
self.append_conf("postgresql.conf", &format!("wal_keep_size='10TB'\n"));
// Connect it to the page server.
@@ -355,7 +353,6 @@ impl PostgresNode {
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.with_context(|| "pg_ctl failed")?;
if !pg_ctl.success() {
@@ -399,14 +396,6 @@ impl PostgresNode {
String::from_utf8(output.stdout).unwrap().trim().to_string()
}
fn dump_log_file(&self) {
if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- Dump pageserver.log:\n{}", buffer);
}
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
@@ -418,11 +407,7 @@ impl PostgresNode {
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql);
let result = client.query(sql, &[]);
if result.is_err() {
self.dump_log_file();
}
result.unwrap()
client.query(sql, &[]).unwrap()
}
pub fn open_psql(&self, db: &str) -> Client {
@@ -458,71 +443,8 @@ impl PostgresNode {
}
}
pub fn pg_regress(&self) {
self.safe_psql("postgres", "CREATE DATABASE regression");
let data_dir = zenith_repo_dir();
let regress_run_path = data_dir.join("regress");
fs::create_dir_all(&regress_run_path).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGPORT", self.address.port().to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
}
pub fn pg_bench(&self, clients: u32, seconds: u32) {
let port = self.address.port().to_string();
let clients = clients.to_string();
let seconds = seconds.to_string();
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&["-i", "-p", port.as_str(), "postgres"])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench -i");
let _pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&[
"-p",
port.as_str(),
"-T",
seconds.as_str(),
"-P",
"1",
"-c",
clients.as_str(),
"-M",
"prepared",
"postgres",
])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench run");
}
// TODO
pub fn pg_bench() {}
}
impl Drop for PostgresNode {

View File

@@ -15,9 +15,8 @@ use std::process::{Command, Stdio};
use anyhow::Result;
use serde_derive::{Deserialize, Serialize};
use pageserver::zenith_repo_dir;
use pageserver::ZTimelineId;
use postgres_ffi::xlog_utils;
use walkeeper::xlog_utils;
//
// This data structure represents deserialized zenith config, which should be
@@ -53,6 +52,14 @@ impl LocalEnv {
}
}
fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
//
// Initialize a new Zenith repository
//
@@ -101,7 +108,7 @@ pub fn init() -> Result<()> {
// ok, we are good to go
let mut conf = LocalEnv {
repo_path,
repo_path: repo_path.clone(),
pg_distrib_dir,
zenith_distrib_dir,
systemid: 0,
@@ -133,21 +140,14 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> {
// the repository. Use "tempdir()" or something? Or just create it directly
// in the repo?
let initdb_path = local_env.pg_bin_dir().join("initdb");
let initdb = Command::new(initdb_path)
let _initdb = Command::new(initdb_path)
.args(&["-D", "tmp"])
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
.env(
"DYLD_LIBRARY_PATH",
local_env.pg_lib_dir().to_str().unwrap(),
)
.stdout(Stdio::null())
.status()
.with_context(|| "failed to execute initdb")?;
if !initdb.success() {
anyhow::bail!("initdb failed");
}
println!("initdb succeeded");
// Read control file to extract the LSN and system id
@@ -254,7 +254,7 @@ pub fn test_env(testname: &str) -> LocalEnv {
systemid: 0,
};
init_repo(&mut local_env).expect("could not initialize zenith repository");
local_env
return local_env;
}
// Find the directory where the binaries were put (i.e. target/debug/)
@@ -266,7 +266,7 @@ pub fn cargo_bin_dir() -> PathBuf {
pathbuf.pop();
}
pathbuf
return pathbuf;
}
#[derive(Debug, Clone, Copy)]
@@ -358,7 +358,7 @@ pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u6
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
Ok(lsn)
return Ok(lsn);
}
// Find the latest snapshot for a timeline

View File

@@ -13,6 +13,7 @@ use std::time::Duration;
use postgres::{Client, NoTls};
use crate::compute::PostgresNode;
use crate::local_env::LocalEnv;
use pageserver::ZTimelineId;
@@ -55,7 +56,7 @@ impl TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
repopath,
repopath: repopath,
}
}
@@ -72,7 +73,7 @@ impl TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
repopath,
repopath: repopath,
}
}
@@ -88,7 +89,7 @@ impl TestStorageControlPlane {
listen_address: None,
}),
test_done: AtomicBool::new(false),
repopath,
repopath: repopath,
};
cplane.pageserver.start().unwrap();
@@ -110,9 +111,6 @@ impl TestStorageControlPlane {
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
let _ = wa.stop();
}
self.test_done.store(true, Ordering::Relaxed);
}
@@ -184,8 +182,7 @@ impl PageServerNode {
.env("RUST_BACKTRACE", "1")
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs the postgres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
if !cmd.status()?.success() {
anyhow::bail!(
@@ -236,7 +233,7 @@ impl PageServerNode {
if !status.success() {
anyhow::bail!("Failed to stop pageserver with pid {}", pid);
} else {
Ok(())
return Ok(());
}
}
@@ -368,6 +365,42 @@ impl Drop for WalProposerNode {
}
}
///////////////////////////////////////////////////////////////////////////////
pub fn regress_check(pg: &PostgresNode) {
pg.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress");
fs::create_dir_all(regress_run_path.clone()).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
.env("PGHOST", pg.address.ip().to_string())
.env("PGPORT", pg.address.port().to_string())
.env("PGUSER", pg.whoami())
.status()
.expect("pg_regress failed");
}
/// Read a PID file
///
/// This should contain an unsigned integer, but we return it as a String

View File

@@ -50,6 +50,7 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node
#[test]
#[ignore]
fn test_regress() {
let local_env = local_env::test_env("test_regress");
@@ -62,24 +63,7 @@ fn test_regress() {
let node = compute_cplane.new_test_node(maintli);
node.start().unwrap();
node.pg_regress();
}
// Runs pg_bench on a compute node
#[test]
fn pgbench() {
let local_env = local_env::test_env("pgbench");
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_node(maintli);
node.start().unwrap();
node.pg_bench(10, 100);
control_plane::storage::regress_check(&node);
}
// Run two postgres instances on one pageserver, on different timelines

View File

@@ -32,14 +32,11 @@ tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
rocksdb = "0.16.0"
anyhow = "1.0"
crc32c = "0.6.0"
walkdir = "2"
thiserror = "1.0"
hex = "0.4.3"
tar = "0.4.33"
parse_duration = "*"
postgres_ffi = { path = "../postgres_ffi" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -66,7 +66,7 @@ pub fn send_snapshot_tarball(
continue;
}
let archive_fname = relpath.to_str().unwrap();
let archive_fname = relpath.to_str().unwrap().clone();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
@@ -148,7 +148,7 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
};
Ok((relnode, forknum, segno))
return Ok((relnode, forknum, segno));
}
fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
@@ -172,9 +172,9 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
if let Some(fname) = path.strip_prefix("global/") {
let (_relnode, _forknum, _segno) = parse_filename(fname)?;
Ok(())
return Ok(());
} else if let Some(dbpath) = path.strip_prefix("base/") {
let mut s = dbpath.split('/');
let mut s = dbpath.split("/");
let dbnode_str = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
@@ -188,15 +188,15 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
let (_relnode, _forknum, _segno) = parse_filename(fname)?;
Ok(())
return Ok(());
} else if let Some(_) = path.strip_prefix("pg_tblspc/") {
// TODO
Err(FilePathError::new("tablespaces not supported"))
return Err(FilePathError::new("tablespaces not supported"));
} else {
Err(FilePathError::new("invalid relation data file name"))
return Err(FilePathError::new("invalid relation data file name"));
}
}
fn is_rel_file_path(path: &str) -> bool {
parse_rel_file_path(path).is_ok()
return parse_rel_file_path(path).is_ok();
}

View File

@@ -3,13 +3,12 @@
//
use log::*;
use parse_duration::parse;
use std::fs::{self, OpenOptions};
use std::fs;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::PathBuf;
use std::process::exit;
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -17,10 +16,18 @@ use daemonize::Daemonize;
use slog::Drain;
use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf};
use pageserver::page_service;
use pageserver::tui;
//use pageserver::walreceiver;
use pageserver::PageServerConf;
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD_SEC: u64 = 10;
fn zenith_repo_dir() -> String {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => String::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")
@@ -46,25 +53,11 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Run in the background"),
)
.arg(
Arg::with_name("gc_horizon")
.long("gc_horizon")
.takes_value(true)
.help("Distance from current LSN to perform all wal records cleanup"),
)
.arg(
Arg::with_name("gc_period")
.long("gc_period")
.takes_value(true)
.help("Interval between garbage collector iterations"),
)
.get_matches();
let mut conf = PageServerConf {
daemonize: false,
interactive: false,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC),
listen_addr: "127.0.0.1:5430".parse().unwrap(),
};
@@ -85,14 +78,6 @@ fn main() -> Result<()> {
conf.listen_addr = addr.parse()?;
}
if let Some(horizon) = arg_matches.value_of("gc_horizon") {
conf.gc_horizon = horizon.parse()?;
}
if let Some(period) = arg_matches.value_of("gc_period") {
conf.gc_period = parse(period)?;
}
start_pageserver(&conf)
}
@@ -140,7 +125,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
.with_context(|| format!("failed to open {:?}", &log_filename))?;
let daemonize = Daemonize::new()
.pid_file(repodir.join("pageserver.pid"))
.pid_file(repodir.clone().join("pageserver.pid"))
.working_directory(repodir)
.stdout(stdout)
.stderr(stderr);
@@ -154,7 +139,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
// does this for us.
let repodir = zenith_repo_dir();
std::env::set_current_dir(&repodir)?;
info!("Changed current directory to repository in {:?}", &repodir);
info!("Changed current directory to repository in {}", &repodir);
}
let mut threads = Vec::new();
@@ -184,9 +169,9 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
.unwrap();
threads.push(page_server_thread);
if let Some(tui_thread) = tui_thread {
if tui_thread.is_some() {
// The TUI thread exits when the user asks to Quit.
tui_thread.join().unwrap();
tui_thread.unwrap().join().unwrap();
} else {
// In non-interactive mode, wait forever.
for t in threads {
@@ -200,23 +185,19 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
if conf.interactive {
Ok(tui::init_logging())
} else if conf.daemonize {
let log = zenith_repo_dir().join("pageserver.log");
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log)
.map_err(|err| {
// We failed to initialize logging, so we can't log this message with error!
eprintln!("Could not create log file {:?}: {}", log, err);
err
})?;
let log = zenith_repo_dir() + "/pageserver.log";
let log_file = File::create(&log).map_err(|err| {
// We failed to initialize logging, so we can't log this message with error!
eprintln!("Could not create log file {:?}: {}", log, err);
err
})?;
let decorator = slog_term::PlainSyncDecorator::new(log_file);
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = slog::Filter::new(drain, |record: &slog::Record| {
if record.level().is_at_least(slog::Level::Debug) {
return true;
}
false
return false;
});
let drain = std::sync::Mutex::new(drain).fuse();
let logger = slog::Logger::root(drain, slog::o!());
@@ -234,7 +215,7 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
{
return true;
}
false
return false;
})
.fuse();
let logger = slog::Logger::root(drain, slog::o!());

View File

@@ -1,8 +1,6 @@
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
pub mod basebackup;
pub mod page_cache;
@@ -21,34 +19,9 @@ pub struct PageServerConf {
pub daemonize: bool,
pub interactive: bool,
pub listen_addr: SocketAddr,
pub gc_horizon: u64,
pub gc_period: Duration,
}
/// Zenith Timeline ID is a 128-bit random ID.
///
/// Zenith timeline IDs are different from PostgreSQL timeline
/// IDs. They serve a similar purpose though: they differentiate
/// between different "histories" of the same cluster. However,
/// PostgreSQL timeline IDs are a bit cumbersome, because they are only
/// 32-bits wide, and they must be in ascending order in any given
/// timeline history. Those limitations mean that we cannot generate a
/// new PostgreSQL timeline ID by just generating a random number. And
/// that in turn is problematic for the "pull/push" workflow, where you
/// have a local copy of a zenith repository, and you periodically sync
/// the local changes with a remote server. When you work "detached"
/// from the remote server, you cannot create a PostgreSQL timeline ID
/// that's guaranteed to be different from all existing timelines in
/// the remote server. For example, if two people are having a clone of
/// the repository on their laptops, and they both create a new branch
/// with different name. What timeline ID would they assign to their
/// branches? If they pick the same one, and later try to push the
/// branches to the same remote server, they will get mixed up.
///
/// To avoid those issues, Zenith has its own concept of timelines that
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
// Zenith Timeline ID is a 16-byte (128-bit) random ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ZTimelineId([u8; 16]);
@@ -85,11 +58,3 @@ impl fmt::Display for ZTimelineId {
f.write_str(&hex::encode(self.0))
}
}
pub fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}

View File

@@ -1,28 +1,29 @@
//
// Page Cache holds all the different page versions and WAL records
//
// The Page Cache is currently using RocksDB for storing wal records and full page images, keyed by the RelFileNode, blocknumber, and the LSN.
// The Page Cache is a BTreeMap, keyed by the RelFileNode and blocknumber, and the LSN.
// The BTreeMap is protected by a Mutex, and each cache entry is protected by another
// per-entry mutex.
//
use crate::restore_local_repo::restore_timeline;
use crate::ZTimelineId;
use crate::{walredo, zenith_repo_dir, PageServerConf};
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::{walredo, PageServerConf};
use anyhow::bail;
use bytes::Bytes;
use core::ops::Bound::Included;
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use rocksdb;
use std::cmp::min;
use std::collections::HashMap;
use rand::Rng;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU64};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use std::{convert::TryInto, ops::AddAssign};
use zenith_utils::seqwait::SeqWait;
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -30,15 +31,11 @@ static TIMEOUT: Duration = Duration::from_secs(60);
pub struct PageCache {
shared: Mutex<PageCacheShared>,
// RocksDB handle
db: rocksdb::DB,
// Channel for communicating with the WAL redo process here.
pub walredo_sender: Sender<Arc<CacheEntry>>,
pub walredo_receiver: Receiver<Arc<CacheEntry>>,
// Allows .await on the arrival of a particular LSN.
seqwait_lsn: SeqWait,
valid_lsn_condvar: Condvar,
// Counters, for metrics collection.
pub num_entries: AtomicU64,
@@ -82,6 +79,16 @@ impl AddAssign for PageCacheStats {
// Shared data structure, holding page cache and related auxiliary information
//
struct PageCacheShared {
// The actual page cache
pagecache: BTreeMap<CacheKey, Arc<CacheEntry>>,
// Relation n_blocks cache
//
// This hashtable should be updated together with the pagecache. Now it is
// accessed unreasonably often through the smgr_nblocks(). It is better to just
// cache it in postgres smgr and ask only on restart.
relsize_cache: HashMap<RelTag, u32>,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
@@ -125,7 +132,7 @@ pub fn get_or_restore_pagecache(
match pcaches.get(&timelineid) {
Some(pcache) => Ok(pcache.clone()),
None => {
let pcache = init_page_cache(conf, timelineid);
let pcache = init_page_cache();
restore_timeline(conf, &pcache, timelineid)?;
@@ -145,48 +152,25 @@ pub fn get_or_restore_pagecache(
walredo::wal_redo_main(&conf_copy, timelineid);
})
.unwrap();
if conf.gc_horizon != 0 {
let conf_copy = conf.clone();
let _gc_thread = thread::Builder::new()
.name("Garbage collection thread".into())
.spawn(move || {
gc_thread_main(&conf_copy, timelineid);
})
.unwrap();
}
Ok(result)
return Ok(result);
}
}
}
fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("Garbage collection thread started {}", timelineid);
let pcache = get_pagecache(conf, timelineid).unwrap();
pcache.do_gc(conf).unwrap();
}
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
let path = zenith_repo_dir().join(timelineid.to_string());
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.create_missing_column_families(true);
rocksdb::DB::open_cf(&opts, &path, &[rocksdb::DEFAULT_COLUMN_FAMILY_NAME]).unwrap()
}
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
fn init_page_cache() -> PageCache {
// Initialize the channel between the page cache and the WAL applicator
let (s, r) = unbounded();
PageCache {
db: open_rocksdb(&conf, timelineid),
shared: Mutex::new(PageCacheShared {
pagecache: BTreeMap::new(),
relsize_cache: HashMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
}),
seqwait_lsn: SeqWait::new(0),
valid_lsn_condvar: Condvar::new(),
walredo_sender: s,
walredo_receiver: r,
@@ -215,25 +199,12 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64,
}
impl CacheKey {
pub fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn);
}
pub fn unpack(buf: &mut BytesMut) -> CacheKey {
CacheKey {
tag: BufferTag::unpack(buf),
lsn: buf.get_u64(),
}
}
}
pub struct CacheEntry {
pub key: CacheKey,
@@ -253,47 +224,21 @@ pub struct CacheEntryContent {
pub apply_pending: bool,
}
impl CacheEntryContent {
pub fn pack(&self, buf: &mut BytesMut) {
if let Some(image) = &self.page_image {
buf.put_u8(1);
buf.put_u16(image.len() as u16);
buf.put_slice(&image[..]);
} else if let Some(rec) = &self.wal_record {
buf.put_u8(0);
rec.pack(buf);
}
}
pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent {
if buf.get_u8() == 1 {
let mut dst = vec![0u8; buf.get_u16() as usize];
buf.copy_to_slice(&mut dst);
CacheEntryContent {
page_image: Some(Bytes::from(dst)),
wal_record: None,
apply_pending: false,
}
} else {
CacheEntryContent {
page_image: None,
wal_record: Some(WALRecord::unpack(buf)),
apply_pending: false,
}
}
}
}
impl CacheEntry {
fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
fn new(key: CacheKey) -> CacheEntry {
CacheEntry {
key,
content: Mutex::new(content),
content: Mutex::new(CacheEntryContent {
page_image: None,
wal_record: None,
apply_pending: false,
}),
walredo_condvar: Condvar::new(),
}
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
pub struct RelTag {
pub spcnode: u32,
pub dbnode: u32,
@@ -301,278 +246,162 @@ pub struct RelTag {
pub forknum: u8,
}
impl RelTag {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u32(self.spcnode);
buf.put_u32(self.dbnode);
buf.put_u32(self.relnode);
buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
}
pub fn unpack(buf: &mut BytesMut) -> RelTag {
RelTag {
spcnode: buf.get_u32(),
dbnode: buf.get_u32(),
relnode: buf.get_u32(),
forknum: buf.get_u32() as u8,
}
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
pub struct BufferTag {
pub rel: RelTag,
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u8,
pub blknum: u32,
}
impl BufferTag {
pub fn pack(&self, buf: &mut BytesMut) {
self.rel.pack(buf);
buf.put_u32(self.blknum);
}
pub fn unpack(buf: &mut BytesMut) -> BufferTag {
BufferTag {
rel: RelTag::unpack(buf),
blknum: buf.get_u32(),
}
}
}
#[derive(Clone)]
pub struct WALRecord {
pub lsn: u64, // LSN at the *end* of the record
pub will_init: bool,
pub truncate: bool,
pub rec: Bytes,
// Remember the offset of main_data in rec,
// so that we don't have to parse the record again.
// If record has no main_data, this offset equals rec.len().
pub main_data_offset: u32,
}
impl WALRecord {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u64(self.lsn);
buf.put_u8(self.will_init as u8);
buf.put_u8(self.truncate as u8);
buf.put_u32(self.main_data_offset);
buf.put_u32(self.rec.len() as u32);
buf.put_slice(&self.rec[..]);
}
pub fn unpack(buf: &mut BytesMut) -> WALRecord {
let lsn = buf.get_u64();
let will_init = buf.get_u8() != 0;
let truncate = buf.get_u8() != 0;
let main_data_offset = buf.get_u32();
let mut dst = vec![0u8; buf.get_u32() as usize];
buf.copy_to_slice(&mut dst);
WALRecord {
lsn,
will_init,
truncate,
rec: Bytes::from(dst),
main_data_offset,
}
}
pub main_data_offset: usize,
}
// Public interface functions
impl PageCache {
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut minbuf = BytesMut::new();
let mut maxbuf = BytesMut::new();
let cf = self
.db
.cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
.unwrap();
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
if last_lsn > conf.gc_horizon {
let horizon = last_lsn - conf.gc_horizon;
let mut maxkey = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
dbnode: u32::MAX,
relnode: u32::MAX,
forknum: u8::MAX,
},
blknum: u32::MAX,
},
lsn: u64::MAX,
};
loop {
maxbuf.clear();
maxkey.pack(&mut maxbuf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&maxbuf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = 0;
// reconstruct most recent page version
if content.wal_record.is_some() {
trace!("Reconstruct most recent page {:?}", key);
// force reconstruction of most recent page version
self.reconstruct_page(key, content)?;
}
maxbuf.clear();
maxkey.pack(&mut maxbuf);
if last_lsn > horizon {
// locate most recent record before horizon
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&maxbuf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
if content.wal_record.is_some() {
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
trace!("Reconstruct horizon page {:?}", key);
self.reconstruct_page(key, content)?;
}
}
}
// remove records prior to horizon
minbuf.clear();
minkey.pack(&mut minbuf);
trace!("Delete records in range {:?}..{:?}", minkey, maxkey);
self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?;
maxkey = minkey;
} else {
break;
}
}
}
}
}
fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> {
let entry_rc = Arc::new(CacheEntry::new(key.clone(), content));
let mut entry_content = entry_rc.content.lock().unwrap();
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
let page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
bail!("could not apply WAL to reconstruct page image");
}
};
self.put_page_image(key.tag, key.lsn, page_img.clone());
Ok(page_img)
}
async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
self.seqwait_lsn
.wait_for_timeout(lsn, TIMEOUT)
.await
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
)
})?;
Ok(())
}
//
// GetPage@LSN
//
// Returns an 8k page image
//
pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
pub fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let mut lsn = req_lsn;
//When invalid LSN is requested, it means "don't wait, return latest version of the page"
//This is necessary for bootstrap.
//TODO should we use last_valid_lsn here instead of maxvalue?
if lsn == 0
{
lsn = self.last_valid_lsn.load(Ordering::Acquire);
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
self.last_valid_lsn.load(Ordering::Acquire),
lsn
);
lsn = 0xffff_ffff_ffff_eeee;
}
else
{
self.wait_lsn(lsn).await?;
}
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag, lsn: 0 };
let maxkey = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let entry_rc: Arc<CacheEntry>;
{
let mut shared = self.shared.lock().unwrap();
let mut waited = false;
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
            // There is a race at postgres instance start
            // when we request a page before the walsender has established a connection
            // and was able to stream the page. Just don't wait and return what we have.
if req_lsn == 0
{
trace!(
"walsender hasn't started yet. Don't wait. last_valid_lsn {}, requested {}",
shared.last_valid_lsn, lsn);
}
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let entry_opt = iter.next();
if req_lsn != 0
{
while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
waited = true;
trace!(
"not caught up yet: {}, requested {}",
shared.last_valid_lsn,
lsn
);
let wait_result = self
.valid_lsn_condvar
.wait_timeout(shared, TIMEOUT)
.unwrap();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
shared = wait_result.0;
if wait_result.1.timed_out() {
bail!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
);
}
}
}
if waited {
trace!("caught up now, continuing");
}
if lsn < shared.first_valid_lsn {
bail!(
"LSN {:X}/{:X} has already been removed",
lsn >> 32,
lsn & 0xffff_ffff
);
}
let pagecache = &shared.pagecache;
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let entry_opt = entries.next_back();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (_key, entry) = entry_opt.unwrap();
entry_rc = entry.clone();
// Now that we have a reference to the cache entry, drop the lock on the map.
// It's important to do this before waiting on the condition variable below,
// and better to do it as soon as possible to maximize concurrency.
}
let (k, v) = entry_opt.unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
// Lock the cache entry and dig the page image out of it.
let page_img: Bytes;
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
page_img = self.reconstruct_page(key, content)?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
{
let mut entry_content = entry_rc.content.lock().unwrap();
if let Some(img) = &entry_content.page_image {
assert!(!entry_content.apply_pending);
page_img = img.clone();
} else if entry_content.wal_record.is_some() {
//
// If this page needs to be reconstructed by applying some WAL,
// send a request to the WAL redo thread.
//
if !entry_content.apply_pending {
assert!(!entry_content.apply_pending);
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
}
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!(
"could not apply WAL to reconstruct page image for GetPage@LSN request"
);
bail!("could not apply WAL to reconstruct page image");
}
};
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
}
// FIXME: assumes little-endian. Only used for the debugging log though
@@ -582,14 +411,14 @@ impl PageCache {
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.rel.spcnode,
tag.rel.dbnode,
tag.rel.relnode,
tag.rel.forknum,
tag.spcnode,
tag.dbnode,
tag.relnode,
tag.forknum,
tag.blknum
);
Ok(page_img)
return Ok(page_img);
}
//
@@ -600,42 +429,38 @@ impl PageCache {
// over it.
//
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) {
// Scan the BTreeMap backwards, starting from the given entry.
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
let minkey = CacheKey {
tag: BufferTag {
rel: entry.key.tag.rel,
blknum: 0,
},
tag: entry.key.tag,
lsn: 0,
};
let maxkey = CacheKey {
tag: entry.key.tag,
lsn: entry.key.lsn,
};
let entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
buf.clear();
entry.key.pack(&mut buf);
let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
// the last entry in the range should be the CacheEntry we were given
//let _last_entry = entries.next_back();
//assert!(last_entry == entry);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_k, v) in iter {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(img) = &content.page_image {
for (_key, e) in entries.rev() {
let e = e.content.lock().unwrap();
if let Some(img) = &e.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &content.wal_record {
} else if let Some(rec) = &e.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
@@ -648,7 +473,7 @@ impl PageCache {
}
records.reverse();
(base_img, records)
return (base_img, records);
}
//
@@ -658,53 +483,36 @@ impl PageCache {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec);
let mut shared = self.shared.lock().unwrap();
let rel_tag = RelTag {
spcnode: tag.spcnode,
dbnode: tag.dbnode,
relnode: tag.relnode,
forknum: tag.forknum,
};
let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0);
if tag.blknum >= *rel_entry {
*rel_entry = tag.blknum + 1;
}
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//trace!("put_wal_record lsn: {}", lsn);
let oldentry = shared.pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
// Adds a relation-wide WAL record (like truncate) to the page cache,
// associating it with all pages started with specified block number
//
pub async fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
let mut key = CacheKey { tag, lsn: rec.lsn };
let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).await?;
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
// set new relation size
trace!("Truncate relation {:?}", tag);
let mut key_buf = BytesMut::new();
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
for blknum in tag.blknum..old_rel_size {
key_buf.clear();
key.tag.blknum = blknum;
key.pack(&mut key_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
if !oldentry.is_none() {
error!(
"overwriting WAL record with LSN {:X}/{:X} in page cache",
lsn >> 32,
lsn & 0xffffffff
);
}
let n = (old_rel_size - tag.blknum) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
Ok(())
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
@@ -712,22 +520,20 @@ impl PageCache {
//
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: Some(img),
wal_record: None,
apply_pending: false,
};
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().page_image = Some(img);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
let mut shared = self.shared.lock().unwrap();
let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
assert!(oldentry.is_none());
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
@@ -740,7 +546,7 @@ impl PageCache {
if lsn >= oldlsn {
shared.last_valid_lsn = lsn;
self.seqwait_lsn.advance(lsn);
self.valid_lsn_condvar.notify_all();
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
} else {
@@ -766,7 +572,7 @@ impl PageCache {
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.seqwait_lsn.advance(lsn);
self.valid_lsn_condvar.notify_all();
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
@@ -806,85 +612,83 @@ impl PageCache {
pub fn get_last_valid_lsn(&self) -> u64 {
let shared = self.shared.lock().unwrap();
shared.last_record_lsn
return shared.last_record_lsn;
}
pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
if lsn != u64::MAX {
self.wait_lsn(lsn).await?;
}
//
// Simple test function for the WAL redo code:
//
// 1. Pick a page from the page cache at random.
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub fn _test_get_page_at_lsn(&self) {
        // for quick testing of the get_page_at_lsn() function.
//
// Get a random page from the page cache. Apply all its WAL, by requesting
// that page at the highest lsn.
let mut key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
let mut tag: Option<BufferTag> = None;
loop {
buf.clear();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(rec) = &content.wal_record {
if rec.truncate {
if tag.blknum > 0 {
key.tag.blknum = tag.blknum - 1;
continue;
}
break;
}
}
let relsize = tag.blknum + 1;
trace!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
{
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
if pagecache.is_empty() {
info!("page cache is empty");
return;
}
// Find nth entry in the map, where n is picked at random
let n = rand::thread_rng().gen_range(0..pagecache.len());
let mut i = 0;
for (key, _e) in pagecache.iter() {
if i == n {
tag = Some(key.tag);
break;
}
i += 1;
}
}
info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
Ok(_img) => {
// This prints out the whole page image.
//println!("{:X?}", img);
}
Err(error) => {
error!("GetPage@LSN failed: {}", error);
}
break;
}
trace!("Size of relation {:?} at {} is zero", rel, lsn);
Ok(0)
}
pub async fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<bool> {
self.wait_lsn(lsn).await?;
/// Remember a relation's size in blocks.
///
/// If 'to' is larger than the previously remembered size, the remembered size is increased to 'to'.
/// But if it's smaller, there is no change.
pub fn relsize_inc(&self, rel: &RelTag, to: u32) {
// FIXME: Shouldn't relation size also be tracked with an LSN?
// If a replica is lagging behind, it needs to get the size as it was on
// the replica's current replay LSN.
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
let key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, _v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
trace!("Relation {:?} exists at {}", rel, lsn);
return Ok(true);
}
if to >= *entry {
*entry = to;
}
trace!("Relation {:?} doesn't exist at {}", rel, lsn);
Ok(false)
}
pub fn relsize_get(&self, rel: &RelTag) -> u32 {
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
*entry
}
pub fn relsize_exist(&self, rel: &RelTag) -> bool {
let shared = self.shared.lock().unwrap();
let relsize_cache = &shared.relsize_cache;
relsize_cache.contains_key(rel)
}
pub fn get_stats(&self) -> PageCacheStats {

View File

@@ -18,7 +18,6 @@ use std::io;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
@@ -51,8 +50,12 @@ enum FeMessage {
// All that messages are actually CopyData from libpq point of view.
//
ZenithExistsRequest(ZenithRequest),
ZenithTruncRequest(ZenithRequest),
ZenithUnlinkRequest(ZenithRequest),
ZenithNblocksRequest(ZenithRequest),
ZenithReadRequest(ZenithRequest),
ZenithCreateRequest(ZenithRequest),
ZenithExtendRequest(ZenithRequest),
}
#[derive(Debug)]
@@ -186,11 +189,12 @@ fn read_null_terminated(buf: &mut Bytes) -> Result<Bytes> {
}
result.put_u8(byte);
}
Ok(result.freeze())
return Ok(result.freeze());
}
impl FeParseMessage {
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
pub fn parse(body: Bytes) -> Result<FeMessage> {
let mut buf = body.clone();
let _pstmt_name = read_null_terminated(&mut buf)?;
let query_string = read_null_terminated(&mut buf)?;
let nparams = buf.get_i16();
@@ -200,7 +204,7 @@ impl FeParseMessage {
// now, just ignore the statement name, assuming that the client never
// uses more than one prepared statement at a time.
/*
if !pstmt_name.is_empty() {
if pstmt_name.len() != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"named prepared statements not implemented in Parse",
@@ -226,13 +230,14 @@ struct FeDescribeMessage {
}
impl FeDescribeMessage {
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
pub fn parse(body: Bytes) -> Result<FeMessage> {
let mut buf = body.clone();
let kind = buf.get_u8();
let _pstmt_name = read_null_terminated(&mut buf)?;
// FIXME: see FeParseMessage::parse
/*
if !pstmt_name.is_empty() {
if pstmt_name.len() != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"named prepared statements not implemented in Describe",
@@ -259,11 +264,12 @@ struct FeExecuteMessage {
}
impl FeExecuteMessage {
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
pub fn parse(body: Bytes) -> Result<FeMessage> {
let mut buf = body.clone();
let portal_name = read_null_terminated(&mut buf)?;
let maxrows = buf.get_i32();
if !portal_name.is_empty() {
if portal_name.len() != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"named portals not implemented",
@@ -286,11 +292,12 @@ impl FeExecuteMessage {
struct FeBindMessage {}
impl FeBindMessage {
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
pub fn parse(body: Bytes) -> Result<FeMessage> {
let mut buf = body.clone();
let portal_name = read_null_terminated(&mut buf)?;
let _pstmt_name = read_null_terminated(&mut buf)?;
if !portal_name.is_empty() {
if portal_name.len() != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"named portals not implemented",
@@ -299,7 +306,7 @@ impl FeBindMessage {
// FIXME: see FeParseMessage::parse
/*
if !pstmt_name.is_empty() {
if pstmt_name.len() != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"named prepared statements not implemented",
@@ -316,7 +323,8 @@ impl FeBindMessage {
struct FeCloseMessage {}
impl FeCloseMessage {
pub fn parse(mut buf: Bytes) -> Result<FeMessage> {
pub fn parse(body: Bytes) -> Result<FeMessage> {
let mut buf = body.clone();
let _kind = buf.get_u8();
let _pstmt_or_portal_name = read_null_terminated(&mut buf)?;
@@ -357,7 +365,7 @@ impl FeMessage {
let mut body = body.freeze();
match tag {
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body }))),
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body: body }))),
b'P' => Ok(Some(FeParseMessage::parse(body)?)),
b'D' => Ok(Some(FeDescribeMessage::parse(body)?)),
b'E' => Ok(Some(FeExecuteMessage::parse(body)?)),
@@ -380,8 +388,12 @@ impl FeMessage {
// serialization.
match smgr_tag {
0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))),
1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))),
2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))),
3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))),
6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf),
@@ -418,13 +430,12 @@ pub fn thread_main(conf: &PageServerConf) {
let runtime_ref = Arc::new(runtime);
runtime_ref.block_on(async {
runtime_ref.clone().block_on(async {
let listener = TcpListener::bind(conf.listen_addr).await.unwrap();
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
let mut conn_handler = Connection::new(conf.clone(), socket, &runtime_ref);
task::spawn(async move {
@@ -529,7 +540,7 @@ impl Connection {
BeMessage::RowDescription => {
// XXX
let b = Bytes::from("data\0");
let mut b = Bytes::from("data\0");
self.stream.write_u8(b'T').await?;
self.stream
@@ -537,7 +548,7 @@ impl Connection {
.await?;
self.stream.write_i16(1).await?;
self.stream.write_all(&b).await?;
self.stream.write_all(&mut b).await?;
self.stream.write_i32(0).await?; /* table oid */
self.stream.write_i16(0).await?; /* attnum */
self.stream.write_i32(25).await?; /* TEXTOID */
@@ -549,34 +560,34 @@ impl Connection {
// XXX: accept some text data
BeMessage::DataRow => {
// XXX
let b = Bytes::from("hello world");
let mut b = Bytes::from("hello world");
self.stream.write_u8(b'D').await?;
self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?;
self.stream.write_i16(1).await?;
self.stream.write_i32(b.len() as i32).await?;
self.stream.write_all(&b).await?;
self.stream.write_all(&mut b).await?;
}
BeMessage::ControlFile => {
// TODO pass checkpoint and xid info in this message
let b = Bytes::from("hello pg_control");
let mut b = Bytes::from("hello pg_control");
self.stream.write_u8(b'D').await?;
self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?;
self.stream.write_i16(1).await?;
self.stream.write_i32(b.len() as i32).await?;
self.stream.write_all(&b).await?;
self.stream.write_all(&mut b).await?;
}
BeMessage::CommandComplete => {
let b = Bytes::from("SELECT 1\0");
let mut b = Bytes::from("SELECT 1\0");
self.stream.write_u8(b'C').await?;
self.stream.write_i32(4 + b.len() as i32).await?;
self.stream.write_all(&b).await?;
self.stream.write_all(&mut b).await?;
}
BeMessage::ZenithStatusResponse(resp) => {
@@ -603,7 +614,7 @@ impl Connection {
self.stream.write_u8(102).await?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_u32(resp.n_blocks).await?;
self.stream.write_all(&resp.page.clone()).await?;
self.stream.write_all(&mut resp.page.clone()).await?;
}
}
@@ -619,15 +630,15 @@ impl Connection {
let mut unnamed_query_string = Bytes::new();
loop {
let msg = self.read_message().await?;
trace!("got message {:?}", msg);
info!("got message {:?}", msg);
match msg {
Some(FeMessage::StartupMessage(m)) => {
trace!("got message {:?}", m);
match m.kind {
StartupRequestCode::NegotiateGss | StartupRequestCode::NegotiateSsl => {
let b = Bytes::from("N");
self.stream.write_all(&b).await?;
let mut b = Bytes::from("N");
self.stream.write_all(&mut b).await?;
self.stream.flush().await?;
}
StartupRequestCode::Normal => {
@@ -719,7 +730,7 @@ impl Connection {
let caps = re.captures(&query_str);
let caps = caps.unwrap();
let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str()).unwrap();
let timelineid = ZTimelineId::from_str(caps.get(1).unwrap().as_str().clone()).unwrap();
let connstr: String = String::from(caps.get(2).unwrap().as_str());
// Check that the timeline exists
@@ -782,7 +793,7 @@ impl Connection {
let message = self.read_message().await?;
if let Some(m) = &message {
trace!("query({:?}): {:?}", timelineid, m);
info!("query({:?}): {:?}", timelineid, m);
};
if message.is_none() {
@@ -799,7 +810,7 @@ impl Connection {
forknum: req.forknum,
};
let exist = pcache.relsize_exist(&tag, req.lsn).await.unwrap_or(false);
let exist = pcache.relsize_exist(&tag);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: exist,
@@ -807,6 +818,20 @@ impl Connection {
}))
.await?
}
Some(FeMessage::ZenithTruncRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithUnlinkRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithNblocksRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
@@ -815,7 +840,7 @@ impl Connection {
forknum: req.forknum,
};
let n_blocks = pcache.relsize_get(&tag, req.lsn).await.unwrap_or(0);
let n_blocks = pcache.relsize_get(&tag);
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
@@ -825,16 +850,14 @@ impl Connection {
}
Some(FeMessage::ZenithReadRequest(req)) => {
let buf_tag = page_cache::BufferTag {
rel: page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
},
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
blknum: req.blkno,
};
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn).await {
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
@@ -853,6 +876,38 @@ impl Connection {
self.write_message(&msg).await?
}
Some(FeMessage::ZenithCreateRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
pcache.relsize_inc(&tag, 0);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithExtendRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
};
pcache.relsize_inc(&tag, req.blkno + 1);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0,
}))
.await?
}
_ => {}
}
}
@@ -891,10 +946,13 @@ impl Connection {
let f_tar2 = async {
let joinres = f_tar.await;
if let Err(joinreserr) = joinres {
return Err(io::Error::new(io::ErrorKind::InvalidData, joinreserr));
if joinres.is_err() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
joinres.unwrap_err(),
));
}
joinres.unwrap()
return joinres.unwrap();
};
let f_pump = async move {
@@ -903,12 +961,12 @@ impl Connection {
if buf.is_none() {
break;
}
let buf = buf.unwrap();
let mut buf = buf.unwrap();
// CopyData
stream.write_u8(b'd').await?;
stream.write_u32((4 + buf.len()) as u32).await?;
stream.write_all(&buf).await?;
stream.write_all(&mut buf).await?;
trace!("CopyData sent for {} bytes!", buf.len());
// FIXME: flush isn't really required, but makes it easier
@@ -929,7 +987,7 @@ impl Connection {
// FIXME: I'm getting an error from the tokio copyout driver without this.
// I think it happens when the CommandComplete, CloseComplete and ReadyForQuery
// are sent in the same TCP packet as the CopyDone. I don't understand why.
thread::sleep(Duration::from_secs(1));
thread::sleep(std::time::Duration::from_secs(1));
Ok(())
}

View File

@@ -44,22 +44,10 @@ pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4;
// From pg_control.h and rmgrlist.h
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
pub const RM_CLOG_ID: u8 = 3;
pub const RM_DBASE_ID: u8 = 4;
pub const RM_TBLSPC_ID: u8 = 5;
// pub const RM_MULTIXACT_ID:u8 = 6;
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
// from dbcommands_xlog.h
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;
pub const XLOG_TBLSPC_CREATE: u8 = 0x00;
pub const XLOG_TBLSPC_DROP: u8 = 0x10;

View File

@@ -29,11 +29,9 @@ use bytes::Bytes;
use crate::page_cache;
use crate::page_cache::BufferTag;
use crate::page_cache::PageCache;
use crate::page_cache::RelTag;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::xlog_utils::*;
// From pg_tablespace_d.h
//
@@ -188,9 +186,10 @@ fn restore_relfile(
// Does it look like a relation file?
let p = parse_relfilename(path.file_name().unwrap().to_str().unwrap());
if let Err(e) = p {
if p.is_err() {
let e = p.unwrap_err();
warn!("unrecognized file in snapshot: {:?} ({})", path, e);
return Err(e.into());
return Err(e)?;
}
let (relnode, forknum, segno) = p.unwrap();
@@ -203,14 +202,12 @@ fn restore_relfile(
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
let tag = BufferTag {
rel: RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode: relnode,
forknum: forknum as u8,
},
blknum,
let tag = page_cache::BufferTag {
spcnode: spcoid,
dbnode: dboid,
relnode: relnode,
forknum: forknum as u8,
blknum: blknum,
};
pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
/*
@@ -236,6 +233,14 @@ fn restore_relfile(
blknum += 1;
}
let tag = page_cache::RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode: relnode,
forknum: forknum as u8,
};
pcache.relsize_inc(&tag, blknum);
Ok(())
}
@@ -249,7 +254,7 @@ fn restore_wal(
) -> Result<()> {
let walpath = format!("timelines/{}/wal", timeline);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024);
let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024);
@@ -261,7 +266,7 @@ fn restore_wal(
// It could be as .partial
if !PathBuf::from(&path).exists() {
path += ".partial";
path = path + ".partial";
}
// Slurp the WAL file
@@ -302,21 +307,18 @@ fn restore_wal(
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
blknum: blk.blkno,
};
let rec = page_cache::WALRecord {
lsn,
lsn: lsn,
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
main_data_offset: decoded.main_data_offset,
};
pcache.put_wal_record(tag, rec);
@@ -345,6 +347,59 @@ fn restore_wal(
Ok(())
}
// FIXME: copied from xlog_utils.rs
pub const XLOG_FNAME_LEN: usize = 24;
pub type XLogRecPtr = u64;
pub type XLogSegNo = u64;
pub type TimeLineID = u32;
#[allow(non_snake_case)]
pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 {
return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1);
}
#[allow(non_snake_case)]
pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
return xlogptr / wal_segsz_bytes as u64;
}
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
return format!(
"{:>08X}{:>08X}{:>08X}",
tli,
logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
);
}
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo;
}
#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli);
}
#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
}
#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
if let Some(basefname) = fname.strip_suffix(".partial") {
IsXLogFileName(basefname)
} else {
false
}
}
#[derive(Debug, Clone)]
struct FilePathError {
msg: String,
@@ -430,5 +485,5 @@ fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
};
Ok((relnode, forknum, segno))
return Ok((relnode, forknum, segno));
}

View File

@@ -38,9 +38,12 @@ pub fn restore_main(conf: &PageServerConf) {
let result = restore_chunk(conf).await;
match result {
Ok(_) => {}
Ok(_) => {
return;
}
Err(err) => {
error!("S3 error: {}", err);
return;
}
}
});
@@ -196,7 +199,7 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let relnode_str = caps.name("relnode").unwrap().as_str();
let relnode: u32 = relnode_str.parse()?;
let relnode = u32::from_str_radix(relnode_str, 10)?;
let forkname_match = caps.name("forkname");
let forkname = if forkname_match.is_none() {
@@ -210,14 +213,14 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
let segno = if segno_match.is_none() {
0
} else {
segno_match.unwrap().as_str().parse::<u32>()?
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
};
let lsn_hi: u64 = caps.name("lsnhi").unwrap().as_str().parse()?;
let lsn_lo: u64 = caps.name("lsnlo").unwrap().as_str().parse()?;
let lsn_hi = u64::from_str_radix(caps.name("lsnhi").unwrap().as_str(), 16)?;
let lsn_lo = u64::from_str_radix(caps.name("lsnlo").unwrap().as_str(), 16)?;
let lsn = lsn_hi << 32 | lsn_lo;
Ok((relnode, forknum, segno, lsn))
return Ok((relnode, forknum, segno, lsn));
}
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
@@ -241,20 +244,20 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
if let Some(fname) = path.strip_prefix("global/") {
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
Ok(ParsedBaseImageFileName {
return Ok(ParsedBaseImageFileName {
spcnode: GLOBALTABLESPACE_OID,
dbnode: 0,
relnode,
forknum,
segno,
lsn,
})
});
} else if let Some(dbpath) = path.strip_prefix("base/") {
let mut s = dbpath.split("/");
let dbnode_str = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let dbnode: u32 = dbnode_str.parse()?;
let dbnode = u32::from_str_radix(dbnode_str, 10)?;
let fname = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
@@ -264,19 +267,19 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
Ok(ParsedBaseImageFileName {
return Ok(ParsedBaseImageFileName {
spcnode: DEFAULTTABLESPACE_OID,
dbnode,
relnode,
forknum,
segno,
lsn,
})
});
} else if let Some(_) = path.strip_prefix("pg_tblspc/") {
// TODO
Err(FilePathError::new("tablespaces not supported"))
return Err(FilePathError::new("tablespaces not supported"));
} else {
Err(FilePathError::new("invalid relation data file name"))
return Err(FilePathError::new("invalid relation data file name"));
}
}
@@ -306,12 +309,10 @@ async fn slurp_base_file(
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
rel: page_cache::RelTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
},
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
blknum,
};

View File

@@ -31,7 +31,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
{
return true;
}
false
return false;
})
.fuse();
@@ -41,7 +41,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
{
return true;
}
false
return false;
})
.fuse();
@@ -52,7 +52,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
{
return true;
}
false
return false;
})
.fuse();
@@ -65,7 +65,7 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
{
return true;
}
false
return false;
})
.fuse();
@@ -84,11 +84,11 @@ pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
return true;
}
false
return false;
})
.fuse();
let logger = slog::Logger::root(drain, slog::o!());
slog_scope::set_global_logger(logger)
return slog_scope::set_global_logger(logger);
}
pub fn ui_main() -> Result<(), Box<dyn Error>> {

View File

@@ -76,8 +76,8 @@ impl Events {
};
Events {
rx,
input_handle,
ignore_exit_key,
input_handle,
tick_handle,
}
}

View File

@@ -51,7 +51,7 @@ impl Drain for TuiLogger {
events.pop_back();
}
Ok(())
return Ok(());
}
}

View File

@@ -3,7 +3,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::cmp::min;
use thiserror::Error;
use std::str;
const XLOG_BLCKSZ: u32 = 8192;
@@ -228,7 +227,7 @@ impl WalStreamDecoder {
// FIXME: check that hdr.xlp_rem_len matches self.contlen
//println!("next xlog page (xlp_rem_len: {})", hdr.xlp_rem_len);
hdr
return hdr;
}
#[allow(non_snake_case)]
@@ -240,7 +239,7 @@ impl WalStreamDecoder {
xlp_xlog_blcksz: self.inputbuf.get_u32_le(),
};
hdr
return hdr;
}
}
@@ -329,8 +328,6 @@ impl DecodedBkpBlock {
const SizeOfXLogRecord: u32 = 24;
pub struct DecodedWALRecord {
pub xl_info: u8,
pub xl_rmid: u8,
pub record: Bytes, // raw XLogRecord
pub blocks: Vec<DecodedBkpBlock>,
@@ -353,43 +350,14 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID
return xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID;
}
pub type Oid = u32;
pub type BlockNumber = u32;
pub const MAIN_FORKNUM: u8 = 0;
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
#[repr(C)]
#[derive(Debug, Clone, Copy)]
#[derive(Clone, Copy)]
pub struct RelFileNode {
pub spcnode: Oid, /* tablespace */
pub dbnode: Oid, /* database */
pub relnode: Oid, /* relation */
}
#[repr(C)]
#[derive(Debug)]
pub struct XlSmgrTruncate {
pub blkno: BlockNumber,
pub rnode: RelFileNode,
pub flags: u32,
}
pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
let mut buf = decoded.record.clone();
buf.advance((SizeOfXLogRecord + 2) as usize);
XlSmgrTruncate {
blkno: buf.get_u32_le(),
rnode: RelFileNode {
spcnode: buf.get_u32_le(), /* tablespace */
dbnode: buf.get_u32_le(), /* database */
relnode: buf.get_u32_le(), /* relation */
},
flags: buf.get_u32_le(),
}
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
}
//
@@ -411,13 +379,13 @@ pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
let mut got_rnode = false;
let mut buf = record.clone();
let mut buf = rec.clone();
// 1. Parse XLogRecord struct
@@ -679,46 +647,10 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
//TODO parse abort record to extract subtrans entries
}
}
else if xl_rmid == pg_constants::RM_DBASE_ID
{
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::XLOG_DBASE_CREATE
{
//buf points to main_data
let db_id = buf.get_u32_le();
let tablespace_id = buf.get_u32_le();
let src_db_id = buf.get_u32_le();
let src_tablespace_id = buf.get_u32_le();
trace!("XLOG_DBASE_CREATE db_id {} src_db_id {}", db_id, src_db_id);
// in postgres it is implemented as copydir
// we need to copy all pages in page_cache
}
else
{
trace!("XLOG_DBASE_DROP is not handled yet");
}
}
else if xl_rmid == pg_constants::RM_TBLSPC_ID
{
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::XLOG_TBLSPC_CREATE
{
//buf points to main_data
let ts_id = buf.get_u32_le();
let ts_path = str::from_utf8(&buf).unwrap();
trace!("XLOG_TBLSPC_CREATE ts_id {} ts_path {}", ts_id, ts_path);
}
else
{
trace!("XLOG_TBLSPC_DROP is not handled yet");
}
}
DecodedWALRecord {
xl_info,
xl_rmid,
record,
record: rec,
blocks,
main_data_offset,
main_data_offset: main_data_offset,
}
}

View File

@@ -7,15 +7,13 @@
//!
use crate::page_cache;
use crate::page_cache::{BufferTag, RelTag};
use crate::pg_constants;
use crate::waldecoder::*;
use crate::page_cache::BufferTag;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::Error;
use lazy_static::lazy_static;
use log::*;
use postgres_ffi::xlog_utils::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::collections::HashMap;
@@ -224,51 +222,22 @@ async fn walreceiver_main(
// so having multiple copies of it doesn't cost that much)
for blk in decoded.blocks.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
blknum: blk.blkno,
};
let rec = page_cache::WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
truncate: false,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
main_data_offset: decoded.main_data_offset,
};
pcache.put_wal_record(tag, rec);
}
// include truncate wal record in all pages
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = decode_truncate_record(&decoded);
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
let tag = BufferTag {
rel: RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: MAIN_FORKNUM,
},
blknum: truncate.blkno,
};
let rec = page_cache::WALRecord {
lsn: lsn,
will_init: false,
truncate: true,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
pcache.put_rel_wal_record(tag, rec).await?;
}
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
pcache.advance_last_record_lsn(lsn);
@@ -374,6 +343,62 @@ pub async fn identify_system(client: &tokio_postgres::Client) -> Result<Identify
}
}
pub const XLOG_FNAME_LEN: usize = 24;
pub const XLOG_BLCKSZ: usize = 8192;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLOG_PAGE_MAGIC: u16 = 0xD109;
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = XLP_REM_LEN_OFFS + 4 + 4;
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = XLOG_SIZE_OF_XLOG_SHORT_PHD + 8 + 4 + 4;
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = XLOG_RECORD_CRC_OFFS + 4;
pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
pub type TimestampTz = u64;
pub type XLogSegNo = u64;
#[allow(non_snake_case)]
pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 {
return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1);
}
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo;
}
#[allow(non_snake_case)]
pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
return xlogptr / wal_segsz_bytes as u64;
}
#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
segno: XLogSegNo,
offset: u32,
wal_segsz_bytes: usize,
) -> XLogRecPtr {
return segno * (wal_segsz_bytes as u64) + (offset as u64);
}
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
return format!(
"{:>08X}{:>08X}{:>08X}",
tli,
logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
);
}
#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli);
}
fn write_wal_file(
startpos: XLogRecPtr,
timeline: ZTimelineId,
@@ -384,7 +409,7 @@ fn write_wal_file(
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
const ZERO_BLOCK: &'static [u8] = &[0u8; XLOG_BLCKSZ];
let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline));

View File

@@ -18,10 +18,7 @@ use log::*;
use std::assert;
use std::cell::RefCell;
use std::fs;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::Error;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
@@ -69,28 +66,21 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) {
let _guard = runtime.enter();
process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
}
info!("WAL redo postgres started");
// Pretty arbitrarily, reuse the same Postgres process for 100 requests.
// After that, kill it and start a new one. This is mostly to avoid
// using up all shared buffers in Postgres's shared buffer cache; we don't
// want to write any pages to disk in the WAL redo process.
for _i in 1..100000 {
for _i in 1..100 {
let request = walredo_channel_receiver.recv().unwrap();
let result = handle_apply_request(&pcache, &process, &runtime, request);
if result.is_err() {
// Something went wrong with handling the request. It's not clear
// if the request was faulty, and the next request would succeed
// again, or if the 'postgres' process went haywire. To be safe,
// kill the 'postgres' process so that we will start from a clean
// slate, with a new process, for the next request.
// On error, kill the process.
break;
}
}
// Time to kill the 'postgres' process. A new one will be launched on next
// iteration of the loop.
info!("killing WAL redo postgres process");
let _ = runtime.block_on(process.stdin.get_mut().shutdown());
let mut child = process.child;
@@ -163,7 +153,6 @@ fn handle_apply_request(
let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
let mut entry = entry_rc.content.lock().unwrap();
assert!(entry.apply_pending);
entry.apply_pending = false;
let nrecords = records.len();
@@ -171,7 +160,7 @@ fn handle_apply_request(
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
if tag.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
//TODO use base image if any
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let zero_page_bytes: &[u8] = &ZERO_PAGE;
@@ -213,7 +202,7 @@ fn handle_apply_request(
let result;
trace!(
debug!(
"applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
nrecords,
duration.as_millis(),
@@ -226,13 +215,16 @@ fn handle_apply_request(
result = Err(e);
} else {
entry.page_image = Some(apply_result.unwrap());
pcache
.num_page_images
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
result = Ok(());
}
// Wake up the requester, whether the operation succeeded or not.
entry_rc.walredo_condvar.notify_all();
result
return result;
}
struct WalRedoProcess {
@@ -266,14 +258,8 @@ impl WalRedoProcess {
std::str::from_utf8(&initdb.stdout).unwrap(),
std::str::from_utf8(&initdb.stderr).unwrap()
);
} else {
// Limit shared cache for wal-redo-postres
let mut config = OpenOptions::new()
.append(true)
.open(PathBuf::from(&datadir).join("postgresql.conf"))?;
config.write(b"shared_buffers=128kB\n")?;
config.write(b"fsync=off\n")?;
}
// Start postgres itself
let mut child = Command::new("postgres")
.arg("--wal-redo")
@@ -304,7 +290,7 @@ impl WalRedoProcess {
if res.unwrap() == 0 {
break;
}
error!("wal-redo-postgres: {}", line.trim());
debug!("wal-redo-postgres: {}", line.trim());
line.clear();
}
Ok::<(), Error>(())
@@ -331,7 +317,7 @@ impl WalRedoProcess {
) -> Result<Bytes, Error> {
let mut stdin = self.stdin.borrow_mut();
let mut stdout = self.stdout.borrow_mut();
runtime.block_on(async {
return runtime.block_on(async {
//
// This async block sends all the commands to the process.
//
@@ -394,7 +380,7 @@ impl WalRedoProcess {
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
})
});
}
}
@@ -402,13 +388,17 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5 * 4;
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'B');
buf.put_u8('B' as u8);
buf.put_u32(len as u32);
tag.pack(&mut buf);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
assert!(buf.len() == 1 + len);
buf.freeze()
return buf.freeze();
}
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
@@ -417,39 +407,47 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
let len = 4 + 5 * 4 + base_img.len();
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'P');
buf.put_u8('P' as u8);
buf.put_u32(len as u32);
tag.pack(&mut buf);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
buf.put(base_img);
assert!(buf.len() == 1 + len);
buf.freeze()
return buf.freeze();
}
fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes {
let len = 4 + 8 + rec.len();
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'A');
buf.put_u8('A' as u8);
buf.put_u32(len as u32);
buf.put_u64(endlsn);
buf.put(rec);
assert!(buf.len() == 1 + len);
buf.freeze()
return buf.freeze();
}
fn build_get_page_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5 * 4;
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'G');
buf.put_u8('G' as u8);
buf.put_u32(len as u32);
tag.pack(&mut buf);
buf.put_u32(tag.spcnode);
buf.put_u32(tag.dbnode);
buf.put_u32(tag.relnode);
buf.put_u32(tag.forknum as u32);
buf.put_u32(tag.blknum);
assert!(buf.len() == 1 + len);
buf.freeze()
return buf.freeze();
}

View File

@@ -14,7 +14,6 @@ byteorder = "1.4.3"
anyhow = "1.0"
crc32c = "0.6.0"
hex = "0.4.3"
log = "0.4.14"
[build-dependencies]
bindgen = "0.57"
bindgen = "0.53.1"

View File

@@ -1,3 +0,0 @@
This module contains utility functions for interacting with PostgreSQL
file formats.

View File

@@ -3,8 +3,6 @@
#![allow(non_snake_case)]
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
pub mod xlog_utils;
use bytes::{Buf, Bytes, BytesMut};
// sizeof(ControlFileData)
@@ -20,13 +18,13 @@ impl ControlFileData {
controlfile =
unsafe { std::mem::transmute::<[u8; SIZEOF_CONTROLDATA], ControlFileData>(b) };
controlfile
return controlfile;
}
}
pub fn decode_pg_control(mut buf: Bytes) -> Result<ControlFileData, anyhow::Error> {
pub fn decode_pg_control(buf: Bytes) -> Result<ControlFileData, anyhow::Error> {
let mut b: [u8; SIZEOF_CONTROLDATA] = [0u8; SIZEOF_CONTROLDATA];
buf.copy_to_slice(&mut b);
buf.clone().copy_to_slice(&mut b);
let controlfile: ControlFileData;
@@ -65,5 +63,5 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
// Fill the rest of the control file with zeros.
buf.resize(PG_CONTROL_FILE_SIZE as usize, 0);
buf.into()
return buf.into();
}

View File

@@ -30,4 +30,3 @@ crc32c = "0.6.0"
# FIXME: 'pageserver' is needed for ZTimelineId. Refactor
pageserver = { path = "../pageserver" }
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -69,7 +69,7 @@ fn main() -> Result<()> {
let mut conf = WalAcceptorConf {
data_dir: PathBuf::from("./"),
systemid,
systemid: systemid,
daemonize: false,
no_sync: false,
pageserver_addr: None,

View File

@@ -4,6 +4,7 @@ use std::path::PathBuf;
mod pq_protocol;
pub mod wal_service;
pub mod xlog_utils;
use crate::pq_protocol::SystemId;

View File

@@ -91,9 +91,9 @@ impl FeStartupMessage {
options = true;
} else if options {
for opt in p.split(' ') {
if let Some(ztimelineid_str) = opt.strip_prefix("ztimelineid=") {
if opt.starts_with("ztimelineid=") {
// FIXME: rethrow parsing error, don't unwrap
timelineid = Some(ZTimelineId::from_str(ztimelineid_str).unwrap());
timelineid = Some(ZTimelineId::from_str(&opt[12..]).unwrap());
break;
}
}

View File

@@ -29,9 +29,9 @@ use tokio::task;
use tokio_postgres::{connect, Error, NoTls};
use crate::pq_protocol::*;
use crate::xlog_utils::*;
use crate::WalAcceptorConf;
use pageserver::ZTimelineId;
use postgres_ffi::xlog_utils::*;
type FullTransactionId = u64;
@@ -444,7 +444,7 @@ impl Timeline {
fn get_hs_feedback(&self) -> HotStandbyFeedback {
let shared_state = self.mutex.lock().unwrap();
shared_state.hs_feedback
return shared_state.hs_feedback;
}
// Load and lock control file (prevent running more than one instance of safekeeper)
@@ -527,7 +527,7 @@ impl Timeline {
let file = shared_state.control_file.as_mut().unwrap();
file.seek(SeekFrom::Start(0))?;
file.write_all(&buf[..])?;
file.write_all(&mut buf[..])?;
if sync {
file.sync_all()?;
}
@@ -554,7 +554,7 @@ impl Connection {
async fn run(&mut self) -> Result<()> {
self.inbuf.resize(4, 0u8);
self.stream.read_exact(&mut self.inbuf[0..4]).await?;
let startup_pkg_len = BigEndian::read_u32(&self.inbuf[0..4]);
let startup_pkg_len = BigEndian::read_u32(&mut self.inbuf[0..4]);
if startup_pkg_len == 0 {
self.receive_wal().await?; // internal protocol between wal_proposer and wal_acceptor
} else {
@@ -997,12 +997,12 @@ impl Connection {
// Try to fetch replica's feedback
match self.stream.try_read_buf(&mut self.inbuf) {
Ok(0) => break,
Ok(_) => {
if let Some(FeMessage::CopyData(m)) = self.parse_message()? {
self.timeline()
.add_hs_feedback(HotStandbyFeedback::parse(&m.body))
}
}
Ok(_) => match self.parse_message()? {
Some(FeMessage::CopyData(m)) => self
.timeline()
.add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
_ => {}
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => {
return Err(e);
@@ -1102,7 +1102,7 @@ impl Connection {
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
const ZERO_BLOCK: &'static [u8] = &[0u8; XLOG_BLCKSZ];
/* Extract WAL location for this block */
let mut xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize;

View File

@@ -1,12 +1,3 @@
//
// This file contains common utilities for dealing with PostgreSQL WAL files and
// LSNs.
//
// Many of these functions have been copied from PostgreSQL, and rewritten in
// Rust. That's why they don't follow the usual Rust naming conventions, they
// have been named the same as the corresponding PostgreSQL functions instead.
//
use byteorder::{ByteOrder, LittleEndian};
use crc32c::*;
use log::*;
@@ -32,17 +23,17 @@ pub type XLogSegNo = u64;
#[allow(non_snake_case)]
pub fn XLogSegmentOffset(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> u32 {
(xlogptr as u32) & (wal_segsz_bytes as u32 - 1)
return (xlogptr as u32) & (wal_segsz_bytes as u32 - 1);
}
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
return (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo;
}
#[allow(non_snake_case)]
pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
xlogptr / wal_segsz_bytes as u64
return xlogptr / wal_segsz_bytes as u64;
}
#[allow(non_snake_case)]
@@ -51,7 +42,7 @@ pub fn XLogSegNoOffsetToRecPtr(
offset: u32,
wal_segsz_bytes: usize,
) -> XLogRecPtr {
segno * (wal_segsz_bytes as u64) + (offset as u64)
return segno * (wal_segsz_bytes as u64) + (offset as u64);
}
#[allow(non_snake_case)]
@@ -69,7 +60,7 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli);
}
#[allow(non_snake_case)]
@@ -79,7 +70,7 @@ pub fn IsXLogFileName(fname: &str) -> bool {
#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
return fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8]);
}
pub fn get_current_timestamp() -> TimestampTz {
@@ -190,12 +181,9 @@ fn find_end_of_wal_segment(
}
}
}
last_valid_rec_pos as u32
return last_valid_rec_pos as u32;
}
///
/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL.
///
pub fn find_end_of_wal(
data_dir: &Path,
wal_seg_size: usize,
@@ -249,7 +237,7 @@ pub fn find_end_of_wal(
let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
return (high_ptr, high_tli);
}
(0, 0)
return (0, 0);
}
pub fn main() {

View File

@@ -76,7 +76,7 @@ fn main() -> Result<()> {
// all other commands would need config
let repopath = zenith_repo_dir();
let repopath = PathBuf::from(zenith_repo_dir());
if !repopath.exists() {
bail!(
"Zenith repository does not exists in {}.\n\
@@ -186,7 +186,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let node = cplane
.nodes
.get(name)
.ok_or_else(|| anyhow!("postgres {} is not found", name))?;
.ok_or(anyhow!("postgres {} is not found", name))?;
node.start()?;
}
("stop", Some(sub_m)) => {
@@ -194,7 +194,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let node = cplane
.nodes
.get(name)
.ok_or_else(|| anyhow!("postgres {} is not found", name))?;
.ok_or(anyhow!("postgres {} is not found", name))?;
node.stop()?;
}
@@ -277,19 +277,19 @@ fn list_branches() -> Result<()> {
//
//
fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
let mut strings = s.split('@');
let mut strings = s.split("@");
let name = strings.next().unwrap();
let lsn: Option<u64>;
if let Some(lsnstr) = strings.next() {
let mut s = lsnstr.split('/');
let mut s = lsnstr.split("/");
let lsn_hi: u64 = s
.next()
.ok_or_else(|| anyhow!("invalid LSN in point-in-time specification"))?
.ok_or(anyhow!("invalid LSN in point-in-time specification"))?
.parse()?;
let lsn_lo: u64 = s
.next()
.ok_or_else(|| anyhow!("invalid LSN in point-in-time specification"))?
.ok_or(anyhow!("invalid LSN in point-in-time specification"))?
.parse()?;
lsn = Some(lsn_hi << 32 | lsn_lo);
} else {
@@ -312,8 +312,11 @@ fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
let pointstr = fs::read_to_string(branchpath)?;
let mut result = parse_point_in_time(&pointstr)?;
result.lsn = lsn.unwrap_or(0);
if lsn.is_some() {
result.lsn = lsn.unwrap();
} else {
result.lsn = 0;
}
return Ok(result);
}

View File

@@ -5,8 +5,3 @@ authors = ["Eric Seppanen <eric@zenith.tech>"]
edition = "2018"
[dependencies]
tokio = { version = "1.5", features = ["sync", "time" ] }
thiserror = "1"
[dev-dependencies]
tokio = { version = "1.5", features = ["macros", "rt"] }

View File

@@ -1,4 +1,2 @@
//! zenith_utils is intended to be a place to put code that is shared
//! between other crates in this repository.
pub mod seqwait;

View File

@@ -1,199 +0,0 @@
use std::collections::BTreeMap;
use std::mem;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::watch::{channel, Receiver, Sender};
use tokio::time::timeout;
/// An error happened while waiting for a number
///
/// Returned by [`SeqWait::wait_for`] and [`SeqWait::wait_for_timeout`].
#[derive(Debug, PartialEq, thiserror::Error)]
#[error("SeqWaitError")]
pub enum SeqWaitError {
    /// The wait timeout was reached
    Timeout,
    /// [`SeqWait::shutdown`] was called
    Shutdown,
}
/// Internal components of a `SeqWait`
///
/// All fields are guarded by the `Mutex` in [`SeqWait`]; they are only
/// touched while holding the lock.
struct SeqWaitInt {
    // One watch channel per awaited number, keyed by that number so
    // `advance` can split off every entry at or below the new value.
    waiters: BTreeMap<u64, (Sender<()>, Receiver<()>)>,
    // The highest number seen so far; `wait_for` returns immediately
    // when asked for a number at or below this.
    current: u64,
    // When true, `wait_for` fails fast with `SeqWaitError::Shutdown`
    // instead of registering a new waiter.
    shutdown: bool,
}
/// A tool for waiting on a sequence number
///
/// This provides a way to await the arrival of a number.
/// As soon as the number arrives by another caller calling
/// [`advance`], then the waiter will be woken up.
///
/// This implementation takes a blocking Mutex on both [`wait_for`]
/// and [`advance`], meaning there may be unexpected executor blocking
/// due to thread scheduling unfairness. There are probably better
/// implementations, but we can probably live with this for now.
///
/// [`wait_for`]: SeqWait::wait_for
/// [`advance`]: SeqWait::advance
///
pub struct SeqWait {
    // All state lives behind this lock; see `SeqWaitInt` for details.
    internal: Mutex<SeqWaitInt>,
}
impl SeqWait {
    /// Create a new `SeqWait`, initialized to a particular number
    pub fn new(starting_num: u64) -> Self {
        let internal = SeqWaitInt {
            waiters: BTreeMap::new(),
            current: starting_num,
            shutdown: false,
        };
        SeqWait {
            internal: Mutex::new(internal),
        }
    }

    /// Shut down a `SeqWait`, causing all waiters (present and
    /// future) to return an error.
    pub fn shutdown(&self) {
        let waiters = {
            // Prevent new waiters; wake all those that exist.
            // Wake everyone with an error.
            let mut internal = self.internal.lock().unwrap();
            // Set the flag *before* releasing the lock, so a `wait_for`
            // that races with us sees it and fails fast with `Shutdown`
            // instead of registering a waiter that would never be woken.
            // (Previously this flag was never set, so "future" waiters
            // would hang until the next `advance`.)
            internal.shutdown = true;
            // Steal the entire waiters map and carry it out of this
            // scope, so the Senders are dropped after the lock is
            // released rather than while holding it.
            mem::take(&mut internal.waiters)
            // Drop the lock as we exit this scope.
        };
        // When we drop the waiters list, each Sender is dropped and
        // every Receiver blocked in `changed()` is woken with an error,
        // which `wait_for` maps to `SeqWaitError::Shutdown`.
        // This drop doesn't need to be explicit; it's done
        // here to make it easier to read the code and understand
        // the order of events.
        drop(waiters);
    }

    /// Wait for a number to arrive
    ///
    /// This call won't complete until someone has called `advance`
    /// with a number greater than or equal to the one we're waiting for.
    ///
    /// Returns [`SeqWaitError::Shutdown`] if [`SeqWait::shutdown`] is
    /// called before or during the wait.
    pub async fn wait_for(&self, num: u64) -> Result<(), SeqWaitError> {
        let mut rx = {
            let mut internal = self.internal.lock().unwrap();
            // Fast path: the number has already arrived.
            if internal.current >= num {
                return Ok(());
            }
            if internal.shutdown {
                return Err(SeqWaitError::Shutdown);
            }
            // If we already have a channel for waiting on this number, reuse it.
            if let Some((_, rx)) = internal.waiters.get_mut(&num) {
                rx.clone()
            } else {
                // Create a new channel.
                let (tx, rx) = channel(());
                internal.waiters.insert(num, (tx, rx.clone()));
                rx
            }
            // Drop the lock as we exit this scope.
        };
        // An Err from changed() means the sender was dropped without
        // sending, which only happens on shutdown (`advance` sends
        // before its Sender is dropped).
        rx.changed().await.map_err(|_| SeqWaitError::Shutdown)
    }

    /// Wait for a number to arrive
    ///
    /// This call won't complete until someone has called `advance`
    /// with a number greater than or equal to the one we're waiting for.
    ///
    /// If that hasn't happened after the specified timeout duration,
    /// [`SeqWaitError::Timeout`] will be returned.
    pub async fn wait_for_timeout(
        &self,
        num: u64,
        timeout_duration: Duration,
    ) -> Result<(), SeqWaitError> {
        // `timeout` yields Err(Elapsed) on expiry; fold that into our
        // own error type while passing inner results through unchanged.
        timeout(timeout_duration, self.wait_for(num))
            .await
            .unwrap_or(Err(SeqWaitError::Timeout))
    }

    /// Announce a new number has arrived
    ///
    /// All waiters at this value or below will be woken.
    ///
    /// `advance` will panic if you send it a lower number than
    /// a previous call.
    pub fn advance(&self, num: u64) {
        let wake_these = {
            let mut internal = self.internal.lock().unwrap();
            if internal.current > num {
                panic!(
                    "tried to advance backwards, from {} to {}",
                    internal.current, num
                );
            }
            internal.current = num;
            // split_off will give me all the high-numbered waiters,
            // so split and then swap. Everything at or above (num + 1)
            // gets to stay.
            let mut split = internal.waiters.split_off(&(num + 1));
            std::mem::swap(&mut split, &mut internal.waiters);
            split
        };
        for (_wake_num, (tx, _rx)) in wake_these {
            // This can fail if there are no receivers.
            // We don't care; discard the error.
            let _ = tx.send(());
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::time::{sleep, Duration};

    // Basic wake-up ordering: waiters registered before `advance`
    // are released once the awaited number arrives, and `shutdown`
    // errors out any waiter still pending.
    #[tokio::test]
    async fn seqwait() {
        let seq = Arc::new(SeqWait::new(0));
        let seq2 = Arc::clone(&seq);
        let seq3 = Arc::clone(&seq);
        tokio::spawn(async move {
            // Woken by the main task's advance(99) below.
            seq2.wait_for(42).await.expect("wait_for 42");
            seq2.advance(100);
            // 999 never arrives; this resolves as an error on shutdown.
            seq2.wait_for(999).await.expect_err("no 999");
        });
        tokio::spawn(async move {
            seq3.wait_for(42).await.expect("wait_for 42");
            // Already satisfied (current >= 0): returns immediately.
            seq3.wait_for(0).await.expect("wait_for 0");
        });
        // Give the spawned tasks time to register their waiters.
        sleep(Duration::from_secs(1)).await;
        seq.advance(99);
        // Completed once the first task advances to 100.
        seq.wait_for(100).await.expect("wait_for 100");
        seq.shutdown();
    }

    // A waiter with a short timeout gets SeqWaitError::Timeout rather
    // than blocking forever on a number that never arrives in time.
    #[tokio::test]
    async fn seqwait_timeout() {
        let seq = Arc::new(SeqWait::new(0));
        let seq2 = Arc::clone(&seq);
        tokio::spawn(async move {
            let timeout = Duration::from_millis(1);
            let res = seq2.wait_for_timeout(42, timeout).await;
            assert_eq!(res, Err(SeqWaitError::Timeout));
        });
        sleep(Duration::from_secs(1)).await;
        // This will attempt to wake, but nothing will happen
        // because the waiter already dropped its Receiver.
        seq.advance(99);
    }
}