Fixes, per Eric's and Konstantin's comments

This commit is contained in:
Heikki Linnakangas
2021-04-20 17:57:06 +03:00
parent f69db17409
commit d047a3abf7
11 changed files with 171 additions and 202 deletions

View File

@@ -73,8 +73,8 @@ impl ComputeControlPlane {
}
}
// Connect to a page server, get base backup, and untar it to initialize a
// new data directory
/// Connect to a page server, get base backup, and untar it to initialize a
/// new data directory
pub fn new_from_page_server(&mut self, is_test: bool, timelineid: ZTimelineId) -> Result<Arc<PostgresNode>> {
let node_id = self.nodes.len() as u32 + 1;
@@ -215,7 +215,7 @@ impl PostgresNode {
println!(
"Extracting base backup to create postgres instance: path={} port={}",
pgdata.to_str().unwrap(),
pgdata.display(),
self.address.port()
);
@@ -225,66 +225,64 @@ impl PostgresNode {
}
let sql = format!("basebackup {}", self.timelineid);
let mut client = self.pageserver.page_server_psql_client()?;
println!("connected to page server");
let mut client = self.pageserver.page_server_psql_client().with_context(|| "connecting to page erver failed")?;
fs::create_dir_all(&pgdata)?;
fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700)).unwrap();
fs::create_dir_all(&pgdata)
.with_context(|| format!("could not create data directory {}", pgdata.display()))?;
fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700))
.with_context(|| format!("could not set permissions in data directory {}", pgdata.display()))?;
// Also create pg_wal directory, it's not included in the tarball
// FIXME: actually, it is currently.
// FIXME: The compute node should be able to stream the WAL it needs from the WAL safekeepers or archive.
// But that's not implemented yet. For now, 'pg_wal' is included in the base backup tarball that
// we receive from the Page Server, so we don't need to create the empty 'pg_wal' directory here.
//fs::create_dir_all(pgdata.join("pg_wal"))?;
let mut copyreader = client.copy_out(sql.as_str())?;
let mut copyreader = client.copy_out(sql.as_str())
.with_context(|| "page server 'basebackup' command failed")?;
// FIXME: Currently, we slurp the whole tarball into memory, and then extract it,
// but we really should do this:
//let mut ar = tar::Archive::new(copyreader);
let mut buf = vec![];
copyreader.read_to_end(&mut buf)?;
println!("got tarball of size {}", buf.len());
copyreader.read_to_end(&mut buf)
.with_context(|| "reading base backup from page server failed")?;
let mut ar = tar::Archive::new(buf.as_slice());
ar.unpack(&pgdata)?;
ar.unpack(&pgdata)
.with_context(|| "extracting page backup failed")?;
// listen for selected port
self.append_conf(
"postgresql.conf",
format!(
&format!(
"max_wal_senders = 10\n\
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
max_connections = 100\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
max_connections = 100\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
address = self.address.ip(),
port = self.address.port()
)
.as_str(),
);
));
// Never clean up old WAL. TODO: We should use a replication
// slot or something proper, to prevent the compute node
// from removing WAL that hasn't been streamed to the safekeepr or
// page server yet. But this will do for now.
self.append_conf("postgresql.conf",
format!("wal_keep_size='10TB'\n")
.as_str(),
);
&format!("wal_keep_size='10TB'\n"));
// Connect it to the page server.
// Configure that node to take pages from pageserver
self.append_conf("postgresql.conf",
format!("page_server_connstring = 'host={} port={}'\n\
zenith_timeline='{}'\n",
self.pageserver.address().ip(),
self.pageserver.address().port(),
self.timelineid,
)
.as_str(),
);
&format!("page_server_connstring = 'host={} port={}'\n\
zenith_timeline='{}'\n",
self.pageserver.address().ip(),
self.pageserver.address().port(),
self.timelineid
));
Ok(())
}
@@ -317,6 +315,7 @@ impl PostgresNode {
fn pg_ctl(&self, args: &[&str]) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
let pg_ctl = Command::new(pg_ctl_path)
.args(
[
@@ -332,13 +331,11 @@ impl PostgresNode {
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()?;
.status().with_context(|| "pg_ctl failed")?;
if !pg_ctl.success() {
anyhow::bail!("pg_ctl failed");
} else {
Ok(())
}
Ok(())
}
pub fn start(&self) -> Result<()> {
@@ -404,7 +401,7 @@ impl PostgresNode {
pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["--ztimelineid", &self.timelineid.to_str()])
.args(&["--ztimelineid", &self.timelineid.to_string()])
.args(&["-s", wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])

View File

@@ -12,7 +12,6 @@ use bytes::Bytes;
use rand::Rng;
use anyhow::Context;
use hex;
use serde_derive::{Deserialize, Serialize};
use anyhow::Result;
@@ -53,10 +52,10 @@ impl LocalEnv {
}
}
fn zenith_repo_dir() -> String {
fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => String::from(val.to_str().unwrap()),
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
@@ -66,7 +65,7 @@ fn zenith_repo_dir() -> String {
//
pub fn init() -> Result<()> {
// check if config already exists
let repo_path = PathBuf::from(zenith_repo_dir());
let repo_path = zenith_repo_dir();
if repo_path.exists() {
anyhow::bail!("{} already exists. Perhaps already initialized?",
repo_path.to_str().unwrap());
@@ -113,19 +112,19 @@ pub fn init() -> Result<()> {
pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
{
let repopath = String::from(local_env.repo_path.to_str().unwrap());
fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath))?;
fs::create_dir(repopath.clone() + "/pgdatadirs")?;
fs::create_dir(repopath.clone() + "/timelines")?;
fs::create_dir(repopath.clone() + "/refs")?;
fs::create_dir(repopath.clone() + "/refs/branches")?;
fs::create_dir(repopath.clone() + "/refs/tags")?;
println!("created directory structure in {}", repopath);
let repopath = &local_env.repo_path;
fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath.display()))?;
fs::create_dir(repopath.join("pgdatadirs"))?;
fs::create_dir(repopath.join("timelines"))?;
fs::create_dir(repopath.join("refs"))?;
fs::create_dir(repopath.join("refs").join("branches"))?;
fs::create_dir(repopath.join("refs").join("tags"))?;
println!("created directory structure in {}", repopath.display());
// Create initial timeline
let tli = create_timeline(&local_env, None)?;
let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli));
println!("created initial timeline {}", timelinedir);
let timelinedir = repopath.join("timelines").join(tli.to_string());
println!("created initial timeline {}", timelinedir.display());
// Run initdb
//
@@ -151,7 +150,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
let lsnstr = format!("{:016X}", lsn);
// Move the initial WAL file
fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.clone() + "/wal/000000010000000000000001.partial")?;
fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.join("wal").join("000000010000000000000001.partial"))?;
println!("moved initial WAL file");
// Remove pg_wal
@@ -161,13 +160,13 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
force_crash_recovery(&PathBuf::from("tmp"))?;
println!("updated pg_control");
let target = timelinedir.clone() + "/snapshots/" + &lsnstr;
let target = timelinedir.join("snapshots").join(&lsnstr);
fs::rename("tmp", &target)?;
println!("moved 'tmp' to {}", &target);
println!("moved 'tmp' to {}", target.display());
// Create 'main' branch to refer to the initial timeline
let data = hex::encode(tli);
fs::write(repopath.clone() + "/refs/branches/main", data)?;
let data = tli.to_string();
fs::write(repopath.join("refs").join("branches").join("main"), data)?;
println!("created main branch");
// Also update the system id in the LocalEnv
@@ -175,9 +174,9 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
// write config
let toml = toml::to_string(&local_env)?;
fs::write(repopath.clone() + "/config", toml)?;
fs::write(repopath.join("config"), toml)?;
println!("new zenith repository was created in {}", &repopath);
println!("new zenith repository was created in {}", repopath.display());
Ok(())
}
@@ -195,9 +194,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()>
fn force_crash_recovery(datadir: &Path) -> Result<()> {
// Read in the control file
let mut controlfilepath = datadir.to_path_buf();
controlfilepath.push("global");
controlfilepath.push("pg_control");
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile = postgres_ffi::decode_pg_control(
Bytes::from(fs::read(controlfilepath.as_path())?))?;
@@ -258,28 +255,29 @@ pub struct PointInTime {
pub lsn: u64
}
fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<[u8; 16]> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
let repopath = &local_env.repo_path;
// Create initial timeline
let mut tli = [0u8; 16];
rand::thread_rng().fill(&mut tli);
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
let timelineid = ZTimelineId::from(tli_buf);
let timelinedir = format!("{}/timelines/{}", repopath, &hex::encode(tli));
let timelinedir = repopath.join("timelines").join(timelineid.to_string());
fs::create_dir(timelinedir.clone())?;
fs::create_dir(timelinedir.clone() + "/snapshots")?;
fs::create_dir(timelinedir.clone() + "/wal")?;
fs::create_dir(&timelinedir)?;
fs::create_dir(&timelinedir.join("snapshots"))?;
fs::create_dir(&timelinedir.join("wal"))?;
if let Some(ancestor) = ancestor {
let data = format!("{}@{:X}/{:X}",
hex::encode(ancestor.timelineid.to_str()),
ancestor.timelineid,
ancestor.lsn >> 32,
ancestor.lsn & 0xffffffff);
fs::write(timelinedir + "/ancestor", data)?;
fs::write(timelinedir.join("ancestor"), data)?;
}
Ok(tli)
Ok(timelineid)
}
// Parse an LSN in the format used in filenames
@@ -292,26 +290,26 @@ fn parse_lsn(s: &str) -> std::result::Result<u64, std::num::ParseIntError> {
// Create a new branch in the repository (for the "zenith branch" subcommand)
pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointInTime) -> Result<()> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
let repopath = &local_env.repo_path;
// create a new timeline for it
let newtli = create_timeline(local_env, Some(startpoint))?;
let newtimelinedir = format!("{}/timelines/{}", repopath, &hex::encode(newtli));
let newtimelinedir = repopath.join("timelines").join(newtli.to_string());
let data = hex::encode(newtli);
fs::write(format!("{}/refs/branches/{}", repopath, branchname), data)?;
let data = newtli.to_string();
fs::write(repopath.join("refs").join("branches").join(branchname), data)?;
// Copy the latest snapshot (TODO: before the startpoint) and all WAL
// TODO: be smarter and avoid the copying...
let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(local_env, startpoint.timelineid)?;
let copy_opts = fs_extra::dir::CopyOptions::new();
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.clone() + "/snapshots", &copy_opts)?;
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
let oldtimelinedir = format!("{}/timelines/{}", &repopath, startpoint.timelineid.to_str());
let oldtimelinedir = repopath.join("timelines").join(startpoint.timelineid.to_string());
let mut copy_opts = fs_extra::dir::CopyOptions::new();
copy_opts.content_only = true;
fs_extra::dir::copy(oldtimelinedir + "/wal/",
newtimelinedir.clone() + "/wal",
fs_extra::dir::copy(oldtimelinedir.join("wal"),
newtimelinedir.join("wal"),
&copy_opts)?;
Ok(())
@@ -319,8 +317,8 @@ pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointIn
// Find the end of valid WAL in a wal directory
pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u64> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
let waldir = PathBuf::from(format!("{}/timelines/{}/wal", repopath, timeline.to_str()));
let repopath = &local_env.repo_path;
let waldir = repopath.join("timelines").join(timeline.to_string()).join("wal");
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
@@ -329,15 +327,14 @@ pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<u6
// Find the latest snapshot for a timeline
fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(u64, PathBuf)> {
let repopath = String::from(local_env.repo_path.to_str().unwrap());
let repopath = &local_env.repo_path;
let timelinedir = repopath + "/timelines/" + &timeline.to_str();
let snapshotsdir = timelinedir.clone() + "/snapshots";
let paths = fs::read_dir(&snapshotsdir).unwrap();
let snapshotsdir = repopath.join("timelines").join(timeline.to_string()).join("snapshots");
let paths = fs::read_dir(&snapshotsdir)?;
let mut maxsnapshot: u64 = 0;
let mut snapshotdir: Option<PathBuf> = None;
for path in paths {
let path = path.unwrap();
let path = path?;
let filename = path.file_name().to_str().unwrap().to_owned();
if let Ok(lsn) = parse_lsn(&filename) {
maxsnapshot = std::cmp::max(lsn, maxsnapshot);
@@ -346,7 +343,7 @@ fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(
}
if maxsnapshot == 0 {
// TODO: check ancestor timeline
anyhow::bail!("no snapshot found in {}", snapshotsdir);
anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
}
Ok((maxsnapshot, snapshotdir.unwrap()))

View File

@@ -4,6 +4,7 @@ use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

View File

@@ -1,4 +1,6 @@
use std::net::SocketAddr;
use std::str::FromStr;
use std::fmt;
pub mod page_cache;
pub mod page_service;
@@ -23,9 +25,10 @@ pub struct PageServerConf {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ZTimelineId([u8; 16]);
impl ZTimelineId {
impl FromStr for ZTimelineId {
type Err = hex::FromHexError;
pub fn from_str(s: &str) -> Result<ZTimelineId, hex::FromHexError> {
fn from_str(s: &str) -> Result<ZTimelineId, Self::Err> {
let timelineid = hex::decode(s)?;
let mut buf: [u8; 16] = [0u8; 16];
@@ -33,6 +36,9 @@ impl ZTimelineId {
Ok(ZTimelineId(buf))
}
}
impl ZTimelineId {
pub fn from(b: [u8; 16]) -> ZTimelineId {
ZTimelineId(b)
}
@@ -46,14 +52,11 @@ impl ZTimelineId {
pub fn as_arr(&self) -> [u8; 16] {
self.0
}
pub fn to_str(self: &ZTimelineId) -> String {
hex::encode(self.0)
}
}
impl std::fmt::Display for ZTimelineId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.to_str())
impl fmt::Display for ZTimelineId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}

View File

@@ -15,6 +15,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::io;
use std::thread;
use std::str::FromStr;
use std::sync::Arc;
use regex::Regex;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
@@ -247,14 +248,13 @@ impl FeDescribeMessage {
}
*/
if kind != 0x53 { // 'S'
if kind != b'S' {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"only prepared statmement Describe is implemented",
));
}
Ok(FeMessage::Describe(FeDescribeMessage {kind}))
}
}
@@ -262,7 +262,8 @@ impl FeDescribeMessage {
// we only support unnamed prepared stmt or portal
#[derive(Debug)]
struct FeExecuteMessage {
maxrows: i32// max # of rows
/// max # of rows
maxrows: i32
}
impl FeExecuteMessage {
@@ -469,7 +470,7 @@ impl Connection {
buffer: BytesMut::with_capacity(10 * 1024),
init_done: false,
conf,
runtime: runtime.clone(),
runtime: Arc::clone(runtime),
}
}

View File

@@ -45,19 +45,20 @@ const GLOBALTABLESPACE_OID: u32 = 1664;
//
pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId) -> Result<()> {
let timelinepath = PathBuf::from("timelines").join(&timeline.to_str());
let timelinepath = PathBuf::from("timelines").join(timeline.to_string());
if !timelinepath.exists() {
anyhow::bail!("timeline {} does not exist in the page server's repository");
}
// Scan .zenith/timelines/<timeline>/snapshots
let snapshotspath = "timelines/".to_owned() + &timeline.to_str() + "/snapshots";
let snapshotspath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots");
let mut last_snapshot_lsn: u64 = 0;
for direntry in fs::read_dir(&snapshotspath).unwrap() {
let filename = direntry.unwrap().file_name().to_str().unwrap().to_owned();
let direntry = direntry?;
let filename = direntry.file_name().to_str().unwrap().to_owned();
let lsn = u64::from_str_radix(&filename, 16)?;
last_snapshot_lsn = max(lsn, last_snapshot_lsn);
@@ -67,7 +68,7 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi
}
if last_snapshot_lsn == 0 {
error!("could not find valid snapshot in {}", &snapshotspath);
error!("could not find valid snapshot in {}", snapshotspath.display());
// TODO return error?
}
pcache.init_valid_lsn(last_snapshot_lsn);
@@ -98,54 +99,42 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re
fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, snapshot: &str) -> Result<()> {
let snapshotpath = "timelines/".to_owned() + &timeline.to_str() + "/snapshots/" + snapshot;
let snapshotpath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots").join(snapshot);
// Scan 'global'
let paths = fs::read_dir(snapshotpath.clone() + "/global").unwrap();
for direntry in fs::read_dir(snapshotpath.join("global"))? {
let direntry = direntry?;
match direntry.file_name().to_str() {
None => continue,
for direntry in paths {
let path = direntry.unwrap().path();
let filename = path.file_name();
if filename.is_none() {
continue;
}
let filename = filename.unwrap().to_str();
// These special files appear in the snapshot, but are not needed by the page server
Some("pg_control") => continue,
Some("pg_filenode.map") => continue,
if filename == Some("pg_control") {
continue;
// Load any relation files into the page server
_ => restore_relfile(conf, pcache, timeline, snapshot, GLOBALTABLESPACE_OID, 0, &direntry.path())?,
}
if filename == Some("pg_filenode.map") {
continue;
}
restore_relfile(conf, pcache, timeline, snapshot, GLOBALTABLESPACE_OID, 0, &path)?;
}
// Scan 'base'
let paths = fs::read_dir(snapshotpath.clone() + "/base").unwrap();
for path in paths {
let path = path.unwrap();
let filename = path.file_name().to_str().unwrap().to_owned();
// Scan 'base'. It contains database dirs, the database OID is the filename.
// E.g. 'base/12345', where 12345 is the database OID.
for direntry in fs::read_dir(snapshotpath.join("base"))? {
let direntry = direntry?;
// Scan database dirs
let dboid = u32::from_str_radix(&filename, 10)?;
let dboid = u32::from_str_radix(direntry.file_name().to_str().unwrap(), 10)?;
let paths = fs::read_dir(path.path()).unwrap();
for direntry in paths {
let path = direntry.unwrap().path();
let filename = path.file_name();
if filename.is_none() {
continue;
}
let filename = filename.unwrap().to_str();
if filename == Some("PG_VERSION") {
continue;
}
if filename == Some("pg_filenode.map") {
continue;
}
for direntry in fs::read_dir(direntry.path())? {
let direntry = direntry?;
match direntry.file_name().to_str() {
None => continue,
restore_relfile(conf, pcache, timeline, snapshot, DEFAULTTABLESPACE_OID, dboid, &path)?;
// These special files appear in the snapshot, but are not needed by the page server
Some("PG_VERSION") => continue,
Some("pg_filenode.map") => continue,
// Load any relation files into the page server
_ => restore_relfile(conf, pcache, timeline, snapshot, DEFAULTTABLESPACE_OID, dboid, &direntry.path())?,
}
}
}

View File

@@ -1,10 +1,7 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::cmp::min;
use std::error::Error;
use std::fmt;
use log::*;
use std::cmp::min;
use thiserror::Error;
const XLOG_BLCKSZ: u32 = 8192;
@@ -54,28 +51,11 @@ pub struct WalStreamDecoder {
}
#[derive(Debug, Clone)]
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
msg: String,
}
impl Error for WalDecodeError {
fn description(&self) -> &str {
&self.msg
}
}
impl WalDecodeError {
fn new(msg: &str) -> WalDecodeError {
WalDecodeError {
msg: msg.to_string(),
}
}
}
impl fmt::Display for WalDecodeError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "WAL decoding error: {}", self.msg)
}
lsn: u64
}
//
@@ -100,8 +80,14 @@ impl WalStreamDecoder {
self.inputbuf.extend_from_slice(buf);
}
// Returns a tuple:
// (end LSN, record)
/// Attempt to decode another WAL record from the input that has been fed to the
/// decoder so far.
///
/// Returns one of the following:
/// Ok((u64, Bytes)): a tuple containing the LSN of next record, and the record itself
/// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
/// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid.
///
pub fn poll_decode(&mut self) -> Result<Option<(u64, Bytes)>, WalDecodeError> {
loop {
// parse and verify page boundaries as we go
@@ -114,9 +100,7 @@ impl WalStreamDecoder {
let hdr = self.decode_XLogLongPageHeaderData();
if hdr.std.xlp_pageaddr != self.lsn {
return Err(WalDecodeError::new(&format!("invalid xlog segment header at {:X}/{:X}",
self.lsn >> 32,
self.lsn & 0xffffffff)));
return Err(WalDecodeError { msg: "invalid xlog segment header".into(), lsn: self.lsn });
}
// TODO: verify the remaining fields in the header
@@ -131,9 +115,7 @@ impl WalStreamDecoder {
let hdr = self.decode_XLogPageHeaderData();
if hdr.xlp_pageaddr != self.lsn {
return Err(WalDecodeError::new(&format!("invalid xlog page header at {:X}/{:X}: {:?}",
self.lsn >> 32,
self.lsn & 0xffffffff, hdr)));
return Err(WalDecodeError { msg: "invalid xlog page header".into(), lsn: self.lsn });
}
// TODO: verify the remaining fields in the header
@@ -159,10 +141,7 @@ impl WalStreamDecoder {
self.startlsn = self.lsn;
let xl_tot_len = self.inputbuf.get_u32_le();
if xl_tot_len < SizeOfXLogRecord {
return Err(WalDecodeError::new(&format!("invalid xl_tot_len {} at {:X}/{:X}",
xl_tot_len,
self.lsn >> 32,
self.lsn & 0xffffffff)));
return Err(WalDecodeError {msg: format!("invalid xl_tot_len {}", xl_tot_len), lsn: self.lsn });
}
self.lsn += 4;

View File

@@ -65,7 +65,7 @@ fn main() -> Result<()> {
.get_matches();
let systemid_str = arg_matches.value_of("systemid").unwrap();
let systemid = u64::from_str_radix(systemid_str, 10)?;
let systemid: u64 = systemid_str.parse()?;
let mut conf = WalAcceptorConf {
data_dir: PathBuf::from("./"),

View File

@@ -3,6 +3,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use pageserver::ZTimelineId;
use std::io;
use std::str;
use std::str::FromStr;
pub type Oid = u32;
pub type SystemId = u64;

View File

@@ -601,9 +601,8 @@ impl Connection {
fn set_timeline(&mut self, timelineid: ZTimelineId) -> Result<()> {
let mut timelines = TIMELINES.lock().unwrap();
if !timelines.contains_key(&timelineid) {
let timeline_dir = timelineid.to_str();
info!("creating timeline dir {}", &timeline_dir);
fs::create_dir_all(&timeline_dir)?;
info!("creating timeline dir {}", timelineid);
fs::create_dir_all(timelineid.to_string())?;
timelines.insert(timelineid, Arc::new(Timeline::new(timelineid)));
}
self.timeline = Some(timelines.get(&timelineid).unwrap().clone());
@@ -1112,12 +1111,12 @@ impl Connection {
let wal_file_path = self
.conf
.data_dir
.join(self.timeline().timelineid.to_str())
.join(self.timeline().timelineid.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.data_dir
.join(self.timeline().timelineid.to_str())
.join(self.timeline().timelineid.to_string())
.join(wal_file_name.clone() + ".partial");
{

View File

@@ -1,10 +1,11 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::str::FromStr;
use clap::{App, Arg, ArgMatches, SubCommand};
use anyhow::Result;
use anyhow::*;
use anyhow::{anyhow, bail};
use control_plane::{compute::ComputeControlPlane, local_env, storage};
use control_plane::local_env::LocalEnv;
@@ -12,10 +13,10 @@ use control_plane::storage::PageServerNode;
use pageserver::ZTimelineId;
fn zenith_repo_dir() -> String {
fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => String::from(val.to_str().unwrap()),
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
@@ -239,19 +240,20 @@ fn run_branch_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
}
} else {
// No arguments, list branches
list_branches();
list_branches()?;
}
Ok(())
}
fn list_branches() {
fn list_branches() -> Result<()> {
// list branches
let paths = fs::read_dir(zenith_repo_dir() + "/refs/branches").unwrap();
let paths = fs::read_dir(zenith_repo_dir().join("refs").join("branches"))?;
for path in paths {
let filename = path.unwrap().file_name().to_str().unwrap().to_owned();
println!(" {}", filename);
println!(" {}", path?.file_name().to_str().unwrap());
}
Ok(())
}
//
@@ -281,8 +283,8 @@ fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
let lsn: Option<u64>;
if let Some(lsnstr) = strings.next() {
let mut s = lsnstr.split("/");
let lsn_hi: u64 = s.next().unwrap().parse()?;
let lsn_lo: u64 = s.next().unwrap().parse()?;
let lsn_hi: u64 = s.next().ok_or(anyhow!("invalid LSN in point-in-time specification"))?.parse()?;
let lsn_lo: u64 = s.next().ok_or(anyhow!("invalid LSN in point-in-time specification"))?.parse()?;
lsn = Some(lsn_hi << 32 | lsn_lo);
}
else {
@@ -291,7 +293,7 @@ fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
// Check if it's a tag
if lsn.is_none() {
let tagpath:PathBuf = PathBuf::from(zenith_repo_dir() + "/refs/tags/" + name);
let tagpath = zenith_repo_dir().join("refs").join("tags").join(name);
if tagpath.exists() {
let pointstr = fs::read_to_string(tagpath)?;
@@ -300,7 +302,7 @@ fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
}
// Check if it's a branch
// Check if it's branch @ LSN
let branchpath:PathBuf = PathBuf::from(zenith_repo_dir() + "/refs/branches/" + name);
let branchpath = zenith_repo_dir().join("refs").join("branches").join(name);
if branchpath.exists() {
let pointstr = fs::read_to_string(branchpath)?;
@@ -315,7 +317,7 @@ fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
// Check if it's a timelineid
// Check if it's timelineid @ LSN
let tlipath:PathBuf = PathBuf::from(zenith_repo_dir() + "/timelines/" + name);
let tlipath = zenith_repo_dir().join("timelines").join(name);
if tlipath.exists() {
let result = local_env::PointInTime {
timelineid: ZTimelineId::from_str(name)?,