Make WAL safekeeper work with zenith timelines

Heikki Linnakangas
2021-04-19 15:25:50 +03:00
parent 3600b33f1c
commit f69db17409
14 changed files with 228 additions and 175 deletions

View File

@@ -1,4 +1,3 @@
use std::fs::File;
use std::fs::{self, OpenOptions};
use std::os::unix::fs::PermissionsExt;
use std::net::TcpStream;
@@ -402,40 +401,18 @@ impl PostgresNode {
Client::connect(connstring.as_str(), NoTls).unwrap()
}
/* Create stub controlfile and respective xlog to start computenode */
pub fn setup_controlfile(&self) {
let filepath = format!("{}/global/pg_control", self.pgdata().to_str().unwrap());
{
File::create(filepath).unwrap();
}
let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal");
let pg_resetwal = Command::new(pg_resetwal_path)
.args(&["-D", self.pgdata().to_str().unwrap()])
.arg("-f")
// TODO probably we will have to modify pg_resetwal
// .arg("--compute-node")
.status()
.expect("failed to execute pg_resetwal");
if !pg_resetwal.success() {
panic!("pg_resetwal failed");
}
}
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["-s", &wal_acceptors])
.args(&["--ztimelineid", &self.timelineid.to_str()])
.args(&["-s", wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
.arg("-v")
.stderr(OpenOptions::new()
.create(true)
.append(true)
.open(self.env.repo_path.join("safepkeeper_proxy.log"))
.unwrap())
.open(self.pgdata().join("safekeeper_proxy.log")).unwrap())
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },

View File

@@ -7,9 +7,10 @@
use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::process::{Command, Stdio};
use bytes::Bytes;
use rand::Rng;
use anyhow::Context;
use hex;
use serde_derive::{Deserialize, Serialize};
@@ -29,6 +30,9 @@ pub struct LocalEnv {
// Path to the Repository. Here page server and compute nodes will create and store their data.
pub repo_path: PathBuf,
// System identifier, from the PostgreSQL control file
pub systemid: u64,
// Path to postgres distribution. It's expected that "bin", "include",
// "lib", "share" from postgres distribution are there. If at some point
// in time we will be able to run against vanilla postgres we may split that
@@ -96,40 +100,32 @@ pub fn init() -> Result<()> {
}
// ok, we are good to go
let conf = LocalEnv {
let mut conf = LocalEnv {
repo_path: repo_path.clone(),
pg_distrib_dir,
zenith_distrib_dir,
systemid: 0,
};
init_repo(&conf)?;
// write config
let toml = toml::to_string(&conf)?;
fs::write(repo_path.join("config"), toml)?;
init_repo(&mut conf)?;
Ok(())
}
pub fn init_repo(local_env: &LocalEnv) -> Result<()>
pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
{
let repopath = String::from(local_env.repo_path.to_str().unwrap());
fs::create_dir(&repopath)?;
fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath))?;
fs::create_dir(repopath.clone() + "/pgdatadirs")?;
fs::create_dir(repopath.clone() + "/timelines")?;
fs::create_dir(repopath.clone() + "/refs")?;
fs::create_dir(repopath.clone() + "/refs/branches")?;
fs::create_dir(repopath.clone() + "/refs/tags")?;
// Create empty config file
let configpath = repopath.clone() + "/config";
fs::write(&configpath, r##"
# Example config file. Nothing here yet.
"##)
.expect(&format!("Unable to write file {}", &configpath));
println!("created directory structure in {}", repopath);
// Create initial timeline
let tli = create_timeline(&local_env, None)?;
let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli));
println!("created initial timeline {}", timelinedir);
// Run initdb
//
@@ -139,32 +135,50 @@ pub fn init_repo(local_env: &LocalEnv) -> Result<()>
let initdb_path = local_env.pg_bin_dir().join("initdb");
let _initdb =
Command::new(initdb_path)
.args(&["-D", "tmp", "--no-instructions"])
.args(&["-D", "tmp"])
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
.stdout(Stdio::null())
.status()
.expect("failed to execute initdb");
.with_context(|| "failed to execute initdb")?;
println!("initdb succeeded");
// Read control file to extract the LSN
// Read control file to extract the LSN and system id
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read("tmp/global/pg_control")?))?;
let systemid = controlfile.system_identifier;
let lsn = controlfile.checkPoint;
let lsnstr = format!("{:016X}", lsn);
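// Illustration (assumed value, not from the commit): a checkpoint at LSN
// 0x0169_9D48 formats to "0000000001699D48", which names the snapshot
// directory created below.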
// Move the initial WAL file
fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.clone() + "/wal/000000010000000000000001.partial")?;
println!("moved initial WAL file");
// Remove pg_wal
fs::remove_dir_all("tmp/pg_wal")?;
println!("removed tmp/pg_wal");
force_crash_recovery(&PathBuf::from("tmp"))?;
println!("updated pg_control");
let target = timelinedir.clone() + "/snapshots/" + &lsnstr;
fs::rename("tmp", target)?;
fs::rename("tmp", &target)?;
println!("moved 'tmp' to {}", &target);
// Create 'main' branch to refer to the initial timeline
let data = hex::encode(tli);
fs::write(repopath.clone() + "/refs/branches/main", data)?;
println!("created main branch");
// Also update the system id in the LocalEnv
local_env.systemid = systemid;
// write config
let toml = toml::to_string(&local_env)?;
fs::write(repopath.clone() + "/config", toml)?;
println!("new zenith repository was created in {}", &repopath);
Ok(())
}
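// Resulting repository layout (a sketch assembled from the code above;
// <tli> and <lsnstr> stand for the hex timeline id and the formatted LSN):
//   <repo>/config                      -- LocalEnv as TOML, now with systemid
//   <repo>/pgdatadirs/
//   <repo>/refs/branches/main          -- hex-encoded id of the initial timeline
//   <repo>/refs/tags/
//   <repo>/timelines/<tli>/wal/000000010000000000000001.partial
//   <repo>/timelines/<tli>/snapshots/<lsnstr>/   -- the former "tmp" datadir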
@@ -209,17 +223,20 @@ pub fn load_config(repopath: &Path) -> Result<LocalEnv> {
// local env for tests
pub fn test_env(testname: &str) -> LocalEnv {
fs::create_dir_all("../tmp_check").expect("could not create directory ../tmp_check");
let repo_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_check/").join(testname);
// Remove remnants of old test repo
let _ = fs::remove_dir_all(&repo_path);
let local_env = LocalEnv {
let mut local_env = LocalEnv {
repo_path,
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
zenith_distrib_dir: cargo_bin_dir(),
systemid: 0,
};
init_repo(&local_env).unwrap();
init_repo(&mut local_env).expect("could not initialize zenith repository");
return local_env;
}

View File

@@ -176,6 +176,7 @@ impl PageServerNode {
cmd .args(&["-l", self.address().to_string().as_str()])
.arg("-d")
.env_clear()
.env("RUST_BACKTRACE", "1")
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
@@ -294,6 +295,12 @@ impl WalAcceptorNode {
let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.args(&["--systemid", &self.env.systemid.to_string()])
// Tell page server it can receive WAL from this WAL safekeeper
// FIXME: If there are multiple safekeepers, they will all inform
// the page server. Only the last "notification" will stay in effect.
// So it's pretty random which safekeeper the page server will connect to
.args(&["--pageserver", "127.0.0.1:64000"])
.arg("-d")
.arg("-n")
.status()

View File

@@ -2,6 +2,8 @@
use control_plane::compute::ComputeControlPlane;
use control_plane::storage::TestStorageControlPlane;
use control_plane::local_env;
use control_plane::local_env::PointInTime;
use pageserver::ZTimelineId;
use rand::Rng;
use std::sync::Arc;
@@ -23,7 +25,7 @@ fn test_acceptors_normal_work() {
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
let _proxy = node.start_proxy(&wal_acceptors);
// check basic work with table
node.safe_psql(
@@ -44,23 +46,39 @@ fn test_acceptors_normal_work() {
// check wal files equality
}
// Run page server and multiple safekeepers, and multiple compute nodes running
// against different timelines.
#[test]
fn test_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
fn test_many_timelines() {
// Initialize a new repository, and set up WAL safekeepers and page server.
const REDUNDANCY: usize = 3;
const N_NODES: usize = 5;
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
const N_TIMELINES: usize = 5;
let local_env = local_env::test_env("test_many_timelines");
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
// Create branches
let mut timelines: Vec<ZTimelineId> = Vec::new();
let maintli = storage_cplane.get_branch_timeline("main"); // main branch
timelines.push(maintli);
let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap();
for i in 1..N_TIMELINES { // additional branches
let branchname = format!("experimental{}", i);
local_env::create_branch(&local_env, &branchname,
PointInTime { timelineid: maintli,
lsn: startpoint }).unwrap();
let tli = storage_cplane.get_branch_timeline(&branchname);
timelines.push(tli);
}
// start postgres on each timeline
let mut nodes = Vec::new();
let mut proxies = Vec::new();
for _ in 0..N_NODES {
let node = compute_cplane.new_test_master_node();
nodes.push(node);
nodes.last().unwrap().start().unwrap();
proxies.push(nodes.last().unwrap().start_proxy(wal_acceptors.clone()));
for tli in timelines {
let node = compute_cplane.new_test_node(tli);
nodes.push(node.clone());
node.start().unwrap();
node.start_proxy(&wal_acceptors);
}
// create schema
@@ -111,7 +129,7 @@ fn test_acceptors_restarts() {
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
let _proxy = node.start_proxy(&wal_acceptors);
let mut failed_node: Option<usize> = None;
// check basic work with table
@@ -172,7 +190,7 @@ fn test_acceptors_unavailability() {
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
let _proxy = node.start_proxy(&wal_acceptors);
// check basic work with table
node.safe_psql(
@@ -250,7 +268,7 @@ fn test_race_conditions() {
node.start().unwrap();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
let _proxy = node.start_proxy(&wal_acceptors);
// check basic work with table
node.safe_psql(

View File

@@ -8,6 +8,7 @@ use std::io;
use std::process::exit;
use std::thread;
use std::fs::{File, OpenOptions};
use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -101,11 +102,11 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
if conf.daemonize {
info!("daemonizing...");
let repodir = zenith_repo_dir();
let repodir = PathBuf::from(zenith_repo_dir());
// There shouldn't be any logging to stdout/stderr. Redirect it to the main log so
// that we will see any accidental manual fprintf's or backtraces.
let log_filename = repodir.clone() + "pageserver.log";
let log_filename = repodir.join("pageserver.log");
let stdout = OpenOptions::new()
.create(true)
.append(true)
@@ -118,7 +119,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
.with_context(|| format!("failed to open {:?}", &log_filename))?;
let daemonize = Daemonize::new()
.pid_file(repodir.clone() + "/pageserver.pid")
.pid_file(repodir.clone().join("pageserver.pid"))
.working_directory(repodir)
.stdout(stdout)
.stderr(stderr);

View File

@@ -37,6 +37,16 @@ impl ZTimelineId {
ZTimelineId(b)
}
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZTimelineId {
let mut arr = [0u8; 16];
buf.copy_to_slice(&mut arr);
ZTimelineId::from(arr)
}
pub fn as_arr(&self) -> [u8; 16] {
self.0
}
pub fn to_str(self: &ZTimelineId) -> String {
hex::encode(self.0)
}
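// A minimal round-trip sketch of the helpers above (illustrative only; it
// assumes the `bytes` crate that `get_from_buf` already takes its Buf from):
//
//     use bytes::BufMut;
//     let tli = ZTimelineId::from([0u8; 16]);
//     let mut buf = bytes::BytesMut::new();
//     buf.put_slice(&tli.as_arr());                  // 16 raw bytes out
//     let mut rd = buf.freeze();                     // Bytes implements Buf
//     let back = ZTimelineId::get_from_buf(&mut rd); // 16 raw bytes back in
//     assert_eq!(back.to_str(), tli.to_str());       // same 32-char hex string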

View File

@@ -537,7 +537,7 @@ impl PageCache {
self.valid_lsn_condvar.notify_all();
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
//

View File

@@ -419,13 +419,18 @@ impl FeMessage {
pub fn thread_main(conf: &PageServerConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
// and we're not concerned with performance yet.
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
// FIXME: It would be nice to keep this single-threaded for debugging purposes,
// but that currently leads to a deadlock: if a GetPage@LSN request arrives
// for an LSN that hasn't been received yet, the thread gets stuck waiting for
// the WAL to arrive. If the WAL receiver hasn't been launched yet, i.e.
// we haven't received a "callmemaybe" request yet to tell us where to get the
// WAL, we will not have a thread available to process the "callmemaybe"
// request when it does arrive. Using a thread pool alleviates the problem so
// that it doesn't happen in the tests anymore, but in principle it could still
// happen if we receive enough GetPage@LSN requests to consume all of the
// available threads.
//let runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap();
let runtime = runtime::Runtime::new().unwrap();
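// A possible middle ground (an assumption, not part of this commit): a small
// bounded pool keeps gdb sessions manageable while still leaving a worker
// free to accept the "callmemaybe" request described above.
//
//     let runtime = runtime::Builder::new_multi_thread()
//         .worker_threads(2)
//         .enable_all()
//         .build()
//         .unwrap();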
info!("Starting page server on {}", conf.listen_addr);

View File

@@ -114,7 +114,7 @@ impl WalStreamDecoder {
let hdr = self.decode_XLogLongPageHeaderData();
if hdr.std.xlp_pageaddr != self.lsn {
return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}",
return Err(WalDecodeError::new(&format!("invalid xlog segment header at {:X}/{:X}",
self.lsn >> 32,
self.lsn & 0xffffffff)));
}
@@ -131,9 +131,9 @@ impl WalStreamDecoder {
let hdr = self.decode_XLogPageHeaderData();
if hdr.xlp_pageaddr != self.lsn {
return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}",
return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}: {:?}",
self.lsn >> 32,
self.lsn & 0xffffffff)));
self.lsn & 0xffffffff, hdr)));
}
// TODO: verify the remaining fields in the header

View File

@@ -217,7 +217,7 @@ async fn walreceiver_main(conf: &PageServerConf, timelineid: ZTimelineId, wal_pr
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
pcache.advance_last_valid_lsn(lsn);
pcache.advance_last_record_lsn(lsn);
} else {
break;
}

View File

@@ -10,15 +10,14 @@ use std::thread;
use std::{fs::File, fs::OpenOptions};
use clap::{App, Arg};
use anyhow::Result;
use slog::Drain;
use pageserver::ZTimelineId;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
fn main() -> Result<(), io::Error> {
fn main() -> Result<()> {
let arg_matches = App::new("Zenith wal_acceptor")
.about("Store WAL stream to local file system and push it to WAL receivers")
.arg(
@@ -29,10 +28,11 @@ fn main() -> Result<(), io::Error> {
.help("Path to the WAL acceptor data directory"),
)
.arg(
Arg::with_name("timelineid")
.long("timelineid")
Arg::with_name("systemid")
.long("systemid")
.takes_value(true)
.help("zenith timeline id"),
.required(true)
.help("PostgreSQL system id, from pg_control"),
)
.arg(
Arg::with_name("listen")
@@ -64,21 +64,23 @@ fn main() -> Result<(), io::Error> {
)
.get_matches();
let systemid_str = arg_matches.value_of("systemid").unwrap();
let systemid = u64::from_str_radix(systemid_str, 10)?;
let mut conf = WalAcceptorConf {
data_dir: PathBuf::from("./"),
timelineid: ZTimelineId::from([0u8; 16]),
systemid: systemid,
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_addr: "127.0.0.1:5454".parse().unwrap(),
listen_addr: "127.0.0.1:5454".parse()?,
};
if let Some(dir) = arg_matches.value_of("datadir") {
conf.data_dir = PathBuf::from(dir);
}
if let Some(timelineid_str) = arg_matches.value_of("timelineid") {
conf.timelineid = ZTimelineId::from_str(timelineid_str).unwrap();
// change into the data directory.
std::env::set_current_dir(&conf.data_dir)?;
}
if arg_matches.is_present("no-sync") {
@@ -100,7 +102,7 @@ fn main() -> Result<(), io::Error> {
start_wal_acceptor(conf)
}
fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> {
fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
// Initialize logger
let _scope_guard = init_logging(&conf)?;
let _log_guard = slog_stdlog::init().unwrap();
@@ -115,16 +117,16 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> {
let stdout = OpenOptions::new()
.create(true)
.append(true)
.open(conf.data_dir.join("wal_acceptor.log"))
.open("wal_acceptor.log")
.unwrap();
let stderr = OpenOptions::new()
.create(true)
.append(true)
.open(conf.data_dir.join("wal_acceptor.log"))
.open("wal_acceptor.log")
.unwrap();
let daemonize = Daemonize::new()
.pid_file(conf.data_dir.join("wal_acceptor.pid"))
.pid_file("wal_acceptor.pid")
.working_directory(Path::new("."))
.stdout(stdout)
.stderr(stderr);
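// Note: the log and pid files can be plain relative names here because main()
// now calls std::env::set_current_dir(&conf.data_dir) before daemonizing.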

View File

@@ -6,12 +6,12 @@ mod pq_protocol;
pub mod wal_service;
pub mod xlog_utils;
use pageserver::ZTimelineId;
use crate::pq_protocol::SystemId;
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub data_dir: PathBuf,
pub timelineid: ZTimelineId,
pub systemid: SystemId,
pub daemonize: bool,
pub no_sync: bool,
pub listen_addr: SocketAddr,

View File

@@ -1,5 +1,6 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pageserver::ZTimelineId;
use std::io;
use std::str;
@@ -37,7 +38,7 @@ pub enum BeMessage<'a> {
pub struct FeStartupMessage {
pub version: u32,
pub kind: StartupRequestCode,
pub system_id: SystemId,
pub timelineid: ZTimelineId,
}
#[derive(Debug)]
@@ -83,26 +84,33 @@ impl FeStartupMessage {
let params_str = str::from_utf8(&params_bytes).unwrap();
let params = params_str.split('\0');
let mut options = false;
let mut system_id: u64 = 0;
let mut timelineid: Option<ZTimelineId> = None;
for p in params {
if p == "options" {
options = true;
} else if options {
for opt in p.split(' ') {
if opt.starts_with("system.id=") {
system_id = opt[10..].parse::<u64>().unwrap();
if opt.starts_with("ztimelineid=") {
// FIXME: rethrow parsing error, don't unwrap
timelineid = Some(ZTimelineId::from_str(&opt[12..]).unwrap());
break;
}
}
break;
}
}
if timelineid.is_none() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"timelineid is required",
));
}
buf.advance(len as usize);
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
version,
kind,
system_id,
timelineid: timelineid.unwrap(),
})))
}
}
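// One way to resolve the FIXME above (a sketch, assuming ZTimelineId's
// FromStr error implements Display; the commit itself only unwrap()s):
//
//     if let Some(rest) = opt.strip_prefix("ztimelineid=") {
//         timelineid = Some(ZTimelineId::from_str(rest).map_err(|e| {
//             io::Error::new(io::ErrorKind::InvalidInput,
//                            format!("invalid ztimelineid: {}", e))
//         })?);
//         break;
//     }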

View File

@@ -33,6 +33,7 @@ use tokio_postgres::{connect, Error, NoTls};
use crate::pq_protocol::*;
use crate::xlog_utils::*;
use crate::WalAcceptorConf;
use pageserver::ZTimelineId;
type FullTransactionId = u64;
@@ -64,7 +65,8 @@ struct ServerInfo {
protocol_version: u32, /* proxy-safekeeper protocol version */
pg_version: u32, /* Postgres server version */
node_id: NodeId,
system_id: SystemId, /* Postgres system identifier */
system_id: SystemId,
timeline_id: ZTimelineId, /* Zenith timelineid */
wal_end: XLogRecPtr,
timeline: TimeLineID,
wal_seg_size: u32,
@@ -146,8 +148,8 @@ struct SharedState {
* Database instance (tenant)
*/
#[derive(Debug)]
pub struct System {
id: SystemId,
pub struct Timeline {
timelineid: ZTimelineId,
mutex: Mutex<SharedState>,
cond: Notify, /* conditional variable used to notify wal senders */
}
@@ -157,7 +159,7 @@ pub struct System {
*/
#[derive(Debug)]
struct Connection {
system: Option<Arc<System>>,
timeline: Option<Arc<Timeline>>,
stream: TcpStream, /* Postgres connection */
inbuf: BytesMut, /* input buffer */
outbuf: BytesMut, /* output buffer */
@@ -211,6 +213,7 @@ impl Serializer for ServerInfo {
buf.put_u32_le(self.pg_version);
self.node_id.pack(buf);
buf.put_u64_le(self.system_id);
buf.put_slice(&self.timeline_id.as_arr());
buf.put_u64_le(self.wal_end);
buf.put_u32_le(self.timeline);
buf.put_u32_le(self.wal_seg_size);
@@ -221,6 +224,7 @@ impl Serializer for ServerInfo {
pg_version: buf.get_u32_le(),
node_id: NodeId::unpack(buf),
system_id: buf.get_u64_le(),
timeline_id: ZTimelineId::get_from_buf(buf),
wal_end: buf.get_u64_le(),
timeline: buf.get_u32_le(),
wal_seg_size: buf.get_u32_le(),
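// Wire layout after this change (sketch; integers little-endian, and
// protocol_version is packed just above this hunk):
//   u32 protocol_version | u32 pg_version | NodeId | u64 system_id |
//   [u8; 16] timeline_id (raw bytes, not hex) | u64 wal_end |
//   u32 timeline | u32 wal_seg_size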
@@ -278,6 +282,7 @@ impl SafeKeeperInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
node_id: NodeId { term: 0, uuid: 0 },
system_id: 0, /* Postgres system identifier */
timeline_id: ZTimelineId::from([0u8; 16]),
wal_end: 0,
timeline: 0,
wal_seg_size: 0,
@@ -349,7 +354,7 @@ impl Serializer for SafeKeeperResponse {
}
lazy_static! {
pub static ref SYSTEMS: Mutex<HashMap<SystemId, Arc<System>>> = Mutex::new(HashMap::new());
pub static ref TIMELINES: Mutex<HashMap<ZTimelineId, Arc<Timeline>>> = Mutex::new(HashMap::new());
}
pub fn thread_main(conf: WalAcceptorConf) {
@@ -389,8 +394,8 @@ async fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
}
}
impl System {
pub fn new(id: SystemId) -> System {
impl Timeline {
pub fn new(timelineid: ZTimelineId) -> Timeline {
let shared_state = SharedState {
commit_lsn: 0,
info: SafeKeeperInfo::new(),
@@ -401,8 +406,8 @@ impl System {
catalog_xmin: u64::MAX,
},
};
System {
id,
Timeline {
timelineid,
mutex: Mutex::new(shared_state),
cond: Notify::new(),
}
@@ -444,11 +449,20 @@ impl System {
}
// Load and lock control file (prevent running more than one instance of safekeeper)
fn load_control_file(&self, conf: &WalAcceptorConf) {
fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
if shared_state.control_file.is_some() {
info!("control file for timeline {} is already open", self.timelineid);
return Ok(());
}
let control_file_path = conf
.data_dir
.join(self.id.to_string())
.join(self.timelineid.to_string())
.join(CONTROL_FILE_NAME);
info!("loading control file {}", control_file_path.display());
match OpenOptions::new()
.read(true)
.write(true)
@@ -460,13 +474,12 @@ impl System {
match file.try_lock_exclusive() {
Ok(()) => {}
Err(e) => {
panic!(
io_error!(
"Control file {:?} is locked by some other process: {}",
&control_file_path, e
);
}
}
let mut shared_state = self.mutex.lock().unwrap();
shared_state.control_file = Some(file);
const SIZE: usize = mem::size_of::<SafeKeeperInfo>();
@@ -483,10 +496,10 @@ impl System {
let my_info = SafeKeeperInfo::unpack(&mut input);
if my_info.magic != SK_MAGIC {
panic!("Invalid control file magic: {}", my_info.magic);
io_error!("Invalid control file magic: {}", my_info.magic);
}
if my_info.format_version != SK_FORMAT_VERSION {
panic!(
io_error!(
"Incompatible format version: {} vs. {}",
my_info.format_version, SK_FORMAT_VERSION
);
@@ -501,6 +514,7 @@ impl System {
);
}
}
Ok(())
}
fn save_control_file(&self, sync: bool) -> Result<()> {
@@ -521,7 +535,7 @@ impl System {
impl Connection {
pub fn new(socket: TcpStream, conf: &WalAcceptorConf) -> Connection {
Connection {
system: None,
timeline: None,
stream: socket,
inbuf: BytesMut::with_capacity(10 * 1024),
outbuf: BytesMut::with_capacity(10 * 1024),
@@ -530,8 +544,8 @@ impl Connection {
}
}
fn system(&self) -> Arc<System> {
self.system.as_ref().unwrap().clone()
fn timeline(&self) -> Arc<Timeline> {
self.timeline.as_ref().unwrap().clone()
}
async fn run(&mut self) -> Result<()> {
@@ -563,12 +577,13 @@ impl Connection {
"no_user",
);
let callme = format!(
"callmemaybe {} host={} port={} replication=1 options='-c system.id={}'",
self.conf.timelineid,
"callmemaybe {} host={} port={} options='-c ztimelineid={}'",
self.timeline().timelineid,
self.conf.listen_addr.ip(),
self.conf.listen_addr.port(),
self.system().get_info().server.system_id,
self.timeline().timelineid
);
info!("requesting page server to connect to us: start {} {}", ps_connstr, callme);
let (client, connection) = connect(&ps_connstr, NoTls).await?;
// The connection object performs the actual communication with the database,
@@ -583,22 +598,15 @@ impl Connection {
Ok(())
}
fn set_system(&mut self, id: SystemId) -> Result<()> {
let mut systems = SYSTEMS.lock().unwrap();
if id == 0 {
// non-multitenant configuration: just a single instance
if let Some(system) = systems.values().next() {
self.system = Some(system.clone());
return Ok(());
}
io_error!("No active instances");
fn set_timeline(&mut self, timelineid: ZTimelineId) -> Result<()> {
let mut timelines = TIMELINES.lock().unwrap();
if !timelines.contains_key(&timelineid) {
let timeline_dir = timelineid.to_str();
info!("creating timeline dir {}", &timeline_dir);
fs::create_dir_all(&timeline_dir)?;
timelines.insert(timelineid, Arc::new(Timeline::new(timelineid)));
}
if !systems.contains_key(&id) {
let system_dir = self.conf.data_dir.join(id.to_string());
fs::create_dir_all(system_dir)?;
systems.insert(id, Arc::new(System::new(id)));
}
self.system = Some(systems.get(&id).unwrap().clone());
self.timeline = Some(timelines.get(&timelineid).unwrap().clone());
Ok(())
}
@@ -607,14 +615,16 @@ impl Connection {
// Receive information about server
let server_info = self.read_req::<ServerInfo>().await?;
info!(
"Start handshake with wal_proposer {} sysid {}",
"Start handshake with wal_proposer {} sysid {} timeline {}",
self.stream.peer_addr()?,
server_info.system_id
server_info.system_id,
server_info.timeline_id,
);
self.set_system(server_info.system_id)?;
self.system().load_control_file(&self.conf);
// FIXME: also check that the system identifier matches
self.set_timeline(server_info.timeline_id)?;
self.timeline().load_control_file(&self.conf)?;
let mut my_info = self.system().get_info();
let mut my_info = self.timeline().get_info();
/* Check protocol compatibility */
if server_info.protocol_version != SK_PROTOCOL_VERSION {
@@ -663,9 +673,9 @@ impl Connection {
);
}
my_info.server.node_id = prop.node_id;
self.system().set_info(&my_info);
self.timeline().set_info(&my_info);
/* Need to persist our vote first */
self.system().save_control_file(true)?;
self.timeline().save_control_file(true)?;
let mut flushed_restart_lsn: XLogRecPtr = 0;
let wal_seg_size = server_info.wal_seg_size as usize;
@@ -684,8 +694,8 @@ impl Connection {
}
info!(
"Start streaming from server {} address {:?}",
server_info.system_id,
"Start streaming from timeline {} address {:?}",
server_info.timeline_id,
self.stream.peer_addr()?
);
@@ -707,6 +717,9 @@ impl Connection {
let rec_size = (end_pos - start_pos) as usize;
assert!(rec_size <= MAX_SEND_SIZE);
debug!("received for {} bytes between {:X}/{:X} and {:X}/{:X}",
rec_size, start_pos >> 32, start_pos & 0xffffffff, end_pos >> 32, end_pos & 0xffffffff);
/* Receive message body */
self.inbuf.resize(rec_size, 0u8);
self.stream.read_exact(&mut self.inbuf[0..rec_size]).await?;
@@ -737,7 +750,7 @@ impl Connection {
* when restart_lsn delta exceeds WAL segment size.
*/
sync_control_file |= flushed_restart_lsn + (wal_seg_size as u64) < my_info.restart_lsn;
self.system().save_control_file(sync_control_file)?;
self.timeline().save_control_file(sync_control_file)?;
if sync_control_file {
flushed_restart_lsn = my_info.restart_lsn;
@@ -748,7 +761,7 @@ impl Connection {
let resp = SafeKeeperResponse {
epoch: my_info.epoch,
flush_lsn: end_pos,
hs_feedback: self.system().get_hs_feedback(),
hs_feedback: self.timeline().get_hs_feedback(),
};
self.start_sending();
resp.pack(&mut self.outbuf);
@@ -758,7 +771,7 @@ impl Connection {
* Ping wal sender that new data is available.
* FlushLSN (end_pos) can be smaller than commitLSN in case we are a catching-up safekeeper.
*/
self.system()
self.timeline()
.notify_wal_senders(min(req.commit_lsn, end_pos));
}
Ok(())
@@ -809,7 +822,7 @@ impl Connection {
}
//
// Send WAL to replica or WAL sender using standard libpq replication protocol
// Send WAL to replica or WAL receiver using standard libpq replication protocol
//
async fn send_wal(&mut self) -> Result<()> {
info!("WAL sender to {:?} is started", self.stream.peer_addr()?);
@@ -830,7 +843,7 @@ impl Connection {
BeMessage::write(&mut self.outbuf, &BeMessage::ReadyForQuery);
self.send().await?;
self.init_done = true;
self.set_system(m.system_id)?;
self.set_timeline(m.timelineid)?;
}
StartupRequestCode::Cancel => return Ok(()),
}
@@ -863,7 +876,7 @@ impl Connection {
let (start_pos, timeline) = self.find_end_of_wal(false);
let lsn = format!("{:X}/{:>08X}", (start_pos >> 32) as u32, start_pos as u32);
let tli = timeline.to_string();
let sysid = self.system().get_info().server.system_id.to_string();
let sysid = self.timeline().get_info().server.system_id.to_string();
let lsn_bytes = lsn.as_bytes();
let tli_bytes = tli.as_bytes();
let sysid_bytes = sysid.as_bytes();
@@ -919,7 +932,7 @@ impl Connection {
} else {
0
};
let wal_seg_size = self.system().get_info().server.wal_seg_size as usize;
let wal_seg_size = self.timeline().get_info().server.wal_seg_size as usize;
if wal_seg_size == 0 {
io_error!("Can not start replication before connecting to wal_proposer");
}
@@ -937,15 +950,6 @@ impl Connection {
BeMessage::write(&mut self.outbuf, &BeMessage::Copy);
self.send().await?;
/*
* Always start streaming at the beginning of a segment
*
* FIXME: It is common practice to start streaming at the beginning of
* the segment, but it should be up to the client to decide that. We
* shouldn't enforce that here.
*/
start_pos -= XLogSegmentOffset(start_pos, wal_seg_size) as u64;
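// What the removed adjustment computed (sketch): round the start position
// down to a segment boundary, i.e. start_pos -= start_pos % wal_seg_size.
// With 16 MiB segments, an LSN of 0x0100_0123 becomes 0x0100_0000.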
let mut end_pos: XLogRecPtr;
let mut commit_lsn: XLogRecPtr;
let mut wal_file: Option<File> = None;
@@ -962,19 +966,18 @@ impl Connection {
end_pos = stop_pos;
} else {
/* normal mode */
let timeline = self.timeline();
loop {
// Rust doesn't allow grabbing an async result from inside a mutex scope
let system = self.system();
let notified = system.cond.notified();
{
let shared_state = system.mutex.lock().unwrap();
let shared_state = timeline.mutex.lock().unwrap();
commit_lsn = shared_state.commit_lsn;
if start_pos < commit_lsn {
end_pos = commit_lsn;
break;
}
}
notified.await;
timeline.cond.notified().await;
}
}
if end_pos == END_REPLICATION_MARKER {
@@ -985,7 +988,7 @@ impl Connection {
Ok(0) => break,
Ok(_) => match self.parse_message()? {
Some(FeMessage::CopyData(m)) => self
.system()
.timeline()
.add_hs_feedback(HotStandbyFeedback::parse(&m.body)),
_ => {}
},
@@ -1006,7 +1009,7 @@ impl Connection {
let wal_file_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(self.timeline().timelineid.to_string())
.join(wal_file_name.clone() + ".partial");
if let Ok(opened_file) = File::open(&wal_file_path) {
file = opened_file;
@@ -1014,7 +1017,7 @@ impl Connection {
let wal_file_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(self.timeline().timelineid.to_string())
.join(wal_file_name);
match File::open(&wal_file_path) {
Ok(opened_file) => file = opened_file,
@@ -1036,6 +1039,8 @@ impl Connection {
let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size;
let data_start = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE;
let data_end = data_start + send_size;
file.seek(SeekFrom::Start(xlogoff as u64))?;
file.read_exact(&mut self.outbuf[data_start..data_end])?;
self.outbuf[0] = b'd';
BigEndian::write_u32(
@@ -1050,6 +1055,9 @@ impl Connection {
self.stream.write_all(&self.outbuf[0..msg_size]).await?;
start_pos += send_size as u64;
debug!("Sent WAL to page server up to {:X}/{:>08X}",
(end_pos>>32) as u32, end_pos as u32);
if XLogSegmentOffset(start_pos, wal_seg_size) != 0 {
wal_file = Some(file);
}
@@ -1104,12 +1112,12 @@ impl Connection {
let wal_file_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(self.timeline().timelineid.to_str())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.data_dir
.join(self.system().id.to_string())
.join(self.timeline().timelineid.to_str())
.join(wal_file_name.clone() + ".partial");
{
@@ -1172,7 +1180,7 @@ impl Connection {
fn find_end_of_wal(&self, precise: bool) -> (XLogRecPtr, TimeLineID) {
find_end_of_wal(
&self.conf.data_dir,
self.system().get_info().server.wal_seg_size as usize,
self.timeline().get_info().server.wal_seg_size as usize,
precise,
)
}