diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index b39d901be7..bfe38ef528 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::Duration; use std::{collections::BTreeMap, path::PathBuf}; use std::{io::Write, net::SocketAddr}; +use std::path::Path; use lazy_static::lazy_static; use postgres::{Client, NoTls}; @@ -246,7 +247,9 @@ impl PostgresNode { max_replication_slots = 10\n\ hot_standby = on\n\ shared_buffers = 1MB\n\ + fsync = off\n\ max_connections = 100\n\ + wal_sender_timeout = 0\n\ wal_level = replica\n\ listen_addresses = '{address}'\n\ port = {port}\n", @@ -415,8 +418,69 @@ impl PostgresNode { } } - // TODO - pub fn pg_bench() {} + + pub fn pg_regress(&self) { + self.safe_psql("postgres", "CREATE DATABASE regression"); + + let regress_run_path = self.env.data_dir.join("regress"); + fs::create_dir_all(regress_run_path.clone()).unwrap(); + fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); + std::env::set_current_dir(regress_run_path).unwrap(); + + let regress_build_path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); + let regress_src_path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); + + let _regress_check = Command::new(regress_build_path.join("pg_regress")) + .args(&[ + "--bindir=''", + "--use-existing", + format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(), + format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), + format!( + "--schedule={}", + regress_src_path.join("parallel_schedule").to_str().unwrap() + ) + .as_str(), + format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), + ]) + .env_clear() + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("PGPORT", self.address.port().to_string()) + .env("PGUSER", self.whoami()) + .env("PGHOST", self.address.ip().to_string()) + .status() + .expect("pg_regress failed"); + } + + pub fn pg_bench(&self, clients: u32, seconds: u32) { + let port = self.address.port().to_string(); + let clients = clients.to_string(); + let seconds = seconds.to_string(); + let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench")) + .args(&["-i", "-p", port.as_str(), "postgres"]) + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .status() + .expect("pgbench -i"); + let _pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench")) + .args(&[ + "-p", + port.as_str(), + "-T", + seconds.as_str(), + "-P", + "1", + "-c", + clients.as_str(), + "-M", + "prepared", + "postgres", + ]) + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .status() + .expect("pgbench run"); + } } impl Drop for PostgresNode { diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index eba2966849..3175998f9e 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -12,7 +12,6 @@ use std::time::Duration; use postgres::{Client, NoTls}; -use crate::compute::PostgresNode; use crate::local_env::{self, LocalEnv}; type Result = std::result::Result>; @@ -104,6 +103,9 @@ impl TestStorageControlPlane { } pub fn stop(&self) { + for wa in self.wal_acceptors.iter() { + let _unused = wa.stop(); + } self.test_done.store(true, Ordering::Relaxed); } @@ -350,42 +352,6 @@ impl Drop for WalProposerNode { } } -/////////////////////////////////////////////////////////////////////////////// - -pub fn regress_check(pg: &PostgresNode) { - pg.safe_psql("postgres", "CREATE DATABASE regression"); - - let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress"); - fs::create_dir_all(regress_run_path.clone()).unwrap(); - std::env::set_current_dir(regress_run_path).unwrap(); - - let regress_build_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); - let regress_src_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); - - let _regress_check = Command::new(regress_build_path.join("pg_regress")) - .args(&[ - "--bindir=''", - "--use-existing", - format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(), - format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), - format!( - "--schedule={}", - regress_src_path.join("parallel_schedule").to_str().unwrap() - ) - .as_str(), - format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), - ]) - .env_clear() - .env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap()) - .env("PGHOST", pg.address.ip().to_string()) - .env("PGPORT", pg.address.port().to_string()) - .env("PGUSER", pg.whoami()) - .status() - .expect("pg_regress failed"); -} - /// Read a PID file /// /// This should contain an unsigned integer, but we return it as a String diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index a7e389455e..bfb9b71d0f 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -60,7 +60,6 @@ fn test_regress() { let node = compute_cplane.new_test_node(); node.start().unwrap(); -<<<<<<< HEAD node.pg_regress(); } @@ -68,17 +67,14 @@ fn test_regress() { #[test] fn pgbench() { // Start pageserver that reads WAL directly from that postgres - let storage_cplane = StorageControlPlane::one_page_server(); - let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); + let storage_cplane = TestStorageControlPlane::one_page_server(String::new()); + let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); // start postgres - let node = compute_cplane.new_node(); - node.start(&storage_cplane); + let node = compute_cplane.new_test_node(); + node.start().unwrap(); node.pg_bench(10, 100); -======= - control_plane::storage::regress_check(&node); ->>>>>>> main } // Run two postgres instances on one pageserver diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 17a5f48d18..d5b3481073 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -29,9 +29,10 @@ daemonize = "0.4.1" rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", rev="7f15a24ec7daa0a5d9516da706212745f9042818", features = ["no-verify-ssl"] } tokio = { version = "1.3.0", features = ["full"] } tokio-stream = { version = "0.1.4" } -tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb.git" } anyhow = "1.0" crc32c = "0.6.0" diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9e46e3d0a9..2ba51e83a2 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -224,11 +224,11 @@ fn init_logging(conf: &PageServerConf) -> Result CacheEntry { - CacheEntry { - key, - content: Mutex::new(CacheEntryContent { - page_image: None, - wal_record: None, - apply_pending: false, - } - } - } -} - impl CacheEntry { fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry { CacheEntry { @@ -404,18 +388,6 @@ impl PageCache { lsn & 0xffff_ffff ); } - - let pagecache = &shared.pagecache; - - let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); - - let entry_opt = entries.next_back(); - - if entry_opt.is_none() { - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - return Ok(Bytes::from_static(&ZERO_PAGE)); - /* return Err("could not find page image")?; */ - } } let mut buf = BytesMut::new(); minkey.pack(&mut buf); diff --git a/pageserver/src/restore_datadir.rs b/pageserver/src/restore_datadir.rs index 985f5e3905..3b28d64585 100644 --- a/pageserver/src/restore_datadir.rs +++ b/pageserver/src/restore_datadir.rs @@ -324,10 +324,12 @@ async fn slurp_base_file( while bytes.remaining() >= 8192 { let tag = page_cache::BufferTag { - spcnode: parsed.spcnode, - dbnode: parsed.dbnode, - relnode: parsed.relnode, - forknum: parsed.forknum as u8, + rel: page_cache::RelTag { + spcnode: parsed.spcnode, + dbnode: parsed.dbnode, + relnode: parsed.relnode, + forknum: parsed.forknum as u8, + }, blknum: blknum, }; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index c11a00fc78..692d7f466d 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -8,7 +8,7 @@ use crate::page_cache; use crate::page_cache::{BufferTag, RelTag}; -use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; +use crate::waldecoder::*; use crate::PageServerConf; use anyhow::Error; use log::*;