mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
Merge pull request #54 from zenithdb/rocksdb_pageserver
Rocksdb pageserver
This commit is contained in:
2
.github/workflows/testing.yml
vendored
2
.github/workflows/testing.yml
vendored
@@ -4,7 +4,7 @@ on: [push]
|
||||
|
||||
jobs:
|
||||
regression-check:
|
||||
timeout-minutes: 10
|
||||
timeout-minutes: 30
|
||||
name: run regression test suite
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
|
||||
185
Cargo.lock
generated
185
Cargo.lock
generated
@@ -91,19 +91,19 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-io"
|
||||
version = "1.3.1"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9315f8f07556761c3e48fec2e6b276004acf426e6dc068b2c2251854d65ee0fd"
|
||||
checksum = "fcb9af4888a70ad78ecb5efcb0ba95d66a3cf54a88b62ae81559954c7588c7a2"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"fastrand",
|
||||
"futures-lite",
|
||||
"libc",
|
||||
"log",
|
||||
"nb-connect",
|
||||
"once_cell",
|
||||
"parking",
|
||||
"polling",
|
||||
"socket2",
|
||||
"vec-arena",
|
||||
"waker-fn",
|
||||
"winapi",
|
||||
@@ -111,9 +111,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-lock"
|
||||
version = "2.3.0"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1996609732bde4a9988bc42125f55f2af5f3c36370e27c778d5191a4a1b63bfb"
|
||||
checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
]
|
||||
@@ -162,9 +162,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.49"
|
||||
version = "0.1.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "589652ce7ccb335d1e7ecb3be145425702b290dbcb7029bbeaae263fc1d87b48"
|
||||
checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -243,13 +243,12 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.53.3"
|
||||
version = "0.57.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c72a978d268b1d70b0e963217e60fdabd9523a941457a6c42a7315d15c7e89e5"
|
||||
checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cexpr",
|
||||
"cfg-if 0.1.10",
|
||||
"clang-sys",
|
||||
"clap",
|
||||
"env_logger",
|
||||
@@ -346,6 +345,9 @@ name = "cc"
|
||||
version = "1.0.67"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cexpr"
|
||||
@@ -383,9 +385,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clang-sys"
|
||||
version = "0.29.3"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a"
|
||||
checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c"
|
||||
dependencies = [
|
||||
"glob",
|
||||
"libc",
|
||||
@@ -596,9 +598,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.7.1"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
|
||||
checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"humantime",
|
||||
@@ -922,9 +924,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "httparse"
|
||||
version = "1.3.6"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc35c995b9d93ec174cf9a27d425c7892722101e14993cd227fdb51d70cf9589"
|
||||
checksum = "4a1ce40d6fc9764887c2fdc7305c3dcc429ba11ff981c1509416afd5697e4437"
|
||||
|
||||
[[package]]
|
||||
name = "httpdate"
|
||||
@@ -934,12 +936,9 @@ checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "1.3.0"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
|
||||
dependencies = [
|
||||
"quick-error",
|
||||
]
|
||||
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
@@ -980,9 +979,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "idna"
|
||||
version = "0.2.2"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21"
|
||||
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
|
||||
dependencies = [
|
||||
"matches",
|
||||
"unicode-bidi",
|
||||
@@ -1033,6 +1032,15 @@ version = "0.4.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "972f5ae5d1cb9c6ae417789196c803205313edde988685da5e3aae0827b9e7fd"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.50"
|
||||
@@ -1071,14 +1079,26 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
version = "0.5.2"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753"
|
||||
checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"cfg-if 1.0.0",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "librocksdb-sys"
|
||||
version = "6.17.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
"cc",
|
||||
"glob",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lock_api"
|
||||
version = "0.4.3"
|
||||
@@ -1184,16 +1204,6 @@ dependencies = [
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nb-connect"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"socket2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "5.1.2"
|
||||
@@ -1213,6 +1223,41 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
|
||||
dependencies = [
|
||||
"num-bigint",
|
||||
"num-complex",
|
||||
"num-integer",
|
||||
"num-iter",
|
||||
"num-rational",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-complex"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.44"
|
||||
@@ -1223,6 +1268,29 @@ dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-iter"
|
||||
version = "0.1.42"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-rational"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"num-bigint",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.14"
|
||||
@@ -1319,12 +1387,14 @@ dependencies = [
|
||||
"hex",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"parse_duration",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
"postgres_ffi",
|
||||
"rand 0.8.3",
|
||||
"regex",
|
||||
"rocksdb",
|
||||
"rust-s3",
|
||||
"slog",
|
||||
"slog-async",
|
||||
@@ -1373,6 +1443,17 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parse_duration"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"num",
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "peeking_take_while"
|
||||
version = "0.1.2"
|
||||
@@ -1405,18 +1486,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.0.6"
|
||||
version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc174859768806e91ae575187ada95c91a29e96a98dc5d2cd9a1fed039501ba6"
|
||||
checksum = "c7509cc106041c40a4518d2af7a61530e1eed0e6285296a3d8c5472806ccc4a4"
|
||||
dependencies = [
|
||||
"pin-project-internal",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.0.6"
|
||||
version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a490329918e856ed1b083f244e3bfe2d8c4f336407e4ea9e1a9f479ff09049e5"
|
||||
checksum = "48c950132583b500556b1efd71d45b319029f2b71518d979fcc208e16b42426f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1536,12 +1617,6 @@ dependencies = [
|
||||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-error"
|
||||
version = "1.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.9"
|
||||
@@ -1738,6 +1813,16 @@ dependencies = [
|
||||
"winreg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rocksdb"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"librocksdb-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rust-argon2"
|
||||
version = "0.8.3"
|
||||
@@ -1975,9 +2060,9 @@ checksum = "cbce6d4507c7e4a3962091436e56e95290cb71fa302d0d270e32130b75fbff27"
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.2"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
|
||||
checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527"
|
||||
|
||||
[[package]]
|
||||
name = "slog"
|
||||
@@ -2415,9 +2500,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.11"
|
||||
version = "0.2.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb"
|
||||
checksum = "cbdbff6266a24120518560b5dc983096efb98462e51d0d68169895b237be3e5d"
|
||||
|
||||
[[package]]
|
||||
name = "vec-arena"
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::io::{Read, Write};
|
||||
use std::net::SocketAddr;
|
||||
use std::net::TcpStream;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -16,7 +17,7 @@ use postgres::{Client, NoTls};
|
||||
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::storage::{PageServerNode, WalProposerNode};
|
||||
use pageserver::ZTimelineId;
|
||||
use pageserver::{zenith_repo_dir, ZTimelineId};
|
||||
|
||||
//
|
||||
// ComputeControlPlane
|
||||
@@ -276,7 +277,9 @@ impl PostgresNode {
|
||||
max_replication_slots = 10\n\
|
||||
hot_standby = on\n\
|
||||
shared_buffers = 1MB\n\
|
||||
fsync = off\n\
|
||||
max_connections = 100\n\
|
||||
wal_sender_timeout = 0\n\
|
||||
wal_level = replica\n\
|
||||
listen_addresses = '{address}'\n\
|
||||
port = {port}\n",
|
||||
@@ -443,8 +446,71 @@ impl PostgresNode {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO
|
||||
pub fn pg_bench() {}
|
||||
pub fn pg_regress(&self) {
|
||||
self.safe_psql("postgres", "CREATE DATABASE regression");
|
||||
let data_dir = zenith_repo_dir();
|
||||
let regress_run_path = data_dir.join("regress");
|
||||
fs::create_dir_all(®ress_run_path).unwrap();
|
||||
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
|
||||
std::env::set_current_dir(regress_run_path).unwrap();
|
||||
|
||||
let regress_build_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
|
||||
let regress_src_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
|
||||
|
||||
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
|
||||
.args(&[
|
||||
"--bindir=''",
|
||||
"--use-existing",
|
||||
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
|
||||
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
|
||||
format!(
|
||||
"--schedule={}",
|
||||
regress_src_path.join("parallel_schedule").to_str().unwrap()
|
||||
)
|
||||
.as_str(),
|
||||
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
|
||||
])
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("PGPORT", self.address.port().to_string())
|
||||
.env("PGUSER", self.whoami())
|
||||
.env("PGHOST", self.address.ip().to_string())
|
||||
.status()
|
||||
.expect("pg_regress failed");
|
||||
}
|
||||
|
||||
pub fn pg_bench(&self, clients: u32, seconds: u32) {
|
||||
let port = self.address.port().to_string();
|
||||
let clients = clients.to_string();
|
||||
let seconds = seconds.to_string();
|
||||
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
|
||||
.args(&["-i", "-p", port.as_str(), "postgres"])
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.status()
|
||||
.expect("pgbench -i");
|
||||
let _pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
|
||||
.args(&[
|
||||
"-p",
|
||||
port.as_str(),
|
||||
"-T",
|
||||
seconds.as_str(),
|
||||
"-P",
|
||||
"1",
|
||||
"-c",
|
||||
clients.as_str(),
|
||||
"-M",
|
||||
"prepared",
|
||||
"postgres",
|
||||
])
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.status()
|
||||
.expect("pgbench run");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PostgresNode {
|
||||
|
||||
@@ -15,6 +15,7 @@ use std::process::{Command, Stdio};
|
||||
use anyhow::Result;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
|
||||
use pageserver::zenith_repo_dir;
|
||||
use pageserver::ZTimelineId;
|
||||
use postgres_ffi::xlog_utils;
|
||||
|
||||
@@ -52,14 +53,6 @@ impl LocalEnv {
|
||||
}
|
||||
}
|
||||
|
||||
fn zenith_repo_dir() -> PathBuf {
|
||||
// Find repository path
|
||||
match std::env::var_os("ZENITH_REPO_DIR") {
|
||||
Some(val) => PathBuf::from(val.to_str().unwrap()),
|
||||
None => ".zenith".into(),
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Initialize a new Zenith repository
|
||||
//
|
||||
@@ -145,7 +138,10 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> {
|
||||
.arg("--no-instructions")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
|
||||
.env(
|
||||
"DYLD_LIBRARY_PATH",
|
||||
local_env.pg_lib_dir().to_str().unwrap(),
|
||||
)
|
||||
.stdout(Stdio::null())
|
||||
.status()
|
||||
.with_context(|| "failed to execute initdb")?;
|
||||
|
||||
@@ -13,7 +13,6 @@ use std::time::Duration;
|
||||
|
||||
use postgres::{Client, NoTls};
|
||||
|
||||
use crate::compute::PostgresNode;
|
||||
use crate::local_env::LocalEnv;
|
||||
use pageserver::ZTimelineId;
|
||||
|
||||
@@ -111,6 +110,9 @@ impl TestStorageControlPlane {
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
for wa in self.wal_acceptors.iter() {
|
||||
let _ = wa.stop();
|
||||
}
|
||||
self.test_done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
@@ -366,43 +368,6 @@ impl Drop for WalProposerNode {
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub fn regress_check(pg: &PostgresNode) {
|
||||
pg.safe_psql("postgres", "CREATE DATABASE regression");
|
||||
|
||||
let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress");
|
||||
fs::create_dir_all(regress_run_path.clone()).unwrap();
|
||||
std::env::set_current_dir(regress_run_path).unwrap();
|
||||
|
||||
let regress_build_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
|
||||
let regress_src_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
|
||||
|
||||
let _regress_check = Command::new(regress_build_path.join("pg_regress"))
|
||||
.args(&[
|
||||
"--bindir=''",
|
||||
"--use-existing",
|
||||
format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
|
||||
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
|
||||
format!(
|
||||
"--schedule={}",
|
||||
regress_src_path.join("parallel_schedule").to_str().unwrap()
|
||||
)
|
||||
.as_str(),
|
||||
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
|
||||
])
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("PGHOST", pg.address.ip().to_string())
|
||||
.env("PGPORT", pg.address.port().to_string())
|
||||
.env("PGUSER", pg.whoami())
|
||||
.status()
|
||||
.expect("pg_regress failed");
|
||||
}
|
||||
|
||||
/// Read a PID file
|
||||
///
|
||||
/// This should contain an unsigned integer, but we return it as a String
|
||||
|
||||
@@ -51,7 +51,6 @@ fn test_redo_cases() {
|
||||
|
||||
// Runs pg_regress on a compute node
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_regress() {
|
||||
let local_env = local_env::test_env("test_regress");
|
||||
|
||||
@@ -64,7 +63,24 @@ fn test_regress() {
|
||||
let node = compute_cplane.new_test_node(maintli);
|
||||
node.start().unwrap();
|
||||
|
||||
control_plane::storage::regress_check(&node);
|
||||
node.pg_regress();
|
||||
}
|
||||
|
||||
// Runs pg_bench on a compute node
|
||||
#[test]
|
||||
fn pgbench() {
|
||||
let local_env = local_env::test_env("pgbench");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_node(maintli);
|
||||
node.start().unwrap();
|
||||
|
||||
node.pg_bench(10, 100);
|
||||
}
|
||||
|
||||
// Run two postgres instances on one pageserver, on different timelines
|
||||
|
||||
@@ -32,12 +32,14 @@ tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a
|
||||
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
|
||||
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
|
||||
rocksdb = "0.16.0"
|
||||
anyhow = "1.0"
|
||||
crc32c = "0.6.0"
|
||||
walkdir = "2"
|
||||
thiserror = "1.0"
|
||||
hex = "0.4.3"
|
||||
tar = "0.4.33"
|
||||
parse_duration = "*"
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
|
||||
@@ -3,12 +3,13 @@
|
||||
//
|
||||
|
||||
use log::*;
|
||||
use std::fs;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use parse_duration::parse;
|
||||
use std::fs::{self, OpenOptions};
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::{App, Arg};
|
||||
@@ -16,18 +17,10 @@ use daemonize::Daemonize;
|
||||
|
||||
use slog::Drain;
|
||||
|
||||
use pageserver::page_service;
|
||||
use pageserver::tui;
|
||||
//use pageserver::walreceiver;
|
||||
use pageserver::PageServerConf;
|
||||
use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf};
|
||||
|
||||
fn zenith_repo_dir() -> String {
|
||||
// Find repository path
|
||||
match std::env::var_os("ZENITH_REPO_DIR") {
|
||||
Some(val) => String::from(val.to_str().unwrap()),
|
||||
None => ".zenith".into(),
|
||||
}
|
||||
}
|
||||
const DEFAULT_GC_HORIZON: u64 = 0; //64 * 1024 * 1024;
|
||||
const DEFAULT_GC_PERIOD_SEC: u64 = 1;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let arg_matches = App::new("Zenith page server")
|
||||
@@ -53,11 +46,25 @@ fn main() -> Result<()> {
|
||||
.takes_value(false)
|
||||
.help("Run in the background"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("gc_horizon")
|
||||
.long("gc_horizon")
|
||||
.takes_value(true)
|
||||
.help("Distance from current LSN to perform all wal records cleanup"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("gc_period")
|
||||
.long("gc_period")
|
||||
.takes_value(true)
|
||||
.help("Interval between garbage collector iterations"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
daemonize: false,
|
||||
interactive: false,
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC),
|
||||
listen_addr: "127.0.0.1:5430".parse().unwrap(),
|
||||
};
|
||||
|
||||
@@ -78,6 +85,14 @@ fn main() -> Result<()> {
|
||||
conf.listen_addr = addr.parse()?;
|
||||
}
|
||||
|
||||
if let Some(horizon) = arg_matches.value_of("gc_horizon") {
|
||||
conf.gc_horizon = horizon.parse()?;
|
||||
}
|
||||
|
||||
if let Some(period) = arg_matches.value_of("gc_period") {
|
||||
conf.gc_period = parse(period)?;
|
||||
}
|
||||
|
||||
start_pageserver(&conf)
|
||||
}
|
||||
|
||||
@@ -139,7 +154,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
// does this for us.
|
||||
let repodir = zenith_repo_dir();
|
||||
std::env::set_current_dir(&repodir)?;
|
||||
info!("Changed current directory to repository in {}", &repodir);
|
||||
info!("Changed current directory to repository in {:?}", &repodir);
|
||||
}
|
||||
|
||||
let mut threads = Vec::new();
|
||||
@@ -185,12 +200,16 @@ fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard,
|
||||
if conf.interactive {
|
||||
Ok(tui::init_logging())
|
||||
} else if conf.daemonize {
|
||||
let log = zenith_repo_dir() + "/pageserver.log";
|
||||
let log_file = File::create(&log).map_err(|err| {
|
||||
// We failed to initialize logging, so we can't log this message with error!
|
||||
eprintln!("Could not create log file {:?}: {}", log, err);
|
||||
err
|
||||
})?;
|
||||
let log = zenith_repo_dir().join("pageserver.log");
|
||||
let log_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&log)
|
||||
.map_err(|err| {
|
||||
// We failed to initialize logging, so we can't log this message with error!
|
||||
eprintln!("Could not create log file {:?}: {}", log, err);
|
||||
err
|
||||
})?;
|
||||
let decorator = slog_term::PlainSyncDecorator::new(log_file);
|
||||
let drain = slog_term::CompactFormat::new(decorator).build();
|
||||
let drain = slog::Filter::new(drain, |record: &slog::Record| {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::fmt;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
pub mod basebackup;
|
||||
pub mod page_cache;
|
||||
@@ -19,6 +21,8 @@ pub struct PageServerConf {
|
||||
pub daemonize: bool,
|
||||
pub interactive: bool,
|
||||
pub listen_addr: SocketAddr,
|
||||
pub gc_horizon: u64,
|
||||
pub gc_period: Duration,
|
||||
}
|
||||
|
||||
/// Zenith Timeline ID is a 128-bit random ID.
|
||||
@@ -81,3 +85,11 @@ impl fmt::Display for ZTimelineId {
|
||||
f.write_str(&hex::encode(self.0))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn zenith_repo_dir() -> PathBuf {
|
||||
// Find repository path
|
||||
match std::env::var_os("ZENITH_REPO_DIR") {
|
||||
Some(val) => PathBuf::from(val.to_str().unwrap()),
|
||||
None => ".zenith".into(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,16 +8,16 @@
|
||||
|
||||
use crate::restore_local_repo::restore_timeline;
|
||||
use crate::ZTimelineId;
|
||||
use crate::{walredo, PageServerConf};
|
||||
use crate::{walredo, zenith_repo_dir, PageServerConf};
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use core::ops::Bound::Included;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use crossbeam_channel::unbounded;
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use rand::Rng;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use rocksdb;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
@@ -32,6 +32,9 @@ static TIMEOUT: Duration = Duration::from_secs(60);
|
||||
pub struct PageCache {
|
||||
shared: Mutex<PageCacheShared>,
|
||||
|
||||
// RocksDB handle
|
||||
db: rocksdb::DB,
|
||||
|
||||
// Channel for communicating with the WAL redo process here.
|
||||
pub walredo_sender: Sender<Arc<CacheEntry>>,
|
||||
pub walredo_receiver: Receiver<Arc<CacheEntry>>,
|
||||
@@ -82,16 +85,6 @@ impl AddAssign for PageCacheStats {
|
||||
// Shared data structure, holding page cache and related auxiliary information
|
||||
//
|
||||
struct PageCacheShared {
|
||||
// The actual page cache
|
||||
pagecache: BTreeMap<CacheKey, Arc<CacheEntry>>,
|
||||
|
||||
// Relation n_blocks cache
|
||||
//
|
||||
// This hashtable should be updated together with the pagecache. Now it is
|
||||
// accessed unreasonably often through the smgr_nblocks(). It is better to just
|
||||
// cache it in postgres smgr and ask only on restart.
|
||||
relsize_cache: HashMap<RelTag, u32>,
|
||||
|
||||
// What page versions do we hold in the cache? If we get GetPage with
|
||||
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
|
||||
// page version. If we get a request > last_valid_lsn, we need to wait until
|
||||
@@ -135,7 +128,7 @@ pub fn get_or_restore_pagecache(
|
||||
match pcaches.get(&timelineid) {
|
||||
Some(pcache) => Ok(pcache.clone()),
|
||||
None => {
|
||||
let pcache = init_page_cache();
|
||||
let pcache = init_page_cache(conf, timelineid);
|
||||
|
||||
restore_timeline(conf, &pcache, timelineid)?;
|
||||
|
||||
@@ -155,20 +148,42 @@ pub fn get_or_restore_pagecache(
|
||||
walredo::wal_redo_main(&conf_copy, timelineid);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
if conf.gc_horizon != 0 {
|
||||
let conf_copy = conf.clone();
|
||||
let _gc_thread = thread::Builder::new()
|
||||
.name("Garbage collection thread".into())
|
||||
.spawn(move || {
|
||||
gc_thread_main(&conf_copy, timelineid);
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn init_page_cache() -> PageCache {
|
||||
fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
|
||||
info!("Garbage collection thread started {}", timelineid);
|
||||
let pcache = get_pagecache(conf, timelineid).unwrap();
|
||||
pcache.do_gc(conf).unwrap();
|
||||
}
|
||||
|
||||
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
|
||||
let path = zenith_repo_dir().join(timelineid.to_string());
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_use_fsync(true);
|
||||
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
||||
rocksdb::DB::open(&opts, &path).unwrap()
|
||||
}
|
||||
|
||||
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
|
||||
// Initialize the channel between the page cache and the WAL applicator
|
||||
let (s, r) = unbounded();
|
||||
|
||||
PageCache {
|
||||
db: open_rocksdb(&conf, timelineid),
|
||||
shared: Mutex::new(PageCacheShared {
|
||||
pagecache: BTreeMap::new(),
|
||||
relsize_cache: HashMap::new(),
|
||||
first_valid_lsn: 0,
|
||||
last_valid_lsn: 0,
|
||||
last_record_lsn: 0,
|
||||
@@ -209,6 +224,19 @@ pub struct CacheKey {
|
||||
pub lsn: u64,
|
||||
}
|
||||
|
||||
impl CacheKey {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
self.tag.pack(buf);
|
||||
buf.put_u64(self.lsn);
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> CacheKey {
|
||||
CacheKey {
|
||||
tag: BufferTag::unpack(buf),
|
||||
lsn: buf.get_u64(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CacheEntry {
|
||||
pub key: CacheKey,
|
||||
|
||||
@@ -228,21 +256,47 @@ pub struct CacheEntryContent {
|
||||
pub apply_pending: bool,
|
||||
}
|
||||
|
||||
impl CacheEntry {
|
||||
fn new(key: CacheKey) -> CacheEntry {
|
||||
CacheEntry {
|
||||
key,
|
||||
content: Mutex::new(CacheEntryContent {
|
||||
page_image: None,
|
||||
impl CacheEntryContent {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
if let Some(image) = &self.page_image {
|
||||
buf.put_u8(1);
|
||||
buf.put_u16(image.len() as u16);
|
||||
buf.put_slice(&image[..]);
|
||||
} else if let Some(rec) = &self.wal_record {
|
||||
buf.put_u8(0);
|
||||
rec.pack(buf);
|
||||
}
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent {
|
||||
if buf.get_u8() == 1 {
|
||||
let mut dst = vec![0u8; buf.get_u16() as usize];
|
||||
buf.copy_to_slice(&mut dst);
|
||||
CacheEntryContent {
|
||||
page_image: Some(Bytes::from(dst)),
|
||||
wal_record: None,
|
||||
apply_pending: false,
|
||||
}),
|
||||
}
|
||||
} else {
|
||||
CacheEntryContent {
|
||||
page_image: None,
|
||||
wal_record: Some(WALRecord::unpack(buf)),
|
||||
apply_pending: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CacheEntry {
|
||||
fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
|
||||
CacheEntry {
|
||||
key,
|
||||
content: Mutex::new(content),
|
||||
walredo_condvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
|
||||
pub struct RelTag {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
@@ -250,42 +304,195 @@ pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
|
||||
impl RelTag {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
buf.put_u32(self.spcnode);
|
||||
buf.put_u32(self.dbnode);
|
||||
buf.put_u32(self.relnode);
|
||||
buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> RelTag {
|
||||
RelTag {
|
||||
spcnode: buf.get_u32(),
|
||||
dbnode: buf.get_u32(),
|
||||
relnode: buf.get_u32(),
|
||||
forknum: buf.get_u32() as u8,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub struct BufferTag {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
pub relnode: u32,
|
||||
pub forknum: u8,
|
||||
pub rel: RelTag,
|
||||
pub blknum: u32,
|
||||
}
|
||||
|
||||
impl BufferTag {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
self.rel.pack(buf);
|
||||
buf.put_u32(self.blknum);
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> BufferTag {
|
||||
BufferTag {
|
||||
rel: RelTag::unpack(buf),
|
||||
blknum: buf.get_u32(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WALRecord {
|
||||
pub lsn: u64, // LSN at the *end* of the record
|
||||
pub will_init: bool,
|
||||
pub truncate: bool,
|
||||
pub rec: Bytes,
|
||||
// Remember the offset of main_data in rec,
|
||||
// so that we don't have to parse the record again.
|
||||
// If record has no main_data, this offset equals rec.len().
|
||||
pub main_data_offset: usize,
|
||||
pub main_data_offset: u32,
|
||||
}
|
||||
|
||||
impl WALRecord {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
buf.put_u64(self.lsn);
|
||||
buf.put_u8(self.will_init as u8);
|
||||
buf.put_u8(self.truncate as u8);
|
||||
buf.put_u32(self.main_data_offset);
|
||||
buf.put_u32(self.rec.len() as u32);
|
||||
buf.put_slice(&self.rec[..]);
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> WALRecord {
|
||||
let lsn = buf.get_u64();
|
||||
let will_init = buf.get_u8() != 0;
|
||||
let truncate = buf.get_u8() != 0;
|
||||
let main_data_offset = buf.get_u32();
|
||||
let mut dst = vec![0u8; buf.get_u32() as usize];
|
||||
buf.copy_to_slice(&mut dst);
|
||||
WALRecord {
|
||||
lsn,
|
||||
will_init,
|
||||
truncate,
|
||||
rec: Bytes::from(dst),
|
||||
main_data_offset,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Public interface functions
|
||||
|
||||
impl PageCache {
|
||||
//
|
||||
// GetPage@LSN
|
||||
//
|
||||
// Returns an 8k page image
|
||||
//
|
||||
pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
|
||||
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
|
||||
let mut minbuf = BytesMut::new();
|
||||
let mut maxbuf = BytesMut::new();
|
||||
let cf = self
|
||||
.db
|
||||
.cf_handle(rocksdb::DEFAULT_COLUMN_FAMILY_NAME)
|
||||
.unwrap();
|
||||
loop {
|
||||
thread::sleep(conf.gc_period);
|
||||
let last_lsn = self.get_last_valid_lsn();
|
||||
if last_lsn > conf.gc_horizon {
|
||||
let horizon = last_lsn - conf.gc_horizon;
|
||||
let mut maxkey = CacheKey {
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: u32::MAX,
|
||||
dbnode: u32::MAX,
|
||||
relnode: u32::MAX,
|
||||
forknum: u8::MAX,
|
||||
},
|
||||
blknum: u32::MAX,
|
||||
},
|
||||
lsn: u64::MAX,
|
||||
};
|
||||
loop {
|
||||
maxbuf.clear();
|
||||
maxkey.pack(&mut maxbuf);
|
||||
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
|
||||
&maxbuf[..],
|
||||
rocksdb::Direction::Reverse,
|
||||
));
|
||||
if let Some((k, v)) = iter.next() {
|
||||
minbuf.clear();
|
||||
minbuf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut minbuf);
|
||||
minbuf.clear();
|
||||
minbuf.extend_from_slice(&k);
|
||||
let key = CacheKey::unpack(&mut minbuf);
|
||||
|
||||
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
|
||||
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
||||
let minkey = CacheKey { tag, lsn: 0 };
|
||||
let maxkey = CacheKey { tag, lsn };
|
||||
// Construct boundaries for old records cleanup
|
||||
maxkey.tag = key.tag;
|
||||
let last_lsn = key.lsn;
|
||||
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
|
||||
|
||||
let mut minkey = maxkey.clone();
|
||||
minkey.lsn = 0;
|
||||
|
||||
// reconstruct most recent page version
|
||||
if content.wal_record.is_some() {
|
||||
// force reconstruction of most recent page version
|
||||
self.reconstruct_page(key, content)?;
|
||||
}
|
||||
|
||||
maxbuf.clear();
|
||||
maxkey.pack(&mut maxbuf);
|
||||
|
||||
if last_lsn > horizon {
|
||||
// locate most recent record before horizon
|
||||
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
|
||||
&maxbuf[..],
|
||||
rocksdb::Direction::Reverse,
|
||||
));
|
||||
if let Some((k, v)) = iter.next() {
|
||||
minbuf.clear();
|
||||
minbuf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut minbuf);
|
||||
if content.wal_record.is_some() {
|
||||
minbuf.clear();
|
||||
minbuf.extend_from_slice(&k);
|
||||
let key = CacheKey::unpack(&mut minbuf);
|
||||
self.reconstruct_page(key, content)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
// remove records prior to horizon
|
||||
minbuf.clear();
|
||||
minkey.pack(&mut minbuf);
|
||||
self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?;
|
||||
|
||||
maxkey = minkey;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> {
|
||||
let entry_rc = Arc::new(CacheEntry::new(key.clone(), content));
|
||||
|
||||
let mut entry_content = entry_rc.content.lock().unwrap();
|
||||
entry_content.apply_pending = true;
|
||||
|
||||
let s = &self.walredo_sender;
|
||||
s.send(entry_rc.clone())?;
|
||||
|
||||
while entry_content.apply_pending {
|
||||
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
|
||||
}
|
||||
// We should now have a page image. If we don't, it means that WAL redo
|
||||
// failed to reconstruct it. WAL redo should've logged that error already.
|
||||
let page_img = match &entry_content.page_image {
|
||||
Some(p) => p.clone(),
|
||||
None => {
|
||||
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
|
||||
bail!("could not apply WAL to reconstruct page image");
|
||||
}
|
||||
};
|
||||
self.put_page_image(key.tag, key.lsn, page_img.clone());
|
||||
Ok(page_img)
|
||||
}
|
||||
|
||||
async fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
|
||||
let walreceiver_works = self.walreceiver_works.load(Ordering::Acquire);
|
||||
if walreceiver_works {
|
||||
self.seqwait_lsn
|
||||
@@ -310,81 +517,63 @@ impl PageCache {
|
||||
);
|
||||
}
|
||||
|
||||
let entry_rc: Arc<CacheEntry>;
|
||||
{
|
||||
let shared = self.shared.lock().unwrap();
|
||||
let shared = self.shared.lock().unwrap();
|
||||
|
||||
if walreceiver_works {
|
||||
assert!(lsn <= shared.last_valid_lsn);
|
||||
}
|
||||
|
||||
if lsn < shared.first_valid_lsn {
|
||||
bail!(
|
||||
"LSN {:X}/{:X} has already been removed",
|
||||
lsn >> 32,
|
||||
lsn & 0xffff_ffff
|
||||
);
|
||||
}
|
||||
|
||||
let pagecache = &shared.pagecache;
|
||||
|
||||
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
|
||||
|
||||
let entry_opt = entries.next_back();
|
||||
|
||||
if entry_opt.is_none() {
|
||||
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
return Ok(Bytes::from_static(&ZERO_PAGE));
|
||||
/* return Err("could not find page image")?; */
|
||||
}
|
||||
let (_key, entry) = entry_opt.unwrap();
|
||||
entry_rc = entry.clone();
|
||||
|
||||
// Now that we have a reference to the cache entry, drop the lock on the map.
|
||||
// It's important to do this before waiting on the condition variable below,
|
||||
// and better to do it as soon as possible to maximize concurrency.
|
||||
if walreceiver_works {
|
||||
assert!(lsn <= shared.last_valid_lsn);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Lock the cache entry and dig the page image out of it.
|
||||
//
|
||||
// GetPage@LSN
|
||||
//
|
||||
// Returns an 8k page image
|
||||
//
|
||||
pub async fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
|
||||
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
self.wait_lsn(lsn).await?;
|
||||
|
||||
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
|
||||
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
||||
let minkey = CacheKey { tag, lsn: 0 };
|
||||
let maxkey = CacheKey { tag, lsn };
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
minkey.pack(&mut buf);
|
||||
|
||||
let mut readopts = rocksdb::ReadOptions::default();
|
||||
readopts.set_iterate_lower_bound(buf.to_vec());
|
||||
|
||||
buf.clear();
|
||||
maxkey.pack(&mut buf);
|
||||
let mut iter = self.db.iterator_opt(
|
||||
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
|
||||
readopts,
|
||||
);
|
||||
let entry_opt = iter.next();
|
||||
|
||||
if entry_opt.is_none() {
|
||||
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
return Ok(Bytes::from_static(&ZERO_PAGE));
|
||||
/* return Err("could not find page image")?; */
|
||||
}
|
||||
let (k, v) = entry_opt.unwrap();
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut buf);
|
||||
let page_img: Bytes;
|
||||
{
|
||||
let mut entry_content = entry_rc.content.lock().unwrap();
|
||||
|
||||
if let Some(img) = &entry_content.page_image {
|
||||
assert!(!entry_content.apply_pending);
|
||||
page_img = img.clone();
|
||||
} else if entry_content.wal_record.is_some() {
|
||||
//
|
||||
// If this page needs to be reconstructed by applying some WAL,
|
||||
// send a request to the WAL redo thread.
|
||||
//
|
||||
if !entry_content.apply_pending {
|
||||
assert!(!entry_content.apply_pending);
|
||||
entry_content.apply_pending = true;
|
||||
|
||||
let s = &self.walredo_sender;
|
||||
s.send(entry_rc.clone())?;
|
||||
}
|
||||
|
||||
while entry_content.apply_pending {
|
||||
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
|
||||
}
|
||||
|
||||
// We should now have a page image. If we don't, it means that WAL redo
|
||||
// failed to reconstruct it. WAL redo should've logged that error already.
|
||||
page_img = match &entry_content.page_image {
|
||||
Some(p) => p.clone(),
|
||||
None => {
|
||||
error!(
|
||||
"could not apply WAL to reconstruct page image for GetPage@LSN request"
|
||||
);
|
||||
bail!("could not apply WAL to reconstruct page image");
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
bail!("no page image or WAL record for requested page");
|
||||
}
|
||||
if let Some(img) = &content.page_image {
|
||||
page_img = img.clone();
|
||||
} else if content.wal_record.is_some() {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
let key = CacheKey::unpack(&mut buf);
|
||||
page_img = self.reconstruct_page(key, content)?;
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
bail!("no page image or WAL record for requested page");
|
||||
}
|
||||
|
||||
// FIXME: assumes little-endian. Only used for the debugging log though
|
||||
@@ -394,10 +583,10 @@ impl PageCache {
|
||||
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
|
||||
page_lsn_hi,
|
||||
page_lsn_lo,
|
||||
tag.spcnode,
|
||||
tag.dbnode,
|
||||
tag.relnode,
|
||||
tag.forknum,
|
||||
tag.rel.spcnode,
|
||||
tag.rel.dbnode,
|
||||
tag.rel.relnode,
|
||||
tag.rel.forknum,
|
||||
tag.blknum
|
||||
);
|
||||
|
||||
@@ -412,38 +601,42 @@ impl PageCache {
|
||||
// over it.
|
||||
//
|
||||
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) {
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let shared = self.shared.lock().unwrap();
|
||||
let pagecache = &shared.pagecache;
|
||||
|
||||
let minkey = CacheKey {
|
||||
tag: entry.key.tag,
|
||||
tag: BufferTag {
|
||||
rel: entry.key.tag.rel,
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: 0,
|
||||
};
|
||||
let maxkey = CacheKey {
|
||||
tag: entry.key.tag,
|
||||
lsn: entry.key.lsn,
|
||||
};
|
||||
let entries = pagecache.range((Included(&minkey), Included(&maxkey)));
|
||||
|
||||
// the last entry in the range should be the CacheEntry we were given
|
||||
//let _last_entry = entries.next_back();
|
||||
//assert!(last_entry == entry);
|
||||
let mut buf = BytesMut::new();
|
||||
minkey.pack(&mut buf);
|
||||
|
||||
let mut readopts = rocksdb::ReadOptions::default();
|
||||
readopts.set_iterate_lower_bound(buf.to_vec());
|
||||
|
||||
buf.clear();
|
||||
entry.key.pack(&mut buf);
|
||||
let iter = self.db.iterator_opt(
|
||||
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
|
||||
readopts,
|
||||
);
|
||||
|
||||
let mut base_img: Option<Bytes> = None;
|
||||
let mut records: Vec<WALRecord> = Vec::new();
|
||||
|
||||
// Scan backwards, collecting the WAL records, until we hit an
|
||||
// old page image.
|
||||
for (_key, e) in entries.rev() {
|
||||
let e = e.content.lock().unwrap();
|
||||
|
||||
if let Some(img) = &e.page_image {
|
||||
for (_k, v) in iter {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut buf);
|
||||
if let Some(img) = &content.page_image {
|
||||
// We have a base image. No need to dig deeper into the list of
|
||||
// records
|
||||
base_img = Some(img.clone());
|
||||
break;
|
||||
} else if let Some(rec) = &e.wal_record {
|
||||
} else if let Some(rec) = &content.wal_record {
|
||||
records.push(rec.clone());
|
||||
|
||||
// If this WAL record initializes the page, no need to dig deeper.
|
||||
@@ -466,57 +659,76 @@ impl PageCache {
|
||||
let lsn = rec.lsn;
|
||||
let key = CacheKey { tag, lsn };
|
||||
|
||||
let entry = CacheEntry::new(key.clone());
|
||||
entry.content.lock().unwrap().wal_record = Some(rec);
|
||||
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
|
||||
let rel_tag = RelTag {
|
||||
spcnode: tag.spcnode,
|
||||
dbnode: tag.dbnode,
|
||||
relnode: tag.relnode,
|
||||
forknum: tag.forknum,
|
||||
let content = CacheEntryContent {
|
||||
page_image: None,
|
||||
wal_record: Some(rec),
|
||||
apply_pending: false,
|
||||
};
|
||||
let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0);
|
||||
if tag.blknum >= *rel_entry {
|
||||
*rel_entry = tag.blknum + 1;
|
||||
}
|
||||
|
||||
let mut key_buf = BytesMut::new();
|
||||
key.pack(&mut key_buf);
|
||||
let mut val_buf = BytesMut::new();
|
||||
content.pack(&mut val_buf);
|
||||
|
||||
let _res = self.db.put(&key_buf[..], &val_buf[..]);
|
||||
//trace!("put_wal_record lsn: {}", lsn);
|
||||
|
||||
let oldentry = shared.pagecache.insert(key, Arc::new(entry));
|
||||
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
if oldentry.is_some() {
|
||||
error!(
|
||||
"overwriting WAL record with LSN {:X}/{:X} in page cache",
|
||||
lsn >> 32,
|
||||
lsn & 0xffffffff
|
||||
);
|
||||
}
|
||||
|
||||
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
//
|
||||
// Adds a relation-wide WAL record (like truncate) to the page cache,
|
||||
// associating it with all pages started with specified block number
|
||||
//
|
||||
pub async fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
|
||||
let mut key = CacheKey { tag, lsn: rec.lsn };
|
||||
let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).await?;
|
||||
let content = CacheEntryContent {
|
||||
page_image: None,
|
||||
wal_record: Some(rec),
|
||||
apply_pending: false,
|
||||
};
|
||||
// set new relation size
|
||||
trace!("Truncate relation {:?}", tag);
|
||||
let mut key_buf = BytesMut::new();
|
||||
let mut val_buf = BytesMut::new();
|
||||
content.pack(&mut val_buf);
|
||||
|
||||
for blknum in tag.blknum..old_rel_size {
|
||||
key_buf.clear();
|
||||
key.tag.blknum = blknum;
|
||||
key.pack(&mut key_buf);
|
||||
trace!("put_wal_record lsn: {}", key.lsn);
|
||||
let _res = self.db.put(&key_buf[..], &val_buf[..]);
|
||||
}
|
||||
let n = (old_rel_size - tag.blknum) as u64;
|
||||
self.num_entries.fetch_add(n, Ordering::Relaxed);
|
||||
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Memorize a full image of a page version
|
||||
//
|
||||
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) {
|
||||
let key = CacheKey { tag, lsn };
|
||||
let content = CacheEntryContent {
|
||||
page_image: Some(img),
|
||||
wal_record: None,
|
||||
apply_pending: false,
|
||||
};
|
||||
|
||||
let entry = CacheEntry::new(key.clone());
|
||||
entry.content.lock().unwrap().page_image = Some(img);
|
||||
let mut key_buf = BytesMut::new();
|
||||
key.pack(&mut key_buf);
|
||||
let mut val_buf = BytesMut::new();
|
||||
content.pack(&mut val_buf);
|
||||
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
let pagecache = &mut shared.pagecache;
|
||||
|
||||
let oldentry = pagecache.insert(key, Arc::new(entry));
|
||||
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
||||
assert!(oldentry.is_none());
|
||||
trace!("put_wal_record lsn: {}", key.lsn);
|
||||
let _res = self.db.put(&key_buf[..], &val_buf[..]);
|
||||
|
||||
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
|
||||
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
|
||||
|
||||
self.num_page_images.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
@@ -602,83 +814,78 @@ impl PageCache {
|
||||
shared.last_record_lsn
|
||||
}
|
||||
|
||||
//
|
||||
// Simple test function for the WAL redo code:
|
||||
//
|
||||
// 1. Pick a page from the page cache at random.
|
||||
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
|
||||
//
|
||||
//
|
||||
pub async fn _test_get_page_at_lsn(&self) {
|
||||
// for quick testing of the get_page_at_lsn() funcion.
|
||||
//
|
||||
// Get a random page from the page cache. Apply all its WAL, by requesting
|
||||
// that page at the highest lsn.
|
||||
pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
|
||||
if lsn != u64::MAX {
|
||||
self.wait_lsn(lsn).await?;
|
||||
}
|
||||
|
||||
let mut tag: Option<BufferTag> = None;
|
||||
let mut key = CacheKey {
|
||||
tag: BufferTag {
|
||||
rel: *rel,
|
||||
blknum: u32::MAX,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
{
|
||||
let shared = self.shared.lock().unwrap();
|
||||
let pagecache = &shared.pagecache;
|
||||
|
||||
if pagecache.is_empty() {
|
||||
info!("page cache is empty");
|
||||
return;
|
||||
}
|
||||
|
||||
// Find nth entry in the map, where n is picked at random
|
||||
let n = rand::thread_rng().gen_range(0..pagecache.len());
|
||||
let mut i = 0;
|
||||
for (key, _e) in pagecache.iter() {
|
||||
if i == n {
|
||||
tag = Some(key.tag);
|
||||
break;
|
||||
loop {
|
||||
buf.clear();
|
||||
key.pack(&mut buf);
|
||||
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
|
||||
&buf[..],
|
||||
rocksdb::Direction::Reverse,
|
||||
));
|
||||
if let Some((k, v)) = iter.next() {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
let tag = BufferTag::unpack(&mut buf);
|
||||
if tag.rel == *rel {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut buf);
|
||||
if let Some(rec) = &content.wal_record {
|
||||
if rec.truncate {
|
||||
if tag.blknum > 0 {
|
||||
key.tag.blknum = tag.blknum - 1;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
let relsize = tag.blknum + 1;
|
||||
return Ok(relsize);
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
|
||||
match self
|
||||
.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee)
|
||||
.await
|
||||
{
|
||||
Ok(_img) => {
|
||||
// This prints out the whole page image.
|
||||
//println!("{:X?}", img);
|
||||
}
|
||||
Err(error) => {
|
||||
error!("GetPage@LSN failed: {}", error);
|
||||
}
|
||||
break;
|
||||
}
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
/// Remember a relation's size in blocks.
|
||||
///
|
||||
/// If 'to' is larger than the previously remembered size, the remembered size is increased to 'to'.
|
||||
/// But if it's smaller, there is no change.
|
||||
pub fn relsize_inc(&self, rel: &RelTag, to: u32) {
|
||||
// FIXME: Shouldn't relation size also be tracked with an LSN?
|
||||
// If a replica is lagging behind, it needs to get the size as it was on
|
||||
// the replica's current replay LSN.
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
|
||||
pub async fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<bool> {
|
||||
self.wait_lsn(lsn).await?;
|
||||
|
||||
if to >= *entry {
|
||||
*entry = to;
|
||||
let key = CacheKey {
|
||||
tag: BufferTag {
|
||||
rel: *rel,
|
||||
blknum: u32::MAX,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
let mut buf = BytesMut::new();
|
||||
key.pack(&mut buf);
|
||||
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
|
||||
&buf[..],
|
||||
rocksdb::Direction::Reverse,
|
||||
));
|
||||
if let Some((k, _v)) = iter.next() {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
let tag = BufferTag::unpack(&mut buf);
|
||||
if tag.rel == *rel {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn relsize_get(&self, rel: &RelTag) -> u32 {
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
|
||||
*entry
|
||||
}
|
||||
|
||||
pub fn relsize_exist(&self, rel: &RelTag) -> bool {
|
||||
let shared = self.shared.lock().unwrap();
|
||||
let relsize_cache = &shared.relsize_cache;
|
||||
relsize_cache.contains_key(rel)
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub fn get_stats(&self) -> PageCacheStats {
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::io;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::runtime;
|
||||
@@ -50,12 +51,8 @@ enum FeMessage {
|
||||
// All that messages are actually CopyData from libpq point of view.
|
||||
//
|
||||
ZenithExistsRequest(ZenithRequest),
|
||||
ZenithTruncRequest(ZenithRequest),
|
||||
ZenithUnlinkRequest(ZenithRequest),
|
||||
ZenithNblocksRequest(ZenithRequest),
|
||||
ZenithReadRequest(ZenithRequest),
|
||||
ZenithCreateRequest(ZenithRequest),
|
||||
ZenithExtendRequest(ZenithRequest),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -383,12 +380,8 @@ impl FeMessage {
|
||||
// serialization.
|
||||
match smgr_tag {
|
||||
0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))),
|
||||
1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))),
|
||||
2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))),
|
||||
3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
|
||||
4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
|
||||
5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))),
|
||||
6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))),
|
||||
1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
|
||||
2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
|
||||
_ => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf),
|
||||
@@ -431,6 +424,7 @@ pub fn thread_main(conf: &PageServerConf) {
|
||||
loop {
|
||||
let (socket, peer_addr) = listener.accept().await.unwrap();
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
socket.set_nodelay(true).unwrap();
|
||||
let mut conn_handler = Connection::new(conf.clone(), socket, &runtime_ref);
|
||||
|
||||
task::spawn(async move {
|
||||
@@ -625,7 +619,7 @@ impl Connection {
|
||||
let mut unnamed_query_string = Bytes::new();
|
||||
loop {
|
||||
let msg = self.read_message().await?;
|
||||
info!("got message {:?}", msg);
|
||||
trace!("got message {:?}", msg);
|
||||
match msg {
|
||||
Some(FeMessage::StartupMessage(m)) => {
|
||||
trace!("got message {:?}", m);
|
||||
@@ -788,7 +782,7 @@ impl Connection {
|
||||
let message = self.read_message().await?;
|
||||
|
||||
if let Some(m) = &message {
|
||||
info!("query({:?}): {:?}", timelineid, m);
|
||||
trace!("query({:?}): {:?}", timelineid, m);
|
||||
};
|
||||
|
||||
if message.is_none() {
|
||||
@@ -805,7 +799,7 @@ impl Connection {
|
||||
forknum: req.forknum,
|
||||
};
|
||||
|
||||
let exist = pcache.relsize_exist(&tag);
|
||||
let exist = pcache.relsize_exist(&tag, req.lsn).await.unwrap_or(false);
|
||||
|
||||
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
||||
ok: exist,
|
||||
@@ -813,20 +807,6 @@ impl Connection {
|
||||
}))
|
||||
.await?
|
||||
}
|
||||
Some(FeMessage::ZenithTruncRequest(_)) => {
|
||||
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
}))
|
||||
.await?
|
||||
}
|
||||
Some(FeMessage::ZenithUnlinkRequest(_)) => {
|
||||
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
}))
|
||||
.await?
|
||||
}
|
||||
Some(FeMessage::ZenithNblocksRequest(req)) => {
|
||||
let tag = page_cache::RelTag {
|
||||
spcnode: req.spcnode,
|
||||
@@ -835,7 +815,7 @@ impl Connection {
|
||||
forknum: req.forknum,
|
||||
};
|
||||
|
||||
let n_blocks = pcache.relsize_get(&tag);
|
||||
let n_blocks = pcache.relsize_get(&tag, req.lsn).await.unwrap_or(0);
|
||||
|
||||
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
|
||||
ok: true,
|
||||
@@ -845,10 +825,12 @@ impl Connection {
|
||||
}
|
||||
Some(FeMessage::ZenithReadRequest(req)) => {
|
||||
let buf_tag = page_cache::BufferTag {
|
||||
spcnode: req.spcnode,
|
||||
dbnode: req.dbnode,
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
rel: page_cache::RelTag {
|
||||
spcnode: req.spcnode,
|
||||
dbnode: req.dbnode,
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
},
|
||||
blknum: req.blkno,
|
||||
};
|
||||
|
||||
@@ -871,38 +853,6 @@ impl Connection {
|
||||
|
||||
self.write_message(&msg).await?
|
||||
}
|
||||
Some(FeMessage::ZenithCreateRequest(req)) => {
|
||||
let tag = page_cache::RelTag {
|
||||
spcnode: req.spcnode,
|
||||
dbnode: req.dbnode,
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
};
|
||||
|
||||
pcache.relsize_inc(&tag, 0);
|
||||
|
||||
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
}))
|
||||
.await?
|
||||
}
|
||||
Some(FeMessage::ZenithExtendRequest(req)) => {
|
||||
let tag = page_cache::RelTag {
|
||||
spcnode: req.spcnode,
|
||||
dbnode: req.dbnode,
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
};
|
||||
|
||||
pcache.relsize_inc(&tag, req.blkno + 1);
|
||||
|
||||
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
}))
|
||||
.await?
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -942,10 +892,7 @@ impl Connection {
|
||||
let joinres = f_tar.await;
|
||||
|
||||
if let Err(joinreserr) = joinres {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
joinreserr,
|
||||
));
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidData, joinreserr));
|
||||
}
|
||||
joinres.unwrap()
|
||||
};
|
||||
@@ -982,7 +929,7 @@ impl Connection {
|
||||
// FIXME: I'm getting an error from the tokio copyout driver without this.
|
||||
// I think it happens when the CommandComplete, CloseComplete and ReadyForQuery
|
||||
// are sent in the same TCP packet as the CopyDone. I don't understand why.
|
||||
thread::sleep(std::time::Duration::from_secs(1));
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -44,10 +44,13 @@ pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4;
|
||||
|
||||
// From pg_control.h and rmgrlist.h
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
|
||||
pub const RM_XLOG_ID: u8 = 0;
|
||||
pub const RM_XACT_ID: u8 = 1;
|
||||
pub const RM_SMGR_ID: u8 = 2;
|
||||
pub const RM_CLOG_ID: u8 = 3;
|
||||
// pub const RM_MULTIXACT_ID:u8 = 6;
|
||||
|
||||
// from xlogreader.h
|
||||
pub const XLR_INFO_MASK: u8 = 0x0F;
|
||||
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
|
||||
|
||||
@@ -29,6 +29,7 @@ use bytes::Bytes;
|
||||
use crate::page_cache;
|
||||
use crate::page_cache::BufferTag;
|
||||
use crate::page_cache::PageCache;
|
||||
use crate::page_cache::RelTag;
|
||||
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
@@ -202,11 +203,13 @@ fn restore_relfile(
|
||||
let r = file.read_exact(&mut buf);
|
||||
match r {
|
||||
Ok(_) => {
|
||||
let tag = page_cache::BufferTag {
|
||||
spcnode: spcoid,
|
||||
dbnode: dboid,
|
||||
relnode,
|
||||
forknum: forknum as u8,
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: spcoid,
|
||||
dbnode: dboid,
|
||||
relnode: relnode,
|
||||
forknum: forknum as u8,
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
|
||||
@@ -233,14 +236,6 @@ fn restore_relfile(
|
||||
blknum += 1;
|
||||
}
|
||||
|
||||
let tag = page_cache::RelTag {
|
||||
spcnode: spcoid,
|
||||
dbnode: dboid,
|
||||
relnode,
|
||||
forknum: forknum as u8,
|
||||
};
|
||||
pcache.relsize_inc(&tag, blknum);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -307,18 +302,21 @@ fn restore_wal(
|
||||
// so having multiple copies of it doesn't cost that much)
|
||||
for blk in decoded.blocks.iter() {
|
||||
let tag = BufferTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum as u8,
|
||||
rel: RelTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum as u8,
|
||||
},
|
||||
blknum: blk.blkno,
|
||||
};
|
||||
|
||||
let rec = page_cache::WALRecord {
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
truncate: false,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset,
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
|
||||
pcache.put_wal_record(tag, rec);
|
||||
|
||||
@@ -306,10 +306,12 @@ async fn slurp_base_file(
|
||||
|
||||
while bytes.remaining() >= 8192 {
|
||||
let tag = page_cache::BufferTag {
|
||||
spcnode: parsed.spcnode,
|
||||
dbnode: parsed.dbnode,
|
||||
relnode: parsed.relnode,
|
||||
forknum: parsed.forknum as u8,
|
||||
rel: page_cache::RelTag {
|
||||
spcnode: parsed.spcnode,
|
||||
dbnode: parsed.dbnode,
|
||||
relnode: parsed.relnode,
|
||||
forknum: parsed.forknum as u8,
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
|
||||
|
||||
@@ -328,6 +328,8 @@ impl DecodedBkpBlock {
|
||||
const SizeOfXLogRecord: u32 = 24;
|
||||
|
||||
pub struct DecodedWALRecord {
|
||||
pub xl_info: u8,
|
||||
pub xl_rmid: u8,
|
||||
pub record: Bytes, // raw XLogRecord
|
||||
|
||||
pub blocks: Vec<DecodedBkpBlock>,
|
||||
@@ -353,11 +355,40 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
|
||||
xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub type Oid = u32;
|
||||
pub type BlockNumber = u32;
|
||||
|
||||
pub const MAIN_FORKNUM: u8 = 0;
|
||||
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct RelFileNode {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
pub relnode: u32,
|
||||
pub spcnode: Oid, /* tablespace */
|
||||
pub dbnode: Oid, /* database */
|
||||
pub relnode: Oid, /* relation */
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlSmgrTruncate {
|
||||
pub blkno: BlockNumber,
|
||||
pub rnode: RelFileNode,
|
||||
pub flags: u32,
|
||||
}
|
||||
|
||||
pub fn decode_truncate_record(decoded: &DecodedWALRecord) -> XlSmgrTruncate {
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance((SizeOfXLogRecord + 2) as usize);
|
||||
XlSmgrTruncate {
|
||||
blkno: buf.get_u32_le(),
|
||||
rnode: RelFileNode {
|
||||
spcnode: buf.get_u32_le(), /* tablespace */
|
||||
dbnode: buf.get_u32_le(), /* database */
|
||||
relnode: buf.get_u32_le(), /* relation */
|
||||
},
|
||||
flags: buf.get_u32_le(),
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
@@ -379,13 +410,13 @@ pub struct RelFileNode {
|
||||
// block data
|
||||
// ...
|
||||
// main data
|
||||
pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
let mut rnode_spcnode: u32 = 0;
|
||||
let mut rnode_dbnode: u32 = 0;
|
||||
let mut rnode_relnode: u32 = 0;
|
||||
let mut got_rnode = false;
|
||||
|
||||
let mut buf = rec.clone();
|
||||
let mut buf = record.clone();
|
||||
|
||||
// 1. Parse XLogRecord struct
|
||||
|
||||
@@ -649,7 +680,9 @@ pub fn decode_wal_record(rec: Bytes) -> DecodedWALRecord {
|
||||
}
|
||||
|
||||
DecodedWALRecord {
|
||||
record: rec,
|
||||
xl_info,
|
||||
xl_rmid,
|
||||
record,
|
||||
blocks,
|
||||
main_data_offset,
|
||||
}
|
||||
|
||||
@@ -7,8 +7,9 @@
|
||||
//!
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::page_cache::BufferTag;
|
||||
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
|
||||
use crate::page_cache::{BufferTag, RelTag};
|
||||
use crate::pg_constants;
|
||||
use crate::waldecoder::*;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use anyhow::Error;
|
||||
@@ -223,22 +224,51 @@ async fn walreceiver_main(
|
||||
// so having multiple copies of it doesn't cost that much)
|
||||
for blk in decoded.blocks.iter() {
|
||||
let tag = BufferTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum as u8,
|
||||
rel: RelTag {
|
||||
spcnode: blk.rnode_spcnode,
|
||||
dbnode: blk.rnode_dbnode,
|
||||
relnode: blk.rnode_relnode,
|
||||
forknum: blk.forknum as u8,
|
||||
},
|
||||
blknum: blk.blkno,
|
||||
};
|
||||
|
||||
let rec = page_cache::WALRecord {
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
truncate: false,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset,
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
|
||||
pcache.put_wal_record(tag, rec);
|
||||
}
|
||||
// include truncate wal record in all pages
|
||||
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
|
||||
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
== pg_constants::XLOG_SMGR_TRUNCATE
|
||||
{
|
||||
let truncate = decode_truncate_record(&decoded);
|
||||
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: truncate.rnode.spcnode,
|
||||
dbnode: truncate.rnode.dbnode,
|
||||
relnode: truncate.rnode.relnode,
|
||||
forknum: MAIN_FORKNUM,
|
||||
},
|
||||
blknum: truncate.blkno,
|
||||
};
|
||||
let rec = page_cache::WALRecord {
|
||||
lsn: lsn,
|
||||
will_init: false,
|
||||
truncate: true,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
pcache.put_rel_wal_record(tag, rec).await?;
|
||||
}
|
||||
}
|
||||
// Now that this record has been handled, let the page cache know that
|
||||
// it is up-to-date to this LSN
|
||||
pcache.advance_last_record_lsn(lsn);
|
||||
|
||||
@@ -18,7 +18,10 @@ use log::*;
|
||||
use std::assert;
|
||||
use std::cell::RefCell;
|
||||
use std::fs;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::prelude::*;
|
||||
use std::io::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -66,21 +69,28 @@ pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) {
|
||||
let _guard = runtime.enter();
|
||||
process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
|
||||
}
|
||||
info!("WAL redo postgres started");
|
||||
|
||||
// Pretty arbitrarily, reuse the same Postgres process for 100 requests.
|
||||
// After that, kill it and start a new one. This is mostly to avoid
|
||||
// using up all shared buffers in Postgres's shared buffer cache; we don't
|
||||
// want to write any pages to disk in the WAL redo process.
|
||||
for _i in 1..100 {
|
||||
for _i in 1..100000 {
|
||||
let request = walredo_channel_receiver.recv().unwrap();
|
||||
|
||||
let result = handle_apply_request(&pcache, &process, &runtime, request);
|
||||
if result.is_err() {
|
||||
// On error, kill the process.
|
||||
// Something went wrong with handling the request. It's not clear
|
||||
// if the request was faulty, and the next request would succeed
|
||||
// again, or if the 'postgres' process went haywire. To be safe,
|
||||
// kill the 'postgres' process so that we will start from a clean
|
||||
// slate, with a new process, for the next request.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Time to kill the 'postgres' process. A new one will be launched on next
|
||||
// iteration of the loop.
|
||||
info!("killing WAL redo postgres process");
|
||||
let _ = runtime.block_on(process.stdin.get_mut().shutdown());
|
||||
let mut child = process.child;
|
||||
@@ -153,6 +163,7 @@ fn handle_apply_request(
|
||||
let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
|
||||
|
||||
let mut entry = entry_rc.content.lock().unwrap();
|
||||
assert!(entry.apply_pending);
|
||||
entry.apply_pending = false;
|
||||
|
||||
let nrecords = records.len();
|
||||
@@ -160,7 +171,7 @@ fn handle_apply_request(
|
||||
let start = Instant::now();
|
||||
|
||||
let apply_result: Result<Bytes, Error>;
|
||||
if tag.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
|
||||
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
|
||||
//TODO use base image if any
|
||||
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
let zero_page_bytes: &[u8] = &ZERO_PAGE;
|
||||
@@ -215,9 +226,6 @@ fn handle_apply_request(
|
||||
result = Err(e);
|
||||
} else {
|
||||
entry.page_image = Some(apply_result.unwrap());
|
||||
pcache
|
||||
.num_page_images
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
result = Ok(());
|
||||
}
|
||||
|
||||
@@ -258,8 +266,14 @@ impl WalRedoProcess {
|
||||
std::str::from_utf8(&initdb.stdout).unwrap(),
|
||||
std::str::from_utf8(&initdb.stderr).unwrap()
|
||||
);
|
||||
} else {
|
||||
// Limit shared cache for wal-redo-postres
|
||||
let mut config = OpenOptions::new()
|
||||
.append(true)
|
||||
.open(PathBuf::from(&datadir).join("postgresql.conf"))?;
|
||||
config.write(b"shared_buffers=128kB\n")?;
|
||||
config.write(b"fsync=off\n")?;
|
||||
}
|
||||
|
||||
// Start postgres itself
|
||||
let mut child = Command::new("postgres")
|
||||
.arg("--wal-redo")
|
||||
@@ -290,7 +304,7 @@ impl WalRedoProcess {
|
||||
if res.unwrap() == 0 {
|
||||
break;
|
||||
}
|
||||
debug!("wal-redo-postgres: {}", line.trim());
|
||||
error!("wal-redo-postgres: {}", line.trim());
|
||||
line.clear();
|
||||
}
|
||||
Ok::<(), Error>(())
|
||||
@@ -390,11 +404,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
|
||||
|
||||
buf.put_u8(b'B');
|
||||
buf.put_u32(len as u32);
|
||||
buf.put_u32(tag.spcnode);
|
||||
buf.put_u32(tag.dbnode);
|
||||
buf.put_u32(tag.relnode);
|
||||
buf.put_u32(tag.forknum as u32);
|
||||
buf.put_u32(tag.blknum);
|
||||
tag.pack(&mut buf);
|
||||
|
||||
assert!(buf.len() == 1 + len);
|
||||
|
||||
@@ -409,11 +419,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
|
||||
|
||||
buf.put_u8(b'P');
|
||||
buf.put_u32(len as u32);
|
||||
buf.put_u32(tag.spcnode);
|
||||
buf.put_u32(tag.dbnode);
|
||||
buf.put_u32(tag.relnode);
|
||||
buf.put_u32(tag.forknum as u32);
|
||||
buf.put_u32(tag.blknum);
|
||||
tag.pack(&mut buf);
|
||||
buf.put(base_img);
|
||||
|
||||
assert!(buf.len() == 1 + len);
|
||||
@@ -441,11 +447,7 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes {
|
||||
|
||||
buf.put_u8(b'G');
|
||||
buf.put_u32(len as u32);
|
||||
buf.put_u32(tag.spcnode);
|
||||
buf.put_u32(tag.dbnode);
|
||||
buf.put_u32(tag.relnode);
|
||||
buf.put_u32(tag.forknum as u32);
|
||||
buf.put_u32(tag.blknum);
|
||||
tag.pack(&mut buf);
|
||||
|
||||
assert!(buf.len() == 1 + len);
|
||||
|
||||
|
||||
@@ -17,4 +17,4 @@ hex = "0.4.3"
|
||||
log = "0.4.14"
|
||||
|
||||
[build-dependencies]
|
||||
bindgen = "0.53.1"
|
||||
bindgen = "0.57"
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 610033bb35...daec929ec3
@@ -196,5 +196,4 @@ mod tests {
|
||||
// because the waiter already dropped its Receiver.
|
||||
seq.advance(99);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user