cargo fmt

Eric Seppanen
2021-04-20 17:59:56 -07:00
parent b5a5ea5831
commit 92e4f4b3b6
20 changed files with 480 additions and 300 deletions
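
Every hunk below is mechanical re-formatting from running rustfmt (via `cargo fmt`); no behavior changes. The recurring conventions are: imports sorted alphabetically, signatures past the line limit broken one parameter per line with trailing commas, long macro invocations wrapped one argument per line, struct literals spaced as `{ field }`, and long method chains split at each `.`. A hypothetical sketch of those defaults (not code from this commit); in CI, `cargo fmt -- --check` reports whether a tree is already formatted:

```rust
// Hypothetical example of the rustfmt defaults applied throughout this diff.
use std::io::Write; // imports are kept sorted

fn send_range(
    writer: &mut dyn Write, // long signatures wrap one parameter per line
    start: u64,
    end: u64, // with a trailing comma on the last one
) -> Result<(), std::io::Error> {
    writeln!(
        writer,
        "range {:X}/{:X}", // overlong macro calls wrap the same way
        start,
        end
    )?;
    Ok(())
}
```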

View File

@@ -1,14 +1,17 @@
use log::*;
-use tar::{Builder};
+use regex::Regex;
use std::fmt;
use std::io::Write;
+use tar::Builder;
use walkdir::WalkDir;
-use regex::Regex;
use crate::ZTimelineId;
-pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, snapshotlsn: u64) -> Result<(), std::io::Error> {
+pub fn send_snapshot_tarball(
+write: &mut dyn Write,
+timelineid: ZTimelineId,
+snapshotlsn: u64,
+) -> Result<(), std::io::Error> {
let mut ar = Builder::new(write);
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshotlsn);
@@ -27,12 +30,15 @@ pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, sna
}
if entry.file_type().is_dir() {
trace!("sending dir {} as {}", fullpath.display(), relpath.display());
trace!(
"sending dir {} as {}",
fullpath.display(),
relpath.display()
);
ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
// Shared catalogs are exempt
if relpath.starts_with("global/") {
trace!("sending shared catalog {}", relpath.display());
@@ -61,7 +67,9 @@ pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, sna
}
let archive_fname = relpath.to_str().unwrap().clone();
-let archive_fname = archive_fname.strip_suffix(".partial").unwrap_or(&archive_fname);
+let archive_fname = archive_fname
+.strip_suffix(".partial")
+.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
}
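
Context for the `.partial` handling above: `strip_suffix` returns `Some` only when the suffix is present, and `unwrap_or` falls back to the original name, so finished and in-progress WAL segments both yield a valid archive name. A small standalone check (hypothetical file names):

```rust
fn main() {
    let done = "000000010000000100000016";
    let in_progress = "000000010000000100000016.partial";
    // Some(prefix) when the suffix matches...
    assert_eq!(in_progress.strip_suffix(".partial"), Some(done));
    // ...None when it doesn't, so unwrap_or keeps the name unchanged.
    assert_eq!(done.strip_suffix(".partial"), None);
}
```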
@@ -71,14 +79,12 @@ pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, sna
Ok(())
}
// formats:
// <oid>
// <oid>_<fork name>
// <oid>.<segment number>
// <oid>_<fork name>.<segment number>
#[derive(Debug)]
struct FilePathError {
msg: String,
@@ -145,7 +151,6 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
return Ok((relnode, forknum, segno));
}
fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
/*
* Relation data files can be in one of the following directories:

View File

@@ -4,11 +4,11 @@
use log::*;
use std::fs;
+use std::fs::{File, OpenOptions};
use std::io;
+use std::path::PathBuf;
use std::process::exit;
use std::thread;
-use std::fs::{File, OpenOptions};
-use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -32,27 +32,33 @@ fn zenith_repo_dir() -> String {
fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
.arg(Arg::with_name("listen")
.short("l")
.long("listen")
.takes_value(true)
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"))
.arg(Arg::with_name("interactive")
.short("i")
.long("interactive")
.takes_value(false)
.help("Interactive mode"))
.arg(Arg::with_name("daemonize")
.short("d")
.long("daemonize")
.takes_value(false)
.help("Run in the background"))
.arg(
Arg::with_name("listen")
.short("l")
.long("listen")
.takes_value(true)
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"),
)
.arg(
Arg::with_name("interactive")
.short("i")
.long("interactive")
.takes_value(false)
.help("Interactive mode"),
)
.arg(
Arg::with_name("daemonize")
.short("d")
.long("daemonize")
.takes_value(false)
.help("Run in the background"),
)
.get_matches();
let mut conf = PageServerConf {
daemonize: false,
interactive: false,
-listen_addr: "127.0.0.1:5430".parse().unwrap()
+listen_addr: "127.0.0.1:5430".parse().unwrap(),
};
if arg_matches.is_present("daemonize") {
@@ -128,9 +134,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
Ok(_) => info!("Success, daemonized"),
Err(e) => error!("Error, {}", e),
}
-}
-else
-{
+} else {
// change into the repository directory. In daemon mode, Daemonize
// does this for us.
let repodir = zenith_repo_dir();

View File

@@ -1,7 +1,8 @@
+use std::fmt;
use std::net::SocketAddr;
use std::str::FromStr;
-use std::fmt;
+pub mod basebackup;
pub mod page_cache;
pub mod page_service;
pub mod pg_constants;
@@ -12,7 +13,6 @@ mod tui_logger;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
-pub mod basebackup;
#[derive(Debug, Clone)]
pub struct PageServerConf {
@@ -35,7 +35,6 @@ impl FromStr for ZTimelineId {
buf.copy_from_slice(timelineid.as_slice());
Ok(ZTimelineId(buf))
}
}
impl ZTimelineId {
@@ -55,8 +54,7 @@ impl ZTimelineId {
}
impl fmt::Display for ZTimelineId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}

View File

@@ -6,9 +6,9 @@
// per-entry mutex.
//
-use crate::{walredo, PageServerConf};
use crate::restore_local_repo::restore_timeline;
use crate::ZTimelineId;
+use crate::{walredo, PageServerConf};
use anyhow::bail;
use bytes::Bytes;
use core::ops::Bound::Included;
@@ -109,7 +109,8 @@ struct PageCacheShared {
}
lazy_static! {
-pub static ref PAGECACHES: Mutex<HashMap<ZTimelineId, Arc<PageCache>>> = Mutex::new(HashMap::new());
+pub static ref PAGECACHES: Mutex<HashMap<ZTimelineId, Arc<PageCache>>> =
+Mutex::new(HashMap::new());
}
// Get Page Cache for given timeline. It is assumed to already exist.
@@ -118,11 +119,14 @@ pub fn get_pagecache(_conf: &PageServerConf, timelineid: ZTimelineId) -> Option<
match pcaches.get(&timelineid) {
Some(pcache) => Some(pcache.clone()),
-None => None
+None => None,
}
}
-pub fn get_or_restore_pagecache(conf: &PageServerConf, timelineid: ZTimelineId) -> anyhow::Result<Arc<PageCache>> {
+pub fn get_or_restore_pagecache(
+conf: &PageServerConf,
+timelineid: ZTimelineId,
+) -> anyhow::Result<Arc<PageCache>> {
let mut pcaches = PAGECACHES.lock().unwrap();
match pcaches.get(&timelineid) {
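
The `PAGECACHES` registry above follows a common pattern: a process-global map behind `lazy_static` and a `Mutex`, handing out `Arc` clones. A minimal sketch with hypothetical names (note that the `Some`/`None` match in `get_pagecache` is equivalent to `.cloned()` on the `Option`):

```rust
// Minimal sketch (hypothetical names) of the registry pattern above:
// a lazy_static global HashMap behind a Mutex, returning Arc clones.
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

lazy_static! {
    static ref REGISTRY: Mutex<HashMap<u64, Arc<Vec<u8>>>> = Mutex::new(HashMap::new());
}

fn get(id: u64) -> Option<Arc<Vec<u8>>> {
    // Equivalent to the match-on-Option in get_pagecache above.
    REGISTRY.lock().unwrap().get(&id).cloned()
}

fn main() {
    REGISTRY.lock().unwrap().insert(1, Arc::new(vec![0u8; 8192]));
    assert!(get(1).is_some());
}
```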
@@ -475,8 +479,11 @@ impl PageCache {
self.num_entries.fetch_add(1, Ordering::Relaxed);
if !oldentry.is_none() {
error!("overwriting WAL record with LSN {:X}/{:X} in page cache",
lsn >> 32, lsn & 0xffffffff);
error!(
"overwriting WAL record with LSN {:X}/{:X} in page cache",
lsn >> 32,
lsn & 0xffffffff
);
}
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
@@ -511,14 +518,18 @@ impl PageCache {
// Can't move backwards.
let oldlsn = shared.last_valid_lsn;
if lsn >= oldlsn {
shared.last_valid_lsn = lsn;
self.valid_lsn_condvar.notify_all();
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
} else {
warn!("attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})",
oldlsn >> 32, oldlsn & 0xffffffff, lsn >> 32, lsn & 0xffffffff);
warn!(
"attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})",
oldlsn >> 32,
oldlsn & 0xffffffff,
lsn >> 32,
lsn & 0xffffffff
);
}
}
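
The `{:X}/{:X}` pairs with `lsn >> 32` and `lsn & 0xffffffff` in the log lines above are PostgreSQL's conventional two-part rendering of a 64-bit LSN. A worked example:

```rust
// PostgreSQL displays a 64-bit LSN as its high and low 32-bit halves in hex,
// e.g. 0x0000000116D9A8C8 prints as "1/16D9A8C8".
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff)
}

fn main() {
    assert_eq!(format_lsn(0x0000_0001_16D9_A8C8), "1/16D9A8C8");
}
```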

View File

@@ -13,26 +13,25 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
+use regex::Regex;
use std::io;
-use std::thread;
use std::str::FromStr;
use std::sync::Arc;
-use regex::Regex;
+use std::thread;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::runtime::Runtime;
-use tokio::task;
use tokio::sync::mpsc;
+use tokio::task;
+use crate::basebackup;
use crate::page_cache;
use crate::restore_local_repo;
-use crate::basebackup;
use crate::walreceiver;
use crate::PageServerConf;
use crate::ZTimelineId;
type Result<T> = std::result::Result<T, io::Error>;
#[derive(Debug)]
@@ -172,8 +171,7 @@ struct FeParseMessage {
query_string: Bytes,
}
-fn read_null_terminated(buf: &mut Bytes) -> Result<Bytes>
-{
+fn read_null_terminated(buf: &mut Bytes) -> Result<Bytes> {
let mut result = BytesMut::new();
loop {
@@ -221,15 +219,14 @@ impl FeParseMessage {
));
}
-Ok(FeMessage::Parse(FeParseMessage {query_string}))
+Ok(FeMessage::Parse(FeParseMessage { query_string }))
}
}
#[derive(Debug)]
struct FeDescribeMessage {
kind: u8, // 'S' to describe a prepared statement; or 'P' to describe a portal.
// we only support unnamed prepared stmt or portal
}
impl FeDescribeMessage {
@@ -255,7 +252,7 @@ impl FeDescribeMessage {
));
}
-Ok(FeMessage::Describe(FeDescribeMessage {kind}))
+Ok(FeMessage::Describe(FeDescribeMessage { kind }))
}
}
@@ -263,7 +260,7 @@ impl FeDescribeMessage {
#[derive(Debug)]
struct FeExecuteMessage {
/// max # of rows
-maxrows: i32
+maxrows: i32,
}
impl FeExecuteMessage {
@@ -286,14 +283,13 @@ impl FeExecuteMessage {
));
}
-Ok(FeMessage::Execute(FeExecuteMessage {maxrows}))
+Ok(FeMessage::Execute(FeExecuteMessage { maxrows }))
}
}
// we only support unnamed prepared stmt and portal
#[derive(Debug)]
-struct FeBindMessage {
-}
+struct FeBindMessage {}
impl FeBindMessage {
pub fn parse(body: Bytes) -> Result<FeMessage> {
@@ -324,8 +320,7 @@ impl FeBindMessage {
// we only support unnamed prepared stmt and portal
#[derive(Debug)]
-struct FeCloseMessage {
-}
+struct FeCloseMessage {}
impl FeCloseMessage {
pub fn parse(body: Bytes) -> Result<FeMessage> {
@@ -370,9 +365,7 @@ impl FeMessage {
let mut body = body.freeze();
match tag {
-b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage {
-body: body,
-}))),
+b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body: body }))),
b'P' => Ok(Some(FeParseMessage::parse(body)?)),
b'D' => Ok(Some(FeDescribeMessage::parse(body)?)),
b'E' => Ok(Some(FeExecuteMessage::parse(body)?)),
@@ -634,7 +627,6 @@ impl Connection {
}
async fn run(&mut self) -> Result<()> {
let mut unnamed_query_string = Bytes::new();
loop {
let msg = self.read_message().await?;
@@ -666,7 +658,8 @@ impl Connection {
self.write_message(&BeMessage::ParseComplete).await?;
}
Some(FeMessage::Describe(_)) => {
-self.write_message_noflush(&BeMessage::ParameterDescription).await?;
+self.write_message_noflush(&BeMessage::ParameterDescription)
+.await?;
self.write_message(&BeMessage::NoData).await?;
}
Some(FeMessage::Bind(_)) => {
@@ -724,10 +717,13 @@ impl Connection {
// Check that the timeline exists
self.handle_basebackup_request(timelineid).await?;
-self.write_message_noflush(&BeMessage::CommandComplete).await?;
+self.write_message_noflush(&BeMessage::CommandComplete)
+.await?;
self.write_message(&BeMessage::ReadyForQuery).await
} else if query_string.starts_with(b"callmemaybe ") {
-let query_str = String::from_utf8(query_string.to_vec()).unwrap().to_string();
+let query_str = String::from_utf8(query_string.to_vec())
+.unwrap()
+.to_string();
// callmemaybe <zenith timelineid as hex string> <connstr>
let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) (.*)$").unwrap();
@@ -777,7 +773,6 @@ impl Connection {
}
async fn handle_pagerequests(&mut self, timelineid: ZTimelineId) -> Result<()> {
// Check that the timeline exists
let pcache = page_cache::get_or_restore_pagecache(&self.conf, timelineid);
if pcache.is_err() {
@@ -954,7 +949,7 @@ impl Connection {
if joinres.is_err() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
-joinres.unwrap_err()
+joinres.unwrap_err(),
));
}
return joinres.unwrap();
@@ -1002,7 +997,6 @@ struct CopyDataSink(mpsc::Sender<Bytes>);
impl std::io::Write for CopyDataSink {
fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
let buf = Bytes::copy_from_slice(data);
if let Err(e) = self.0.blocking_send(buf) {
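
`CopyDataSink` above adapts the blocking `std::io::Write` interface to a tokio channel. A fleshed-out sketch of the pattern; the error mapping, `flush`, and the demo `main` are assumptions, since only the `blocking_send` call appears in the hunk:

```rust
// Sketch of the adapter pattern above: a std::io::Write that forwards each
// buffer into a tokio mpsc channel via blocking_send. blocking_send must be
// called from a plain thread (e.g. spawn_blocking), never inside async code.
use bytes::Bytes;
use tokio::sync::mpsc;

struct CopyDataSink(mpsc::Sender<Bytes>);

impl std::io::Write for CopyDataSink {
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        let buf = Bytes::copy_from_slice(data);
        self.0
            .blocking_send(buf)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e.to_string()))?;
        Ok(data.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(()) // sends are unbuffered; nothing to flush
    }
}

fn main() {
    let (tx, mut rx) = mpsc::channel::<Bytes>(4);
    std::thread::spawn(move || {
        use std::io::Write;
        let mut sink = CopyDataSink(tx);
        sink.write_all(b"hello").unwrap();
    });
    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
        .block_on(async { assert_eq!(rx.recv().await.unwrap().as_ref(), b"hello") });
}
```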

View File

@@ -14,6 +14,7 @@ use log::*;
use regex::Regex;
use std::fmt;
+use std::cmp::max;
use std::error::Error;
use std::fs;
use std::fs::File;
@@ -21,19 +22,17 @@ use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
-use std::cmp::max;
use anyhow::Result;
use bytes::Bytes;
use crate::page_cache;
-use crate::page_cache::PageCache;
-use crate:: PageServerConf;
use crate::page_cache::BufferTag;
+use crate::page_cache::PageCache;
use crate::waldecoder::WalStreamDecoder;
+use crate::PageServerConf;
use crate::ZTimelineId;
// From pg_tablespace_d.h
//
// FIXME: we'll probably need these elsewhere too, move to some common location
@@ -43,8 +42,11 @@ const GLOBALTABLESPACE_OID: u32 = 1664;
//
// Load it all into the page cache.
//
-pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId) -> Result<()> {
+pub fn restore_timeline(
+conf: &PageServerConf,
+pcache: &PageCache,
+timeline: ZTimelineId,
+) -> Result<()> {
let timelinepath = PathBuf::from("timelines").join(timeline.to_string());
if !timelinepath.exists() {
@@ -52,7 +54,9 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi
}
// Scan .zenith/timelines/<timeline>/snapshots
let snapshotspath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots");
let snapshotspath = PathBuf::from("timelines")
.join(timeline.to_string())
.join("snapshots");
let mut last_snapshot_lsn: u64 = 0;
@@ -68,7 +72,10 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi
}
if last_snapshot_lsn == 0 {
error!("could not find valid snapshot in {}", snapshotspath.display());
error!(
"could not find valid snapshot in {}",
snapshotspath.display()
);
// TODO return error?
}
pcache.init_valid_lsn(last_snapshot_lsn);
@@ -79,7 +86,6 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi
}
pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Result<u64> {
let snapshotspath = format!("timelines/{}/snapshots", timeline);
let mut last_snapshot_lsn = 0;
@@ -97,9 +103,16 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re
Ok(last_snapshot_lsn)
}
-fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, snapshot: &str) -> Result<()> {
-let snapshotpath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots").join(snapshot);
+fn restore_snapshot(
+conf: &PageServerConf,
+pcache: &PageCache,
+timeline: ZTimelineId,
+snapshot: &str,
+) -> Result<()> {
+let snapshotpath = PathBuf::from("timelines")
+.join(timeline.to_string())
+.join("snapshots")
+.join(snapshot);
// Scan 'global'
for direntry in fs::read_dir(snapshotpath.join("global"))? {
@@ -112,7 +125,15 @@ fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimeli
Some("pg_filenode.map") => continue,
// Load any relation files into the page server
-_ => restore_relfile(conf, pcache, timeline, snapshot, GLOBALTABLESPACE_OID, 0, &direntry.path())?,
+_ => restore_relfile(
+conf,
+pcache,
+timeline,
+snapshot,
+GLOBALTABLESPACE_OID,
+0,
+&direntry.path(),
+)?,
}
}
@@ -133,7 +154,15 @@ fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimeli
Some("pg_filenode.map") => continue,
// Load any relation files into the page server
-_ => restore_relfile(conf, pcache, timeline, snapshot, DEFAULTTABLESPACE_OID, dboid, &direntry.path())?,
+_ => restore_relfile(
+conf,
+pcache,
+timeline,
+snapshot,
+DEFAULTTABLESPACE_OID,
+dboid,
+&direntry.path(),
+)?,
}
}
}
@@ -143,8 +172,15 @@ fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimeli
Ok(())
}
-fn restore_relfile(_conf: &PageServerConf, pcache: &PageCache, _timeline: ZTimelineId, snapshot: &str, spcoid: u32, dboid: u32, path: &Path) -> Result<()> {
+fn restore_relfile(
+_conf: &PageServerConf,
+pcache: &PageCache,
+_timeline: ZTimelineId,
+snapshot: &str,
+spcoid: u32,
+dboid: u32,
+path: &Path,
+) -> Result<()> {
let lsn = u64::from_str_radix(snapshot, 16)?;
// Does it look like a relation file?
@@ -187,12 +223,12 @@ fn restore_relfile(_conf: &PageServerConf, pcache: &PageCache, _timeline: ZTimel
// reached EOF. That's expected.
// FIXME: maybe check that we read the full length of the file?
break;
-},
+}
_ => {
error!("error reading file: {:?} ({})", path, e);
break;
}
-}
+},
};
blknum += 1;
}
@@ -210,7 +246,12 @@ fn restore_relfile(_conf: &PageServerConf, pcache: &PageCache, _timeline: ZTimel
// Scan WAL on a timeline, starting from given LSN, and load all the records
// into the page cache.
-fn restore_wal(_conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, startpoint: u64) -> Result<()> {
+fn restore_wal(
+_conf: &PageServerConf,
+pcache: &PageCache,
+timeline: ZTimelineId,
+startpoint: u64,
+) -> Result<()> {
let walpath = format!("timelines/{}/wal", timeline);
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
@@ -259,8 +300,7 @@ fn restore_wal(_conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId
break;
}
if let Some((lsn, recdata)) = rec.unwrap() {
-let decoded =
-crate::waldecoder::decode_wal_record(recdata.clone());
+let decoded = crate::waldecoder::decode_wal_record(recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
@@ -299,7 +339,11 @@ fn restore_wal(_conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId
segno += 1;
offset = 0;
}
info!("reached end of WAL at {:X}/{:X}", last_lsn >> 32, last_lsn & 0xffffffff);
info!(
"reached end of WAL at {:X}/{:X}",
last_lsn >> 32,
last_lsn & 0xffffffff
);
Ok(())
}
@@ -320,7 +364,6 @@ pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo {
return xlogptr / wal_segsz_bytes as u64;
}
#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
return format!(
@@ -358,7 +401,6 @@ pub fn IsPartialXLogFileName(fname: &str) -> bool {
}
}
#[derive(Debug, Clone)]
struct FilePathError {
msg: String,
@@ -446,4 +488,3 @@ fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
return Ok((relnode, forknum, segno));
}
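
`XLByteToSeg` and `XLogFileName` above implement PostgreSQL's WAL segment naming. A worked example under the 16 MiB default segment size (values hypothetical):

```rust
// Worked example of the naming scheme above (assumptions: 16 MiB segments,
// Postgres timeline 1). A segment file name is three 8-digit hex fields:
// timeline, "log" number, and segment-within-log.
const WAL_SEGSZ: u64 = 16 * 1024 * 1024;

fn xlog_file_name(tli: u32, segno: u64, segsz: u64) -> String {
    let segs_per_id = 0x1_0000_0000u64 / segsz; // 256 segments per log id at 16 MiB
    format!("{:08X}{:08X}{:08X}", tli, segno / segs_per_id, segno % segs_per_id)
}

fn main() {
    let lsn: u64 = 0x0000_0001_1600_0000;
    let segno = lsn / WAL_SEGSZ; // XLByteToSeg
    assert_eq!(xlog_file_name(1, segno, WAL_SEGSZ), "000000010000000100000016");
}
```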

View File

@@ -50,12 +50,11 @@ pub struct WalStreamDecoder {
recordbuf: BytesMut,
}
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
msg: String,
-lsn: u64
+lsn: u64,
}
//
@@ -100,7 +99,10 @@ impl WalStreamDecoder {
let hdr = self.decode_XLogLongPageHeaderData();
if hdr.std.xlp_pageaddr != self.lsn {
return Err(WalDecodeError { msg: "invalid xlog segment header".into(), lsn: self.lsn });
return Err(WalDecodeError {
msg: "invalid xlog segment header".into(),
lsn: self.lsn,
});
}
// TODO: verify the remaining fields in the header
@@ -115,7 +117,10 @@ impl WalStreamDecoder {
let hdr = self.decode_XLogPageHeaderData();
if hdr.xlp_pageaddr != self.lsn {
return Err(WalDecodeError { msg: "invalid xlog page header".into(), lsn: self.lsn });
return Err(WalDecodeError {
msg: "invalid xlog page header".into(),
lsn: self.lsn,
});
}
// TODO: verify the remaining fields in the header
@@ -141,7 +146,10 @@ impl WalStreamDecoder {
self.startlsn = self.lsn;
let xl_tot_len = self.inputbuf.get_u32_le();
if xl_tot_len < SizeOfXLogRecord {
-return Err(WalDecodeError {msg: format!("invalid xl_tot_len {}", xl_tot_len), lsn: self.lsn });
+return Err(WalDecodeError {
+msg: format!("invalid xl_tot_len {}", xl_tot_len),
+lsn: self.lsn,
+});
}
self.lsn += 4;
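
The `WalDecodeError` struct above derives `thiserror::Error`; the `#[error("{msg} at {lsn}")]` attribute generates the `Display` impl by interpolating the struct's fields. A standalone demonstration (the sample values are hypothetical):

```rust
// Demonstration of the derive above: Display comes from the #[error(...)]
// format string, so the error prints its fields directly.
use thiserror::Error;

#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
    msg: String,
    lsn: u64,
}

fn main() {
    let e = WalDecodeError {
        msg: "invalid xlog page header".into(),
        lsn: 123,
    };
    assert_eq!(e.to_string(), "invalid xlog page header at 123");
}
```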

View File

@@ -19,7 +19,7 @@ use postgres_types::PgLsn;
use std::collections::HashMap;
use std::fs;
use std::fs::{File, OpenOptions};
-use std::io::{Write, Seek, SeekFrom};
+use std::io::{Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Mutex;
@@ -38,11 +38,16 @@ struct WalReceiverEntry {
}
lazy_static! {
-static ref WAL_RECEIVERS: Mutex<HashMap<ZTimelineId, WalReceiverEntry>> = Mutex::new(HashMap::new());
+static ref WAL_RECEIVERS: Mutex<HashMap<ZTimelineId, WalReceiverEntry>> =
+Mutex::new(HashMap::new());
}
// Launch a new WAL receiver, or tell one that's running about change in connection string
-pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str) {
+pub fn launch_wal_receiver(
+conf: &PageServerConf,
+timelineid: ZTimelineId,
+wal_producer_connstr: &str,
+) {
let mut receivers = WAL_RECEIVERS.lock().unwrap();
match receivers.get_mut(&timelineid) {
@@ -50,7 +55,9 @@ pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_p
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
None => {
-let receiver = WalReceiverEntry { wal_producer_connstr: wal_producer_connstr.into() };
+let receiver = WalReceiverEntry {
+wal_producer_connstr: wal_producer_connstr.into(),
+};
receivers.insert(timelineid, receiver);
// Also launch a new thread to handle this connection
@@ -59,7 +66,8 @@ pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_p
.name("WAL receiver thread".into())
.spawn(move || {
thread_main(&conf_copy, timelineid);
-}).unwrap();
+})
+.unwrap();
}
};
}
@@ -68,14 +76,21 @@ pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_p
fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
let receivers = WAL_RECEIVERS.lock().unwrap();
-receivers.get(&timelineid).unwrap().wal_producer_connstr.clone()
+receivers
+.get(&timelineid)
+.unwrap()
+.wal_producer_connstr
+.clone()
}
//
// This is the entry point for the WAL receiver thread.
//
fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("WAL receiver thread started for timeline : '{}'", timelineid);
info!(
"WAL receiver thread started for timeline : '{}'",
timelineid
);
let runtime = runtime::Builder::new_current_thread()
.enable_all()
@@ -100,7 +115,11 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
});
}
-async fn walreceiver_main(conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str) -> Result<(), Error> {
+async fn walreceiver_main(
+conf: &PageServerConf,
+timelineid: ZTimelineId,
+wal_producer_connstr: &str,
+) -> Result<(), Error> {
// Connect to the database in replication mode.
info!("connecting to {:?}", wal_producer_connstr);
let connect_cfg = format!("{} replication=true", wal_producer_connstr);
@@ -174,10 +193,12 @@ async fn walreceiver_main(conf: &PageServerConf, timelineid: ZTimelineId, wal_pr
let startlsn = xlog_data.wal_start();
let endlsn = startlsn + data.len() as u64;
-write_wal_file(startlsn,
-timelineid,
-16 * 1024 * 1024, // FIXME
-data)?;
+write_wal_file(
+startlsn,
+timelineid,
+16 * 1024 * 1024, // FIXME
+data,
+)?;
trace!(
"received XLogData between {:X}/{:X} and {:X}/{:X}",
@@ -376,7 +397,6 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin
return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli);
}
fn write_wal_file(
startpos: XLogRecPtr,
timeline: ZTimelineId,
@@ -409,12 +429,13 @@ fn write_wal_file(
/* Open file */
let segno = XLByteToSeg(start_pos, wal_seg_size);
-let wal_file_name = XLogFileName(1, // FIXME: always use Postgres timeline 1
-segno, wal_seg_size);
-let wal_file_path = wal_dir
-.join(wal_file_name.clone());
-let wal_file_partial_path = wal_dir
-.join(wal_file_name.clone() + ".partial");
+let wal_file_name = XLogFileName(
+1, // FIXME: always use Postgres timeline 1
+segno,
+wal_seg_size,
+);
+let wal_file_path = wal_dir.join(wal_file_name.clone());
+let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial");
{
let mut wal_file: File;
@@ -422,8 +443,7 @@ fn write_wal_file(
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
-} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path)
-{
+} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
/* Try to open existed partial file */
wal_file = file;
partial = true;
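
The walreceiver hunks above show the thread-per-timeline design: each receiver runs on a named OS thread that owns a single-threaded tokio runtime, so a long-running replication loop never ties up a shared executor. A stripped-down sketch (the async body is a placeholder, not this file's code):

```rust
// Sketch of the structure above (hypothetical body): one named OS thread per
// WAL receiver, each building its own current-thread tokio runtime.
use tokio::runtime;

fn thread_main() {
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        // connect with "replication=true" and stream WAL here (elided)
    });
}

fn main() {
    std::thread::Builder::new()
        .name("WAL receiver thread".into())
        .spawn(thread_main)
        .unwrap()
        .join()
        .unwrap();
}
```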

View File

@@ -19,10 +19,10 @@ use std::assert;
use std::cell::RefCell;
use std::fs;
use std::io::Error;
+use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
-use std::process::Stdio;
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
@@ -34,8 +34,8 @@ use bytes::{BufMut, Bytes, BytesMut};
use crate::page_cache;
use crate::page_cache::CacheEntry;
use crate::page_cache::WALRecord;
-use crate::{page_cache::BufferTag, PageServerConf};
use crate::ZTimelineId;
+use crate::{page_cache::BufferTag, PageServerConf};
static TIMEOUT: Duration = Duration::from_secs(20);