diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 8157c62a8b..c2b29e7397 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::fs::{self, OpenOptions}; use std::os::unix::fs::PermissionsExt; use std::net::TcpStream; @@ -402,40 +401,18 @@ impl PostgresNode { Client::connect(connstring.as_str(), NoTls).unwrap() } - /* Create stub controlfile and respective xlog to start computenode */ - pub fn setup_controlfile(&self) { - let filepath = format!("{}/global/pg_control", self.pgdata().to_str().unwrap()); - - { - File::create(filepath).unwrap(); - } - - let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal"); - - let pg_resetwal = Command::new(pg_resetwal_path) - .args(&["-D", self.pgdata().to_str().unwrap()]) - .arg("-f") - // TODO probably we will have to modify pg_resetwal - // .arg("--compute-node") - .status() - .expect("failed to execute pg_resetwal"); - - if !pg_resetwal.success() { - panic!("pg_resetwal failed"); - } - } - - pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode { + pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode { let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy"); match Command::new(proxy_path.as_path()) - .args(&["-s", &wal_acceptors]) + .args(&["--ztimelineid", &self.timelineid.to_str()]) + .args(&["-s", wal_acceptors]) .args(&["-h", &self.address.ip().to_string()]) .args(&["-p", &self.address.port().to_string()]) .arg("-v") .stderr(OpenOptions::new() + .create(true) .append(true) - .open(self.env.repo_path.join("safepkeeper_proxy.log")) - .unwrap()) + .open(self.pgdata().join("safekeeper_proxy.log")).unwrap()) .spawn() { Ok(child) => WalProposerNode { pid: child.id() }, diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index ebbcba7f26..5ac5cb8fd2 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -7,9 +7,10 @@ use std::env; use std::fs; use std::path::{Path, PathBuf}; -use std::process::Command; +use std::process::{Command, Stdio}; use bytes::Bytes; use rand::Rng; +use anyhow::Context; use hex; use serde_derive::{Deserialize, Serialize}; @@ -29,6 +30,9 @@ pub struct LocalEnv { // Path to the Repository. Here page server and compute nodes will create and store their data. pub repo_path: PathBuf, + // System identifier, from the PostgreSQL control file + pub systemid: u64, + // Path to postgres distribution. It's expected that "bin", "include", // "lib", "share" from postgres distribution are there. If at some point // in time we will be able to run against vanilla postgres we may split that @@ -96,40 +100,32 @@ pub fn init() -> Result<()> { } // ok, we are good to go - let conf = LocalEnv { + let mut conf = LocalEnv { repo_path: repo_path.clone(), pg_distrib_dir, zenith_distrib_dir, + systemid: 0, }; - init_repo(&conf)?; - - // write config - let toml = toml::to_string(&conf)?; - fs::write(repo_path.join("config"), toml)?; + init_repo(&mut conf)?; Ok(()) } -pub fn init_repo(local_env: &LocalEnv) -> Result<()> +pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> { let repopath = String::from(local_env.repo_path.to_str().unwrap()); - fs::create_dir(&repopath)?; + fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath))?; fs::create_dir(repopath.clone() + "/pgdatadirs")?; fs::create_dir(repopath.clone() + "/timelines")?; fs::create_dir(repopath.clone() + "/refs")?; fs::create_dir(repopath.clone() + "/refs/branches")?; fs::create_dir(repopath.clone() + "/refs/tags")?; - - // Create empty config file - let configpath = repopath.clone() + "/config"; - fs::write(&configpath, r##" -# Example config file. Nothing here yet. -"##) - .expect(&format!("Unable to write file {}", &configpath)); + println!("created directory structure in {}", repopath); // Create initial timeline let tli = create_timeline(&local_env, None)?; let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli)); + println!("created initial timeline {}", timelinedir); // Run initdb // @@ -139,32 +135,50 @@ pub fn init_repo(local_env: &LocalEnv) -> Result<()> let initdb_path = local_env.pg_bin_dir().join("initdb"); let _initdb = Command::new(initdb_path) - .args(&["-D", "tmp", "--no-instructions"]) + .args(&["-D", "tmp"]) + .arg("--no-instructions") + .env_clear() + .env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap()) + .stdout(Stdio::null()) .status() - .expect("failed to execute initdb"); + .with_context(|| "failed to execute initdb")?; + println!("initdb succeeded"); - // Read control file to extract the LSN + // Read control file to extract the LSN and system id let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read("tmp/global/pg_control")?))?; - + let systemid = controlfile.system_identifier; let lsn = controlfile.checkPoint; let lsnstr = format!("{:016X}", lsn); // Move the initial WAL file fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.clone() + "/wal/000000010000000000000001.partial")?; + println!("moved initial WAL file"); // Remove pg_wal fs::remove_dir_all("tmp/pg_wal")?; + println!("removed tmp/pg_wal"); force_crash_recovery(&PathBuf::from("tmp"))?; + println!("updated pg_control"); let target = timelinedir.clone() + "/snapshots/" + &lsnstr; - fs::rename("tmp", target)?; + fs::rename("tmp", &target)?; + println!("moved 'tmp' to {}", &target); // Create 'main' branch to refer to the initial timeline let data = hex::encode(tli); fs::write(repopath.clone() + "/refs/branches/main", data)?; + println!("created main branch"); + + // Also update the system id in the LocalEnv + local_env.systemid = systemid; + + // write config + let toml = toml::to_string(&local_env)?; + fs::write(repopath.clone() + "/config", toml)?; println!("new zenith repository was created in {}", &repopath); + Ok(()) } @@ -209,17 +223,20 @@ pub fn load_config(repopath: &Path) -> Result { // local env for tests pub fn test_env(testname: &str) -> LocalEnv { + fs::create_dir_all("../tmp_check").expect("could not create directory ../tmp_check"); + let repo_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_check/").join(testname); // Remove remnants of old test repo let _ = fs::remove_dir_all(&repo_path); - let local_env = LocalEnv { + let mut local_env = LocalEnv { repo_path, pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"), zenith_distrib_dir: cargo_bin_dir(), + systemid: 0, }; - init_repo(&local_env).unwrap(); + init_repo(&mut local_env).expect("could not initialize zenith repository"); return local_env; } diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index dd935cb4fb..f2dbf8dc1a 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -176,6 +176,7 @@ impl PageServerNode { cmd .args(&["-l", self.address().to_string().as_str()]) .arg("-d") .env_clear() + .env("RUST_BACKTRACE", "1") .env("ZENITH_REPO_DIR", self.repo_path()) .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); @@ -294,6 +295,12 @@ impl WalAcceptorNode { let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor")) .args(&["-D", self.data_dir.to_str().unwrap()]) .args(&["-l", self.listen.to_string().as_str()]) + .args(&["--systemid", &self.env.systemid.to_string()]) + // Tell page server it can receive WAL from this WAL safekeeper + // FIXME: If there are multiple safekeepers, they will all inform + // the page server. Only the last "notification" will stay in effect. + // So it's pretty random which safekeeper the page server will connect to + .args(&["--pageserver", "127.0.0.1:64000"]) .arg("-d") .arg("-n") .status() diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 316a098afe..04ca933d74 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -2,6 +2,8 @@ use control_plane::compute::ComputeControlPlane; use control_plane::storage::TestStorageControlPlane; use control_plane::local_env; +use control_plane::local_env::PointInTime; +use pageserver::ZTimelineId; use rand::Rng; use std::sync::Arc; @@ -23,7 +25,7 @@ fn test_acceptors_normal_work() { node.start().unwrap(); // start proxy - let _proxy = node.start_proxy(wal_acceptors); + let _proxy = node.start_proxy(&wal_acceptors); // check basic work with table node.safe_psql( @@ -44,23 +46,39 @@ fn test_acceptors_normal_work() { // check wal files equality } +// Run page server and multiple safekeepers, and multiple compute nodes running +// against different timelines. #[test] -fn test_multitenancy() { - // Start pageserver that reads WAL directly from that postgres +fn test_many_timelines() { + // Initialize a new repository, and set up WAL safekeepers and page server. const REDUNDANCY: usize = 3; - const N_NODES: usize = 5; - let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY); - let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); + const N_TIMELINES: usize = 5; + let local_env = local_env::test_env("test_many_timelines"); + let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); + let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); - // start postgres + // Create branches + let mut timelines: Vec = Vec::new(); + let maintli = storage_cplane.get_branch_timeline("main"); // main branch + timelines.push(maintli); + let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap(); + for i in 1..N_TIMELINES { // additional branches + let branchname = format!("experimental{}", i); + local_env::create_branch(&local_env, &branchname, + PointInTime { timelineid: maintli, + lsn: startpoint }).unwrap(); + let tli = storage_cplane.get_branch_timeline(&branchname); + timelines.push(tli); + } + + // start postgres on each timeline let mut nodes = Vec::new(); - let mut proxies = Vec::new(); - for _ in 0..N_NODES { - let node = compute_cplane.new_test_master_node(); - nodes.push(node); - nodes.last().unwrap().start().unwrap(); - proxies.push(nodes.last().unwrap().start_proxy(wal_acceptors.clone())); + for tli in timelines { + let node = compute_cplane.new_test_node(tli); + nodes.push(node.clone()); + node.start().unwrap(); + node.start_proxy(&wal_acceptors); } // create schema @@ -111,7 +129,7 @@ fn test_acceptors_restarts() { node.start().unwrap(); // start proxy - let _proxy = node.start_proxy(wal_acceptors); + let _proxy = node.start_proxy(&wal_acceptors); let mut failed_node: Option = None; // check basic work with table @@ -172,7 +190,7 @@ fn test_acceptors_unavailability() { node.start().unwrap(); // start proxy - let _proxy = node.start_proxy(wal_acceptors); + let _proxy = node.start_proxy(&wal_acceptors); // check basic work with table node.safe_psql( @@ -250,7 +268,7 @@ fn test_race_conditions() { node.start().unwrap(); // start proxy - let _proxy = node.start_proxy(wal_acceptors); + let _proxy = node.start_proxy(&wal_acceptors); // check basic work with table node.safe_psql( diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index b98cca4ca1..10336d84f5 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,6 +8,7 @@ use std::io; use std::process::exit; use std::thread; use std::fs::{File, OpenOptions}; +use std::path::PathBuf; use anyhow::{Context, Result}; use clap::{App, Arg}; @@ -101,11 +102,11 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { if conf.daemonize { info!("daemonizing..."); - let repodir = zenith_repo_dir(); + let repodir = PathBuf::from(zenith_repo_dir()); // There should'n be any logging to stdin/stdout. Redirect it to the main log so // that we will see any accidental manual fprintf's or backtraces. - let log_filename = repodir.clone() + "pageserver.log"; + let log_filename = repodir.join("pageserver.log"); let stdout = OpenOptions::new() .create(true) .append(true) @@ -118,7 +119,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { .with_context(|| format!("failed to open {:?}", &log_filename))?; let daemonize = Daemonize::new() - .pid_file(repodir.clone() + "/pageserver.pid") + .pid_file(repodir.clone().join("pageserver.pid")) .working_directory(repodir) .stdout(stdout) .stderr(stderr); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3005e5e095..c9b547896c 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -37,6 +37,16 @@ impl ZTimelineId { ZTimelineId(b) } + pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTimelineId { + let mut arr = [0u8; 16]; + buf.copy_to_slice(&mut arr); + ZTimelineId::from(arr) + } + + pub fn as_arr(&self) -> [u8; 16] { + self.0 + } + pub fn to_str(self: &ZTimelineId) -> String { hex::encode(self.0) } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 20b3460d8c..edeba3b21f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -537,7 +537,7 @@ impl PageCache { self.valid_lsn_condvar.notify_all(); self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); } // diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index cc972f713e..9240f2f657 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -419,13 +419,18 @@ impl FeMessage { pub fn thread_main(conf: &PageServerConf) { // Create a new thread pool // - // FIXME: keep it single-threaded for now, make it easier to debug with gdb, - // and we're not concerned with performance yet. - //let runtime = runtime::Runtime::new().unwrap(); - let runtime = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + // FIXME: It would be nice to keep this single-threaded for debugging purposes, + // but that currently leads to a deadlock: if a GetPage@LSN request arrives + // for an LSN that hasn't been received yet, the thread gets stuck waiting for + // the WAL to arrive. If the WAL receiver hasn't been launched yet, i.e + // we haven't received a "callmemaybe" request yet to tell us where to get the + // WAL, we will not have a thread available to process the "callmemaybe" + // request when it does arrive. Using a thread pool alleviates the problem so + // that it doesn't happen in the tests anymore, but in principle it could still + // happen if we receive enough GetPage@LSN requests to consume all of the + // available threads. + //let runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap(); + let runtime = runtime::Runtime::new().unwrap(); info!("Starting page server on {}", conf.listen_addr); diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index d8ec810f36..22ab546d5e 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -114,7 +114,7 @@ impl WalStreamDecoder { let hdr = self.decode_XLogLongPageHeaderData(); if hdr.std.xlp_pageaddr != self.lsn { - return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}", + return Err(WalDecodeError::new(&format!("invalid xlog segment header at {:X}/{:X}", self.lsn >> 32, self.lsn & 0xffffffff))); } @@ -131,9 +131,9 @@ impl WalStreamDecoder { let hdr = self.decode_XLogPageHeaderData(); if hdr.xlp_pageaddr != self.lsn { - return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}", + return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}: {:?}", self.lsn >> 32, - self.lsn & 0xffffffff))); + self.lsn & 0xffffffff, hdr))); } // TODO: verify the remaining fields in the header diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 3f8fcd8722..23af8c2ee3 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -217,7 +217,7 @@ async fn walreceiver_main(conf: &PageServerConf, timelineid: ZTimelineId, wal_pr // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN - pcache.advance_last_valid_lsn(lsn); + pcache.advance_last_record_lsn(lsn); } else { break; } diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index 00576f055e..38a32bb730 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -10,15 +10,14 @@ use std::thread; use std::{fs::File, fs::OpenOptions}; use clap::{App, Arg}; +use anyhow::Result; use slog::Drain; -use pageserver::ZTimelineId; - use walkeeper::wal_service; use walkeeper::WalAcceptorConf; -fn main() -> Result<(), io::Error> { +fn main() -> Result<()> { let arg_matches = App::new("Zenith wal_acceptor") .about("Store WAL stream to local file system and push it to WAL receivers") .arg( @@ -29,10 +28,11 @@ fn main() -> Result<(), io::Error> { .help("Path to the WAL acceptor data directory"), ) .arg( - Arg::with_name("timelineid") - .long("timelineid") + Arg::with_name("systemid") + .long("systemid") .takes_value(true) - .help("zenith timeline id"), + .required(true) + .help("PostgreSQL system id, from pg_control"), ) .arg( Arg::with_name("listen") @@ -64,21 +64,23 @@ fn main() -> Result<(), io::Error> { ) .get_matches(); + let systemid_str = arg_matches.value_of("systemid").unwrap(); + let systemid = u64::from_str_radix(systemid_str, 10)?; + let mut conf = WalAcceptorConf { data_dir: PathBuf::from("./"), - timelineid: ZTimelineId::from([0u8; 16]), + systemid: systemid, daemonize: false, no_sync: false, pageserver_addr: None, - listen_addr: "127.0.0.1:5454".parse().unwrap(), + listen_addr: "127.0.0.1:5454".parse()?, }; if let Some(dir) = arg_matches.value_of("datadir") { conf.data_dir = PathBuf::from(dir); - } - if let Some(timelineid_str) = arg_matches.value_of("timelineid") { - conf.timelineid = ZTimelineId::from_str(timelineid_str).unwrap(); + // change into the data directory. + std::env::set_current_dir(&conf.data_dir)?; } if arg_matches.is_present("no-sync") { @@ -100,7 +102,7 @@ fn main() -> Result<(), io::Error> { start_wal_acceptor(conf) } -fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> { +fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> { // Initialize logger let _scope_guard = init_logging(&conf)?; let _log_guard = slog_stdlog::init().unwrap(); @@ -115,16 +117,16 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> { let stdout = OpenOptions::new() .create(true) .append(true) - .open(conf.data_dir.join("wal_acceptor.log")) + .open("wal_acceptor.log") .unwrap(); let stderr = OpenOptions::new() .create(true) .append(true) - .open(conf.data_dir.join("wal_acceptor.log")) + .open("wal_acceptor.log") .unwrap(); let daemonize = Daemonize::new() - .pid_file(conf.data_dir.join("wal_acceptor.pid")) + .pid_file("wal_acceptor.pid") .working_directory(Path::new(".")) .stdout(stdout) .stderr(stderr); diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 5f2f557b49..784ab730b6 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -6,12 +6,12 @@ mod pq_protocol; pub mod wal_service; pub mod xlog_utils; -use pageserver::ZTimelineId; +use crate::pq_protocol::SystemId; #[derive(Debug, Clone)] pub struct WalAcceptorConf { pub data_dir: PathBuf, - pub timelineid: ZTimelineId, + pub systemid: SystemId, pub daemonize: bool, pub no_sync: bool, pub listen_addr: SocketAddr, diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs index 299b830d5e..8179a734b9 100644 --- a/walkeeper/src/pq_protocol.rs +++ b/walkeeper/src/pq_protocol.rs @@ -1,5 +1,6 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use pageserver::ZTimelineId; use std::io; use std::str; @@ -37,7 +38,7 @@ pub enum BeMessage<'a> { pub struct FeStartupMessage { pub version: u32, pub kind: StartupRequestCode, - pub system_id: SystemId, + pub timelineid: ZTimelineId, } #[derive(Debug)] @@ -83,26 +84,33 @@ impl FeStartupMessage { let params_str = str::from_utf8(¶ms_bytes).unwrap(); let params = params_str.split('\0'); let mut options = false; - let mut system_id: u64 = 0; + let mut timelineid: Option = None; for p in params { if p == "options" { options = true; } else if options { for opt in p.split(' ') { - if opt.starts_with("system.id=") { - system_id = opt[10..].parse::().unwrap(); + if opt.starts_with("ztimelineid=") { + // FIXME: rethrow parsing error, don't unwrap + timelineid = Some(ZTimelineId::from_str(&opt[12..]).unwrap()); break; } } break; } } + if timelineid.is_none() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "timelineid is required", + )); + } buf.advance(len as usize); Ok(Some(FeMessage::StartupMessage(FeStartupMessage { version, kind, - system_id, + timelineid: timelineid.unwrap(), }))) } } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 1a8f764598..3dc873e27b 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -33,6 +33,7 @@ use tokio_postgres::{connect, Error, NoTls}; use crate::pq_protocol::*; use crate::xlog_utils::*; use crate::WalAcceptorConf; +use pageserver::ZTimelineId; type FullTransactionId = u64; @@ -64,7 +65,8 @@ struct ServerInfo { protocol_version: u32, /* proxy-safekeeper protocol version */ pg_version: u32, /* Postgres server version */ node_id: NodeId, - system_id: SystemId, /* Postgres system identifier */ + system_id: SystemId, + timeline_id: ZTimelineId, /* Zenith timelineid */ wal_end: XLogRecPtr, timeline: TimeLineID, wal_seg_size: u32, @@ -146,8 +148,8 @@ struct SharedState { * Database instance (tenant) */ #[derive(Debug)] -pub struct System { - id: SystemId, +pub struct Timeline { + timelineid: ZTimelineId, mutex: Mutex, cond: Notify, /* conditional variable used to notify wal senders */ } @@ -157,7 +159,7 @@ pub struct System { */ #[derive(Debug)] struct Connection { - system: Option>, + timeline: Option>, stream: TcpStream, /* Postgres connection */ inbuf: BytesMut, /* input buffer */ outbuf: BytesMut, /* output buffer */ @@ -211,6 +213,7 @@ impl Serializer for ServerInfo { buf.put_u32_le(self.pg_version); self.node_id.pack(buf); buf.put_u64_le(self.system_id); + buf.put_slice(&self.timeline_id.as_arr()); buf.put_u64_le(self.wal_end); buf.put_u32_le(self.timeline); buf.put_u32_le(self.wal_seg_size); @@ -221,6 +224,7 @@ impl Serializer for ServerInfo { pg_version: buf.get_u32_le(), node_id: NodeId::unpack(buf), system_id: buf.get_u64_le(), + timeline_id: ZTimelineId::get_from_buf(buf), wal_end: buf.get_u64_le(), timeline: buf.get_u32_le(), wal_seg_size: buf.get_u32_le(), @@ -278,6 +282,7 @@ impl SafeKeeperInfo { pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ node_id: NodeId { term: 0, uuid: 0 }, system_id: 0, /* Postgres system identifier */ + timeline_id: ZTimelineId::from([0u8; 16]), wal_end: 0, timeline: 0, wal_seg_size: 0, @@ -349,7 +354,7 @@ impl Serializer for SafeKeeperResponse { } lazy_static! { - pub static ref SYSTEMS: Mutex>> = Mutex::new(HashMap::new()); + pub static ref TIMELINES: Mutex>> = Mutex::new(HashMap::new()); } pub fn thread_main(conf: WalAcceptorConf) { @@ -389,8 +394,8 @@ async fn main_loop(conf: &WalAcceptorConf) -> Result<()> { } } -impl System { - pub fn new(id: SystemId) -> System { +impl Timeline { + pub fn new(timelineid: ZTimelineId) -> Timeline { let shared_state = SharedState { commit_lsn: 0, info: SafeKeeperInfo::new(), @@ -401,8 +406,8 @@ impl System { catalog_xmin: u64::MAX, }, }; - System { - id, + Timeline { + timelineid, mutex: Mutex::new(shared_state), cond: Notify::new(), } @@ -444,11 +449,20 @@ impl System { } // Load and lock control file (prevent running more than one instance of safekeeper) - fn load_control_file(&self, conf: &WalAcceptorConf) { + fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> { + + let mut shared_state = self.mutex.lock().unwrap(); + + if shared_state.control_file.is_some() { + info!("control file for timeline {} is already open", self.timelineid); + return Ok(()); + } + let control_file_path = conf .data_dir - .join(self.id.to_string()) + .join(self.timelineid.to_string()) .join(CONTROL_FILE_NAME); + info!("loading control file {}", control_file_path.display()); match OpenOptions::new() .read(true) .write(true) @@ -460,13 +474,12 @@ impl System { match file.try_lock_exclusive() { Ok(()) => {} Err(e) => { - panic!( + io_error!( "Control file {:?} is locked by some other process: {}", &control_file_path, e ); } } - let mut shared_state = self.mutex.lock().unwrap(); shared_state.control_file = Some(file); const SIZE: usize = mem::size_of::(); @@ -483,10 +496,10 @@ impl System { let my_info = SafeKeeperInfo::unpack(&mut input); if my_info.magic != SK_MAGIC { - panic!("Invalid control file magic: {}", my_info.magic); + io_error!("Invalid control file magic: {}", my_info.magic); } if my_info.format_version != SK_FORMAT_VERSION { - panic!( + io_error!( "Incompatible format version: {} vs. {}", my_info.format_version, SK_FORMAT_VERSION ); @@ -501,6 +514,7 @@ impl System { ); } } + Ok(()) } fn save_control_file(&self, sync: bool) -> Result<()> { @@ -521,7 +535,7 @@ impl System { impl Connection { pub fn new(socket: TcpStream, conf: &WalAcceptorConf) -> Connection { Connection { - system: None, + timeline: None, stream: socket, inbuf: BytesMut::with_capacity(10 * 1024), outbuf: BytesMut::with_capacity(10 * 1024), @@ -530,8 +544,8 @@ impl Connection { } } - fn system(&self) -> Arc { - self.system.as_ref().unwrap().clone() + fn timeline(&self) -> Arc { + self.timeline.as_ref().unwrap().clone() } async fn run(&mut self) -> Result<()> { @@ -563,12 +577,13 @@ impl Connection { "no_user", ); let callme = format!( - "callmemaybe {} host={} port={} replication=1 options='-c system.id={}'", - self.conf.timelineid, + "callmemaybe {} host={} port={} options='-c ztimelineid={}'", + self.timeline().timelineid, self.conf.listen_addr.ip(), self.conf.listen_addr.port(), - self.system().get_info().server.system_id, + self.timeline().timelineid ); + info!("requesting page server to connect to us: start {} {}", ps_connstr, callme); let (client, connection) = connect(&ps_connstr, NoTls).await?; // The connection object performs the actual communication with the database, @@ -583,22 +598,15 @@ impl Connection { Ok(()) } - fn set_system(&mut self, id: SystemId) -> Result<()> { - let mut systems = SYSTEMS.lock().unwrap(); - if id == 0 { - // non-multitenant configuration: just a single instance - if let Some(system) = systems.values().next() { - self.system = Some(system.clone()); - return Ok(()); - } - io_error!("No active instances"); + fn set_timeline(&mut self, timelineid: ZTimelineId) -> Result<()> { + let mut timelines = TIMELINES.lock().unwrap(); + if !timelines.contains_key(&timelineid) { + let timeline_dir = timelineid.to_str(); + info!("creating timeline dir {}", &timeline_dir); + fs::create_dir_all(&timeline_dir)?; + timelines.insert(timelineid, Arc::new(Timeline::new(timelineid))); } - if !systems.contains_key(&id) { - let system_dir = self.conf.data_dir.join(id.to_string()); - fs::create_dir_all(system_dir)?; - systems.insert(id, Arc::new(System::new(id))); - } - self.system = Some(systems.get(&id).unwrap().clone()); + self.timeline = Some(timelines.get(&timelineid).unwrap().clone()); Ok(()) } @@ -607,14 +615,16 @@ impl Connection { // Receive information about server let server_info = self.read_req::().await?; info!( - "Start handshake with wal_proposer {} sysid {}", + "Start handshake with wal_proposer {} sysid {} timeline {}", self.stream.peer_addr()?, - server_info.system_id + server_info.system_id, + server_info.timeline_id, ); - self.set_system(server_info.system_id)?; - self.system().load_control_file(&self.conf); + // FIXME: also check that the system identifier matches + self.set_timeline(server_info.timeline_id)?; + self.timeline().load_control_file(&self.conf)?; - let mut my_info = self.system().get_info(); + let mut my_info = self.timeline().get_info(); /* Check protocol compatibility */ if server_info.protocol_version != SK_PROTOCOL_VERSION { @@ -663,9 +673,9 @@ impl Connection { ); } my_info.server.node_id = prop.node_id; - self.system().set_info(&my_info); + self.timeline().set_info(&my_info); /* Need to persist our vote first */ - self.system().save_control_file(true)?; + self.timeline().save_control_file(true)?; let mut flushed_restart_lsn: XLogRecPtr = 0; let wal_seg_size = server_info.wal_seg_size as usize; @@ -684,8 +694,8 @@ impl Connection { } info!( - "Start streaming from server {} address {:?}", - server_info.system_id, + "Start streaming from timeline {} address {:?}", + server_info.timeline_id, self.stream.peer_addr()? ); @@ -707,6 +717,9 @@ impl Connection { let rec_size = (end_pos - start_pos) as usize; assert!(rec_size <= MAX_SEND_SIZE); + debug!("received for {} bytes between {:X}/{:X} and {:X}/{:X}", + rec_size, start_pos >> 32, start_pos & 0xffffffff, end_pos >> 32, end_pos & 0xffffffff); + /* Receive message body */ self.inbuf.resize(rec_size, 0u8); self.stream.read_exact(&mut self.inbuf[0..rec_size]).await?; @@ -737,7 +750,7 @@ impl Connection { * when restart_lsn delta exceeds WAL segment size. */ sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn; - self.system().save_control_file(sync_control_file)?; + self.timeline().save_control_file(sync_control_file)?; if sync_control_file { flushed_restart_lsn = my_info.restart_lsn; @@ -748,7 +761,7 @@ impl Connection { let resp = SafeKeeperResponse { epoch: my_info.epoch, flush_lsn: end_pos, - hs_feedback: self.system().get_hs_feedback(), + hs_feedback: self.timeline().get_hs_feedback(), }; self.start_sending(); resp.pack(&mut self.outbuf); @@ -758,7 +771,7 @@ impl Connection { * Ping wal sender that new data is available. * FlushLSN (end_pos) can be smaller than commitLSN in case we are at catching-up safekeeper. */ - self.system() + self.timeline() .notify_wal_senders(min(req.commit_lsn, end_pos)); } Ok(()) @@ -809,7 +822,7 @@ impl Connection { } // - // Send WAL to replica or WAL sender using standard libpq replication protocol + // Send WAL to replica or WAL receiver using standard libpq replication protocol // async fn send_wal(&mut self) -> Result<()> { info!("WAL sender to {:?} is started", self.stream.peer_addr()?); @@ -830,7 +843,7 @@ impl Connection { BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery); self.send().await?; self.init_done = true; - self.set_system(m.system_id)?; + self.set_timeline(m.timelineid)?; } StartupRequestCode::Cancel => return Ok(()), } @@ -863,7 +876,7 @@ impl Connection { let (start_pos, timeline) = self.find_end_of_wal(false); let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32); let tli = timeline.to_string(); - let sysid = self.system().get_info().server.system_id.to_string(); + let sysid = self.timeline().get_info().server.system_id.to_string(); let lsn_bytes = lsn.as_bytes(); let tli_bytes = tli.as_bytes(); let sysid_bytes = sysid.as_bytes(); @@ -919,7 +932,7 @@ impl Connection { } else { 0 }; - let wal_seg_size = self.system().get_info().server.wal_seg_size as usize; + let wal_seg_size = self.timeline().get_info().server.wal_seg_size as usize; if wal_seg_size == 0 { io_error!("Can not start replication before connecting to wal_proposer"); } @@ -937,15 +950,6 @@ impl Connection { BeMessage::write(&mut self.outbuf, &BeMessage::Copy); self.send().await?; - /* - * Always start streaming at the beginning of a segment - * - * FIXME: It is common practice to start streaming at the beginning of - * the segment, but it should be up to the client to decide that. We - * shouldn't enforce that here. - */ - start_pos -= XLogSegmentOffset(start_pos, wal_seg_size) as u64; - let mut end_pos: XLogRecPtr; let mut commit_lsn: XLogRecPtr; let mut wal_file: Option = None; @@ -962,19 +966,18 @@ impl Connection { end_pos = stop_pos; } else { /* normal mode */ + let timeline = self.timeline(); loop { // Rust doesn't allow to grab async result from mutex scope - let system = self.system(); - let notified = system.cond.notified(); { - let shared_state = system.mutex.lock().unwrap(); + let shared_state = timeline.mutex.lock().unwrap(); commit_lsn = shared_state.commit_lsn; if start_pos < commit_lsn { end_pos = commit_lsn; break; } } - notified.await; + timeline.cond.notified().await; } } if end_pos == END_REPLICATION_MARKER { @@ -985,7 +988,7 @@ impl Connection { Ok(0) => break, Ok(_) => match self.parse_message()? { Some(FeMessage::CopyData(m)) => self - .system() + .timeline() .add_hs_feedback(HotStandbyFeedback::parse(&m.body)), _ => {} }, @@ -1006,7 +1009,7 @@ impl Connection { let wal_file_path = self .conf .data_dir - .join(self.system().id.to_string()) + .join(self.timeline().timelineid.to_string()) .join(wal_file_name.clone() + ".partial"); if let Ok(opened_file) = File::open(&wal_file_path) { file = opened_file; @@ -1014,7 +1017,7 @@ impl Connection { let wal_file_path = self .conf .data_dir - .join(self.system().id.to_string()) + .join(self.timeline().timelineid.to_string()) .join(wal_file_name); match File::open(&wal_file_path) { Ok(opened_file) => file = opened_file, @@ -1036,6 +1039,8 @@ impl Connection { let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size; let data_start = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE; let data_end = data_start + send_size; + + file.seek(SeekFrom::Start(xlogoff as u64))?; file.read_exact(&mut self.outbuf[data_start..data_end])?; self.outbuf[0] = b'd'; BigEndian::write_u32( @@ -1050,6 +1055,9 @@ impl Connection { self.stream.write_all(&self.outbuf[0..msg_size]).await?; start_pos += send_size as u64; + debug!("Sent WAL to page server up to {:X}/{:>08X}", + (end_pos>>32) as u32, end_pos as u32); + if XLogSegmentOffset(start_pos, wal_seg_size) != 0 { wal_file = Some(file); } @@ -1104,12 +1112,12 @@ impl Connection { let wal_file_path = self .conf .data_dir - .join(self.system().id.to_string()) + .join(self.timeline().timelineid.to_str()) .join(wal_file_name.clone()); let wal_file_partial_path = self .conf .data_dir - .join(self.system().id.to_string()) + .join(self.timeline().timelineid.to_str()) .join(wal_file_name.clone() + ".partial"); { @@ -1172,7 +1180,7 @@ impl Connection { fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) { find_end_of_wal( &self.conf.data_dir, - self.system().get_info().server.wal_seg_size as usize, + self.timeline().get_info().server.wal_seg_size as usize, precise, ) }