Cargo fmt pass over a codebase

This commit is contained in:
Stas Kelvich
2021-04-06 14:42:13 +03:00
parent 494b95886b
commit c0fcbbbe0c
23 changed files with 2501 additions and 2260 deletions

View File

@@ -1,13 +1,12 @@
use anyhow::Result;
use clap::{App, AppSettings};
use anyhow::{Result};
mod subcommand;
pub mod pg;
pub mod storage;
pub mod snapshot;
pub mod storage;
mod subcommand;
fn main() -> Result<()> {
let cli_commands = subcommand::ClapCommands {
commands: vec![
Box::new(pg::PgCmd {
@@ -22,7 +21,6 @@ fn main() -> Result<()> {
],
};
let matches = App::new("zenith")
.about("Zenith CLI")
.version("1.0")
@@ -30,7 +28,6 @@ fn main() -> Result<()> {
.subcommands(cli_commands.generate())
.get_matches();
if let Some(subcommand) = matches.subcommand_name() {
println!("'git {}' was used", subcommand);
}

View File

@@ -1,10 +1,8 @@
use anyhow::Result;
use clap::{App, AppSettings, Arg};
use anyhow::{Result};
use crate::subcommand;
// CLI handler for the `pg` subcommand group (compute-node operations).
// Holds the pre-built clap `App`, which gen_clap_command() clones and
// decorates with the pg-specific subcommands and arguments.
pub struct PgCmd<'a> {
pub clap_cmd: clap::App<'a, 'a>,
}
@@ -13,81 +11,95 @@ impl subcommand::SubCommand for PgCmd<'_> {
fn gen_clap_command(&self) -> clap::App {
let c = self.clap_cmd.clone();
c.about("Operations with zenith compute nodes")
.setting(AppSettings::SubcommandRequiredElseHelp)
.subcommand(
App::new("list")
.about("List existing compute nodes")
)
.subcommand(
App::new("create")
.about("Create (init) new data directory using given storage and start postgres")
.arg(Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"))
.arg(Arg::with_name("storage")
.short("s")
.long("storage")
.takes_value(true)
.help("Name of the storage node to use"))
//TODO should it be just name of uploaded snapshot or some path?
.arg(Arg::with_name("snapshot")
.long("snapshot")
.takes_value(true)
.help("Name of the snapshot to use"))
.arg(Arg::with_name("nostart")
.long("no-start")
.takes_value(false)
.help("Don't start postgres on the created node"))
)
.subcommand(
App::new("destroy")
.about("Stop postgres and destroy node's data directory")
.arg(Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"))
)
.subcommand(
App::new("start")
.about("Start postgres on the given node")
.arg(Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"))
.arg(Arg::with_name("replica")
.long("replica")
.takes_value(false)
.help("Start the compute node as replica"))
)
.subcommand(
App::new("stop")
.about("Stop postgres on the given node")
.arg(Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"))
)
.subcommand(
App::new("show")
.about("Show info about the given node")
.arg(Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"))
)
.setting(AppSettings::SubcommandRequiredElseHelp)
.subcommand(App::new("list").about("List existing compute nodes"))
.subcommand(
App::new("create")
.about(
"Create (init) new data directory using given storage and start postgres",
)
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"),
)
.arg(
Arg::with_name("storage")
.short("s")
.long("storage")
.takes_value(true)
.help("Name of the storage node to use"),
)
//TODO should it be just name of uploaded snapshot or some path?
.arg(
Arg::with_name("snapshot")
.long("snapshot")
.takes_value(true)
.help("Name of the snapshot to use"),
)
.arg(
Arg::with_name("nostart")
.long("no-start")
.takes_value(false)
.help("Don't start postgres on the created node"),
),
)
.subcommand(
App::new("destroy")
.about("Stop postgres and destroy node's data directory")
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"),
),
)
.subcommand(
App::new("start")
.about("Start postgres on the given node")
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"),
)
.arg(
Arg::with_name("replica")
.long("replica")
.takes_value(false)
.help("Start the compute node as replica"),
),
)
.subcommand(
App::new("stop")
.about("Stop postgres on the given node")
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"),
),
)
.subcommand(
App::new("show")
.about("Show info about the given node")
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.takes_value(true)
.help("Name of the compute node"),
),
)
}
// Stub entry point for the `pg` subcommand: it only echoes the parsed
// arguments so the dispatch path can be traced during development.
fn run(&self, args: clap::ArgMatches) -> Result<()> {
    let rendered = format!("Run PgCmd with args {:?}", args);
    println!("{}", rendered);
    Ok(())
}
}
}

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use clap::{App, AppSettings, Arg};
use anyhow::{Result};
use crate::subcommand;
@@ -11,31 +11,17 @@ impl subcommand::SubCommand for SnapshotCmd<'_> {
fn gen_clap_command(&self) -> clap::App {
let c = self.clap_cmd.clone();
c.about("Operations with zenith snapshots")
.setting(AppSettings::SubcommandRequiredElseHelp)
.subcommand(
App::new("list")
)
.subcommand(
App::new("create")
.arg(Arg::with_name("pgdata").required(true)),
)
.subcommand(
App::new("destroy")
)
.subcommand(
App::new("start")
)
.subcommand(
App::new("stop")
)
.subcommand(
App::new("show")
)
.setting(AppSettings::SubcommandRequiredElseHelp)
.subcommand(App::new("list"))
.subcommand(App::new("create").arg(Arg::with_name("pgdata").required(true)))
.subcommand(App::new("destroy"))
.subcommand(App::new("start"))
.subcommand(App::new("stop"))
.subcommand(App::new("show"))
}
// Stub entry point for the `snapshot` subcommand: it only echoes the parsed
// arguments so the dispatch path can be traced during development.
fn run(&self, args: clap::ArgMatches) -> Result<()> {
    let rendered = format!("Run SnapshotCmd with args {:?}", args);
    println!("{}", rendered);
    Ok(())
}
}
}

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use clap::{App, AppSettings};
use anyhow::{Result};
use crate::subcommand;
@@ -11,24 +11,15 @@ impl subcommand::SubCommand for StorageCmd<'_> {
fn gen_clap_command(&self) -> clap::App {
let c = self.clap_cmd.clone();
c.about("Operations with zenith storage nodes")
.setting(AppSettings::SubcommandRequiredElseHelp)
.subcommand(
App::new("list")
)
.subcommand(
App::new("attach")
)
.subcommand(
App::new("detach")
)
.subcommand(
App::new("show")
)
.setting(AppSettings::SubcommandRequiredElseHelp)
.subcommand(App::new("list"))
.subcommand(App::new("attach"))
.subcommand(App::new("detach"))
.subcommand(App::new("show"))
}
// Stub entry point for the `storage` subcommand: it only echoes the parsed
// arguments so the dispatch path can be traced during development.
fn run(&self, args: clap::ArgMatches) -> Result<()> {
    let rendered = format!("Run StorageCmd with args {:?}", args);
    println!("{}", rendered);
    Ok(())
}
}
}

View File

@@ -3,19 +3,19 @@
//
use log::*;
use std::{fs::File, str::FromStr, fs::OpenOptions};
use std::fs;
use std::io;
use std::path::PathBuf;
use std::thread;
use std::fs;
use std::{fs::File, fs::OpenOptions, str::FromStr};
use clap::{App, Arg};
use daemonize::Daemonize;
use slog;
use slog_stdlog;
use slog_scope;
use slog::Drain;
use slog_scope;
use slog_stdlog;
use pageserver::page_service;
use pageserver::restore_s3;
@@ -129,8 +129,16 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
// There should'n be any logging to stdin/stdout. Redirect it to the main log so
// that we will see any accidental manual fpritf's or backtraces.
let stdout = OpenOptions::new().create(true).append(true).open(conf.data_dir.join("pageserver.log")).unwrap();
let stderr = OpenOptions::new().create(true).append(true).open(conf.data_dir.join("pageserver.log")).unwrap();
let stdout = OpenOptions::new()
.create(true)
.append(true)
.open(conf.data_dir.join("pageserver.log"))
.unwrap();
let stderr = OpenOptions::new()
.create(true)
.append(true)
.open(conf.data_dir.join("pageserver.log"))
.unwrap();
let daemonize = Daemonize::new()
.pid_file(conf.data_dir.join("pageserver.pid"))
@@ -157,13 +165,13 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
// Create directory for wal-redo datadirs
match fs::create_dir(conf.data_dir.join("wal-redo")) {
Ok(_) => {},
Ok(_) => {}
Err(e) => match e.kind() {
io::ErrorKind::AlreadyExists => {}
_ => {
panic!("Failed to create wal-redo data directory: {}", e);
}
}
},
}
// Launch the WAL receiver thread if pageserver was started with --wal-producer

View File

@@ -4,12 +4,12 @@ use std::path::PathBuf;
pub mod page_cache;
pub mod page_service;
pub mod restore_s3;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
pub mod tui;
pub mod tui_event;
mod tui_logger;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
#[allow(dead_code)]
#[derive(Debug, Clone)]

View File

@@ -7,24 +7,24 @@
//
use core::ops::Bound::Included;
use std::{convert::TryInto, ops::AddAssign};
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::sync::{Arc,Condvar, Mutex};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use std::{convert::TryInto, ops::AddAssign};
// use tokio::sync::RwLock;
use bytes::Bytes;
use lazy_static::lazy_static;
use rand::Rng;
use log::*;
use rand::Rng;
use crate::{PageServerConf, walredo};
use crate::{walredo, PageServerConf};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Sender, Receiver};
use crossbeam_channel::{Receiver, Sender};
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -63,7 +63,6 @@ pub struct PageCacheStats {
}
impl AddAssign for PageCacheStats {
fn add_assign(&mut self, other: Self) {
*self = Self {
num_entries: self.num_entries + other.num_entries,
@@ -81,7 +80,6 @@ impl AddAssign for PageCacheStats {
// Shared data structure, holding page cache and related auxiliary information
//
struct PageCacheShared {
// The actual page cache
pagecache: BTreeMap<CacheKey, Arc<CacheEntry>>,
@@ -112,10 +110,10 @@ struct PageCacheShared {
}
lazy_static! {
pub static ref PAGECACHES : Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
pub static ref PAGECACHES: Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
}
pub fn get_pagecahe(conf: PageServerConf, sys_id : u64) -> Arc<PageCache> {
pub fn get_pagecahe(conf: PageServerConf, sys_id: u64) -> Arc<PageCache> {
let mut pcaches = PAGECACHES.lock().unwrap();
if !pcaches.contains_key(&sys_id) {
@@ -137,20 +135,18 @@ pub fn get_pagecahe(conf: PageServerConf, sys_id : u64) -> Arc<PageCache> {
pcaches.get(&sys_id).unwrap().clone()
}
fn init_page_cache() -> PageCache
{
fn init_page_cache() -> PageCache {
// Initialize the channel between the page cache and the WAL applicator
let (s, r) = unbounded();
PageCache {
shared: Mutex::new(
PageCacheShared {
pagecache: BTreeMap::new(),
relsize_cache: HashMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
}),
shared: Mutex::new(PageCacheShared {
pagecache: BTreeMap::new(),
relsize_cache: HashMap::new(),
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
}),
valid_lsn_condvar: Condvar::new(),
walredo_sender: s,
@@ -165,10 +161,8 @@ fn init_page_cache() -> PageCache
last_valid_lsn: AtomicU64::new(0),
last_record_lsn: AtomicU64::new(0),
}
}
//
// We store two kinds of entries in the page cache:
//
@@ -185,7 +179,7 @@ fn init_page_cache() -> PageCache
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64
pub lsn: u64,
}
pub struct CacheEntry {
@@ -198,7 +192,7 @@ pub struct CacheEntry {
//
// FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar
// or something else.
pub walredo_condvar: Condvar
pub walredo_condvar: Condvar,
}
pub struct CacheEntryContent {
@@ -221,7 +215,6 @@ impl CacheEntry {
}
}
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
pub struct RelTag {
pub spcnode: u32,
@@ -241,415 +234,409 @@ pub struct BufferTag {
#[derive(Clone)]
pub struct WALRecord {
pub lsn: u64, // LSN at the *end* of the record
pub lsn: u64, // LSN at the *end* of the record
pub will_init: bool,
pub rec: Bytes
pub rec: Bytes,
}
// Public interface functions
impl PageCache {
//
// GetPage@LSN
//
// Returns an 8k page image
//
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>>
{
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag: tag, lsn: 0 };
let maxkey = CacheKey { tag: tag, lsn: lsn };
let entry_rc: Arc<CacheEntry>;
{
let mut shared = self.shared.lock().unwrap();
let mut waited = false;
while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
waited = true;
trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn);
let wait_result = self.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap();
shared = wait_result.0;
if wait_result.1.timed_out() {
return Err(format!("Timed out while waiting for WAL record at LSN {} to arrive", lsn))?;
}
}
if waited {
trace!("caught up now, continuing");
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {} has already been removed", lsn))?;
}
let pagecache = &shared.pagecache;
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let entry_opt = entries.next_back();
if entry_opt.is_none() {
static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (_key, entry) = entry_opt.unwrap();
entry_rc = entry.clone();
// Now that we have a reference to the cache entry, drop the lock on the map.
// It's important to do this before waiting on the condition variable below,
// and better to do it as soon as possible to maximize concurrency.
}
// Lock the cache entry and dig the page image out of it.
let page_img: Bytes;
{
let mut entry_content = entry_rc.content.lock().unwrap();
if let Some(img) = &entry_content.page_image {
assert!(!entry_content.apply_pending);
page_img = img.clone();
} else if entry_content.wal_record.is_some() {
//
// If this page needs to be reconstructed by applying some WAL,
// send a request to the WAL redo thread.
//
if !entry_content.apply_pending {
assert!(!entry_content.apply_pending);
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
}
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
return Err("could not apply WAL to reconstruct page image".into());
}
};
} else {
// No base image, and no WAL record. Huh?
return Err(format!("no page image or WAL record for requested page"))?;
}
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!("Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", page_lsn_hi, page_lsn_lo,
tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum);
return Ok(page_img);
}
//
// Collect all the WAL records that are needed to reconstruct a page
// image for the given cache entry.
//
// Returns an old page image (if any), and a vector of WAL records to apply
// over it.
//
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>)
{
// Scan the BTreeMap backwards, starting from the given entry.
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
let minkey = CacheKey {
tag: entry.key.tag,
lsn: 0
};
let maxkey = CacheKey {
tag: entry.key.tag,
lsn: entry.key.lsn
};
let entries = pagecache.range((Included(&minkey), Included(&maxkey)));
// the last entry in the range should be the CacheEntry we were given
//let _last_entry = entries.next_back();
//assert!(last_entry == entry);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_key, e) in entries.rev() {
let e = e.content.lock().unwrap();
if let Some(img) = &e.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &e.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
} else {
panic!("no base image and no WAL record on cache entry");
}
}
records.reverse();
return (base_img, records);
}
//
// Adds a WAL record to the page cache
//
pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord)
{
let key = CacheKey {
tag: tag,
lsn: rec.lsn
};
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec);
let mut shared = self.shared.lock().unwrap();
let rel_tag = RelTag {
spcnode: tag.spcnode,
dbnode: tag.dbnode,
relnode: tag.relnode,
forknum: tag.forknum,
};
let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0);
if tag.blknum >= *rel_entry {
*rel_entry = tag.blknum + 1;
}
trace!("put_wal_record lsn: {}", key.lsn);
let oldentry = shared.pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
if !oldentry.is_none() {
error!("overwriting WAL record in page cache");
}
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
// Memorize a full image of a page version
//
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes)
{
let key = CacheKey {
tag: tag,
lsn: lsn
};
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().page_image = Some(img);
let mut shared = self.shared.lock().unwrap();
let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
assert!(oldentry.is_none());
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
//
pub fn advance_last_valid_lsn(&self, lsn: u64)
{
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
shared.last_valid_lsn = lsn;
self.valid_lsn_condvar.notify_all();
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
}
//
// NOTE: this updates last_valid_lsn as well.
//
pub fn advance_last_record_lsn(&self, lsn: u64) {
    let mut shared = self.shared.lock().unwrap();
    // Neither watermark may move backwards.
    assert!(lsn >= shared.last_valid_lsn);
    assert!(lsn >= shared.last_record_lsn);
    shared.last_valid_lsn = lsn;
    shared.last_record_lsn = lsn;
    // Wake up anyone blocked in get_page_at_lsn waiting for the WAL to catch up.
    self.valid_lsn_condvar.notify_all();
    // Mirror both watermarks into the lock-free stats counters.
    self.last_valid_lsn.store(lsn, Ordering::Relaxed);
    // Fix: this second store previously wrote last_valid_lsn again, so the
    // last_record_lsn stats counter was never updated after init_valid_lsn.
    self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
//
pub fn _advance_first_valid_lsn(&self, lsn: u64)
{
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.first_valid_lsn);
// Can't overtake last_valid_lsn (except when we're
// initializing the system and last_valid_lsn hasn't been set yet.
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
shared.first_valid_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
}
pub fn init_valid_lsn(&self, lsn: u64)
{
let mut shared = self.shared.lock().unwrap();
assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0);
assert!(shared.last_record_lsn == 0);
shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
pub fn get_last_valid_lsn(&self) -> u64
{
let shared = self.shared.lock().unwrap();
return shared.last_record_lsn;
}
//
// Simple test function for the WAL redo code:
//
// 1. Pick a page from the page cache at random.
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub fn _test_get_page_at_lsn(&self)
{
// for quick testing of the get_page_at_lsn() funcion.
//
// Get a random page from the page cache. Apply all its WAL, by requesting
// that page at the highest lsn.
// GetPage@LSN
//
// Returns an 8k page image
//
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let mut tag: Option<BufferTag> = None;
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag: tag, lsn: 0 };
let maxkey = CacheKey { tag: tag, lsn: lsn };
{
let entry_rc: Arc<CacheEntry>;
{
let mut shared = self.shared.lock().unwrap();
let mut waited = false;
while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
waited = true;
trace!(
"not caught up yet: {}, requested {}",
shared.last_valid_lsn,
lsn
);
let wait_result = self
.valid_lsn_condvar
.wait_timeout(shared, TIMEOUT)
.unwrap();
shared = wait_result.0;
if wait_result.1.timed_out() {
return Err(format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
))?;
}
}
if waited {
trace!("caught up now, continuing");
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {} has already been removed", lsn))?;
}
let pagecache = &shared.pagecache;
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
let entry_opt = entries.next_back();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (_key, entry) = entry_opt.unwrap();
entry_rc = entry.clone();
// Now that we have a reference to the cache entry, drop the lock on the map.
// It's important to do this before waiting on the condition variable below,
// and better to do it as soon as possible to maximize concurrency.
}
// Lock the cache entry and dig the page image out of it.
let page_img: Bytes;
{
let mut entry_content = entry_rc.content.lock().unwrap();
if let Some(img) = &entry_content.page_image {
assert!(!entry_content.apply_pending);
page_img = img.clone();
} else if entry_content.wal_record.is_some() {
//
// If this page needs to be reconstructed by applying some WAL,
// send a request to the WAL redo thread.
//
if !entry_content.apply_pending {
assert!(!entry_content.apply_pending);
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
}
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!(
"could not apply WAL to reconstruct page image for GetPage@LSN request"
);
return Err("could not apply WAL to reconstruct page image".into());
}
};
} else {
// No base image, and no WAL record. Huh?
return Err(format!("no page image or WAL record for requested page"))?;
}
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!(
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.spcnode,
tag.dbnode,
tag.relnode,
tag.forknum,
tag.blknum
);
return Ok(page_img);
}
//
// Collect all the WAL records that are needed to reconstruct a page
// image for the given cache entry.
//
// Returns an old page image (if any), and a vector of WAL records to apply
// over it.
//
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) {
// Scan the BTreeMap backwards, starting from the given entry.
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
if pagecache.is_empty() {
info!("page cache is empty");
return;
}
let minkey = CacheKey {
tag: entry.key.tag,
lsn: 0,
};
let maxkey = CacheKey {
tag: entry.key.tag,
lsn: entry.key.lsn,
};
let entries = pagecache.range((Included(&minkey), Included(&maxkey)));
// Find nth entry in the map, where n is picked at random
let n = rand::thread_rng().gen_range(0..pagecache.len());
let mut i = 0;
for (key, _e) in pagecache.iter() {
if i == n {
tag = Some(key.tag);
// the last entry in the range should be the CacheEntry we were given
//let _last_entry = entries.next_back();
//assert!(last_entry == entry);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_key, e) in entries.rev() {
let e = e.content.lock().unwrap();
if let Some(img) = &e.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &e.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
} else {
panic!("no base image and no WAL record on cache entry");
}
}
records.reverse();
return (base_img, records);
}
//
// Adds a WAL record to the page cache
//
pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
// Key the entry by (buffer tag, end-of-record LSN) so multiple versions of
// the same page can coexist in the cache.
let key = CacheKey {
tag: tag,
lsn: rec.lsn,
};
// Populate the entry's content before publishing it in the shared map.
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec);
let mut shared = self.shared.lock().unwrap();
// Opportunistically grow the cached relation size: a WAL record touching
// block `blknum` implies the relation has at least blknum + 1 blocks.
let rel_tag = RelTag {
spcnode: tag.spcnode,
dbnode: tag.dbnode,
relnode: tag.relnode,
forknum: tag.forknum,
};
let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0);
if tag.blknum >= *rel_entry {
*rel_entry = tag.blknum + 1;
}
trace!("put_wal_record lsn: {}", key.lsn);
let oldentry = shared.pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
// A duplicate (tag, lsn) key is unexpected; log rather than panic, since
// the insert above has already replaced the old entry anyway.
if !oldentry.is_none() {
error!("overwriting WAL record in page cache");
}
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
// Memorize a full image of a page version
//
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) {
let key = CacheKey { tag: tag, lsn: lsn };
// Build the entry with the full page image before publishing it.
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().page_image = Some(img);
let mut shared = self.shared.lock().unwrap();
let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, Arc::new(entry));
self.num_entries.fetch_add(1, Ordering::Relaxed);
// Unlike put_wal_record (which only logs), overwriting an existing
// (tag, lsn) entry here is treated as a hard invariant violation.
assert!(oldentry.is_none());
// NOTE(review): this path does not update relsize_cache the way
// put_wal_record does -- confirm whether that is intentional.
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
//
// Advance the "all WAL up to here has been received" watermark.
pub fn advance_last_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
shared.last_valid_lsn = lsn;
// Wake up anyone blocked in get_page_at_lsn waiting for this LSN.
self.valid_lsn_condvar.notify_all();
// Mirror the value into the lock-free stats counter.
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
}
//
// NOTE: this updates last_valid_lsn as well.
//
pub fn advance_last_record_lsn(&self, lsn: u64) {
    let mut shared = self.shared.lock().unwrap();
    // Can't move backwards.
    assert!(lsn >= shared.last_valid_lsn);
    assert!(lsn >= shared.last_record_lsn);
    shared.last_valid_lsn = lsn;
    shared.last_record_lsn = lsn;
    // Wake up anyone blocked in get_page_at_lsn waiting for this LSN.
    self.valid_lsn_condvar.notify_all();
    // Mirror both watermarks into the lock-free stats counters.
    self.last_valid_lsn.store(lsn, Ordering::Relaxed);
    // Fix: the second store was a duplicated write to last_valid_lsn, which
    // left the last_record_lsn stats counter stale after init_valid_lsn.
    self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
//
// Raise the oldest retained LSN; get_page_at_lsn rejects requests below
// this watermark with "LSN ... has already been removed".
pub fn _advance_first_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.first_valid_lsn);
// Can't overtake last_valid_lsn (except when we're
// initializing the system and last_valid_lsn hasn't been set yet.
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
shared.first_valid_lsn = lsn;
// Mirror the value into the lock-free stats counter.
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
}
// One-time initialization of all three LSN watermarks to the same point.
// Must be called before any of the advance_* methods (asserted below).
pub fn init_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// All watermarks must still be at their zero initial state.
assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0);
assert!(shared.last_record_lsn == 0);
shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
// Mirror the values into the lock-free stats counters.
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
pub fn get_last_valid_lsn(&self) -> u64 {
let shared = self.shared.lock().unwrap();
// NOTE(review): despite the function name, this returns last_record_lsn,
// not last_valid_lsn -- confirm which watermark callers actually want.
return shared.last_record_lsn;
}
//
// Simple test function for the WAL redo code:
//
// 1. Pick a page from the page cache at random.
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub fn _test_get_page_at_lsn(&self) {
// for quick testing of the get_page_at_lsn() funcion.
//
// Get a random page from the page cache. Apply all its WAL, by requesting
// that page at the highest lsn.
let mut tag: Option<BufferTag> = None;
{
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
if pagecache.is_empty() {
info!("page cache is empty");
return;
}
// Find nth entry in the map, where n is picked at random
let n = rand::thread_rng().gen_range(0..pagecache.len());
let mut i = 0;
for (key, _e) in pagecache.iter() {
if i == n {
tag = Some(key.tag);
break;
}
i += 1;
}
}
info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
Ok(_img) => {
// This prints out the whole page image.
//println!("{:X?}", img);
}
Err(error) => {
error!("GetPage@LSN failed: {}", error);
}
i += 1;
}
}
info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
Ok(_img) => {
// This prints out the whole page image.
//println!("{:X?}", img);
},
Err(error) => {
error!("GetPage@LSN failed: {}", error);
// FIXME: Shouldn't relation size also be tracked with an LSN?
// If a replica is lagging behind, it needs to get the size as it was on
// the replica's current replay LSN.
// Grow the cached size of `rel` so it covers block `to`, if one is given.
// Passing `None` still ensures the relation has an entry (size 0).
pub fn relsize_inc(&self, rel: &RelTag, to: Option<u32>) {
    let mut shared = self.shared.lock().unwrap();
    let size = shared.relsize_cache.entry(*rel).or_insert(0);
    match to {
        Some(blknum) if blknum >= *size => *size = blknum + 1,
        _ => {}
    }
}
// Return the cached size (in blocks) of `rel`, registering it with size 0
// if it was not present in the cache yet.
pub fn relsize_get(&self, rel: &RelTag) -> u32 {
    *self
        .shared
        .lock()
        .unwrap()
        .relsize_cache
        .entry(*rel)
        .or_insert(0)
}
// Check whether a size entry exists for `rel`, without creating one
// (unlike relsize_get, this never mutates the cache).
pub fn relsize_exist(&self, rel: &RelTag) -> bool {
    let guard = self.shared.lock().unwrap();
    guard.relsize_cache.contains_key(rel)
}
// Snapshot the per-cache statistics counters.
// Each counter is loaded independently with Relaxed ordering, so the
// returned values are not guaranteed to be mutually consistent.
pub fn get_stats(&self) -> PageCacheStats {
PageCacheStats {
num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed),
last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed),
last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
}
}
}
// FIXME: Shouldn't relation size also be tracked with an LSN?
// If a replica is lagging behind, it needs to get the size as it was on
// the replica's current replay LSN.
pub fn relsize_inc(&self, rel: &RelTag, to: Option<u32>)
{
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
if let Some(to) = to {
if to >= *entry {
*entry = to + 1;
}
}
}
pub fn relsize_get(&self, rel: &RelTag) -> u32
{
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
*entry
}
pub fn relsize_exist(&self, rel: &RelTag) -> bool
{
let shared = self.shared.lock().unwrap();
let relsize_cache = &shared.relsize_cache;
relsize_cache.contains_key(rel)
}
pub fn get_stats(&self) -> PageCacheStats
{
PageCacheStats {
num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed),
last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed),
last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
}
}
}
pub fn get_stats() -> PageCacheStats
{
pub fn get_stats() -> PageCacheStats {
let pcaches = PAGECACHES.lock().unwrap();
let mut stats = PageCacheStats {

View File

@@ -10,19 +10,19 @@
// *callmemaybe $url* -- ask pageserver to start walreceiver on $url
//
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::runtime;
use tokio::task;
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, Bytes, BytesMut};
use std::{io};
use std::thread;
use log::*;
use std::io;
use std::thread;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime;
use tokio::task;
use crate::page_cache;
use crate::PageServerConf;
use crate::walreceiver;
use crate::PageServerConf;
type Result<T> = std::result::Result<T, io::Error>;
@@ -81,7 +81,7 @@ struct ZenithStatusResponse {
struct ZenithReadResponse {
ok: bool,
n_blocks: u32,
page: Bytes
page: Bytes,
}
#[derive(Debug)]
@@ -95,15 +95,15 @@ enum StartupRequestCode {
Cancel,
NegotiateSsl,
NegotiateGss,
Normal
Normal,
}
impl FeStartupMessage {
pub fn parse(buf: &mut BytesMut) -> Result<Option<FeMessage>> {
const MAX_STARTUP_PACKET_LENGTH: u32 = 10000;
const CANCEL_REQUEST_CODE: u32 = (1234 << 16) | 5678;
const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679;
const NEGOTIATE_GSS_CODE: u32 = (1234 << 16) | 5680;
const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679;
const NEGOTIATE_GSS_CODE: u32 = (1234 << 16) | 5680;
if buf.len() < 4 {
return Ok(None);
@@ -123,11 +123,14 @@ impl FeStartupMessage {
CANCEL_REQUEST_CODE => StartupRequestCode::Cancel,
NEGOTIATE_SSL_CODE => StartupRequestCode::NegotiateSsl,
NEGOTIATE_GSS_CODE => StartupRequestCode::NegotiateGss,
_ => StartupRequestCode::Normal
_ => StartupRequestCode::Normal,
};
buf.advance(len as usize);
Ok(Some(FeMessage::StartupMessage(FeStartupMessage{version, kind})))
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
version,
kind,
})))
}
}
@@ -139,7 +142,7 @@ struct Buffer {
#[derive(Debug)]
struct FeQueryMessage {
body: Bytes
body: Bytes,
}
impl FeMessage {
@@ -171,7 +174,9 @@ impl FeMessage {
body.advance(5);
match tag {
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage{body:body.freeze()}))),
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage {
body: body.freeze(),
}))),
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => {
let smgr_tag = body.get_u8();
@@ -197,15 +202,13 @@ impl FeMessage {
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf),
))
)),
}
},
tag => {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag: {},'{:?}'", tag, buf),
))
}
tag => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag: {},'{:?}'", tag, buf),
)),
}
}
}
@@ -213,13 +216,15 @@ impl FeMessage {
///////////////////////////////////////////////////////////////////////////////
pub fn thread_main(conf: PageServerConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
// and we're not concerned with performance yet.
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
info!("Starting page server on {}", conf.listen_addr);
@@ -249,13 +254,12 @@ struct Connection {
}
impl Connection {
pub fn new(conf: PageServerConf, socket: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(10 * 1024),
init_done: false,
conf: conf
conf: conf,
}
}
@@ -272,24 +276,24 @@ impl Connection {
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err(io::Error::new(io::ErrorKind::Other,"connection reset by peer"));
return Err(io::Error::new(
io::ErrorKind::Other,
"connection reset by peer",
));
}
}
}
}
fn parse_message(&mut self) -> Result<Option<FeMessage>> {
if !self.init_done {
FeStartupMessage::parse(&mut self.buffer)
} else {
FeMessage::parse(&mut self.buffer)
}
}
async fn write_message_noflush(&mut self, message: &BeMessage) -> io::Result<()> {
match message {
BeMessage::AuthenticationOk => {
self.stream.write_u8(b'R').await?;
@@ -308,16 +312,18 @@ impl Connection {
let mut b = Bytes::from("data\0");
self.stream.write_u8(b'T').await?;
self.stream.write_i32(4 + 2 + b.len() as i32 + 3*(4 + 2)).await?;
self.stream
.write_i32(4 + 2 + b.len() as i32 + 3 * (4 + 2))
.await?;
self.stream.write_i16(1).await?;
self.stream.write_buf(&mut b).await?;
self.stream.write_i32(0).await?; /* table oid */
self.stream.write_i16(0).await?; /* attnum */
self.stream.write_i32(25).await?; /* TEXTOID */
self.stream.write_i16(-1).await?; /* typlen */
self.stream.write_i32(0).await?; /* typmod */
self.stream.write_i16(0).await?; /* format code */
self.stream.write_i32(0).await?; /* table oid */
self.stream.write_i16(0).await?; /* attnum */
self.stream.write_i32(25).await?; /* TEXTOID */
self.stream.write_i16(-1).await?; /* typlen */
self.stream.write_i32(0).await?; /* typmod */
self.stream.write_i16(0).await?; /* format code */
}
// XXX: accept some text data
@@ -371,7 +377,9 @@ impl Connection {
BeMessage::ZenithReadResponse(resp) => {
self.stream.write_u8(b'd').await?;
self.stream.write_u32(4 + 1 + 1 + 4 + resp.page.len() as u32).await?;
self.stream
.write_u32(4 + 1 + 1 + 4 + resp.page.len() as u32)
.await?;
self.stream.write_u8(102).await?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8).await?;
self.stream.write_u32(resp.n_blocks).await?;
@@ -388,9 +396,7 @@ impl Connection {
}
async fn run(&mut self) -> Result<()> {
loop {
match self.read_message().await? {
Some(FeMessage::StartupMessage(m)) => {
trace!("got message {:?}", m);
@@ -402,16 +408,17 @@ impl Connection {
self.stream.flush().await?;
}
StartupRequestCode::Normal => {
self.write_message_noflush(&BeMessage::AuthenticationOk).await?;
self.write_message_noflush(&BeMessage::AuthenticationOk)
.await?;
self.write_message(&BeMessage::ReadyForQuery).await?;
self.init_done = true;
},
StartupRequestCode::Cancel => return Ok(())
}
StartupRequestCode::Cancel => return Ok(()),
}
},
}
Some(FeMessage::Query(m)) => {
self.process_query(&m).await?;
},
}
Some(FeMessage::Terminate) => {
break;
}
@@ -420,7 +427,7 @@ impl Connection {
break;
}
_ => {
return Err(io::Error::new(io::ErrorKind::Other,"unexpected message"));
return Err(io::Error::new(io::ErrorKind::Other, "unexpected message"));
}
}
}
@@ -428,25 +435,21 @@ impl Connection {
Ok(())
}
async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> {
async fn process_query(&mut self, q: &FeQueryMessage) -> Result<()> {
trace!("got query {:?}", q.body);
if q.body.starts_with(b"controlfile") {
self.handle_controlfile().await
} else if q.body.starts_with(b"pagestream ") {
let (_l,r) = q.body.split_at("pagestream ".len());
let (_l, r) = q.body.split_at("pagestream ".len());
let mut r = r.to_vec();
r.pop();
let sysid = String::from_utf8(r).unwrap().trim().to_string();
let sysid: u64 = sysid.parse().unwrap(); // XXX
self.handle_pagerequests(sysid).await
} else if q.body.starts_with(b"callmemaybe ") {
let (_l,r) = q.body.split_at("callmemaybe ".len());
let (_l, r) = q.body.split_at("callmemaybe ".len());
let mut r = r.to_vec();
r.pop();
let connstr = String::from_utf8(r).unwrap().trim().to_string();
@@ -455,44 +458,49 @@ impl Connection {
let _walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
walreceiver::thread_main(conf_copy,&connstr);
walreceiver::thread_main(conf_copy, &connstr);
})
.unwrap();
// generick ack:
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::RowDescription)
.await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message_noflush(&BeMessage::CommandComplete)
.await?;
self.write_message(&BeMessage::ReadyForQuery).await
} else if q.body.starts_with(b"status") {
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::RowDescription)
.await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message_noflush(&BeMessage::CommandComplete)
.await?;
self.write_message(&BeMessage::ReadyForQuery).await
} else {
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::RowDescription)
.await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message_noflush(&BeMessage::CommandComplete)
.await?;
self.write_message(&BeMessage::ReadyForQuery).await
}
}
async fn handle_controlfile(&mut self) -> Result<()> {
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::RowDescription)
.await?;
self.write_message_noflush(&BeMessage::ControlFile).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message_noflush(&BeMessage::CommandComplete)
.await?;
self.write_message(&BeMessage::ReadyForQuery).await
}
async fn handle_pagerequests(&mut self, sysid: u64) -> Result<()> {
/* switch client to COPYBOTH */
self.stream.write_u8(b'W').await?;
self.stream.write_i32(4 + 1 + 2).await?;
self.stream.write_u8(0).await?; /* copy_is_binary */
self.stream.write_i16(0).await?; /* numAttributes */
self.stream.write_u8(0).await?; /* copy_is_binary */
self.stream.write_i16(0).await?; /* numAttributes */
self.stream.flush().await?;
let pcache = page_cache::get_pagecahe(self.conf.clone(), sysid);
@@ -511,7 +519,6 @@ impl Connection {
match message {
Some(FeMessage::ZenithExistsRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
@@ -523,23 +530,25 @@ impl Connection {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: exist,
n_blocks: 0
})).await?
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithTruncRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithUnlinkRequest(_)) => {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithNblocksRequest(req)) => {
let tag = page_cache::RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
@@ -551,8 +560,9 @@ impl Connection {
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
n_blocks: n_blocks
})).await?
n_blocks: n_blocks,
}))
.await?
}
Some(FeMessage::ZenithReadRequest(req)) => {
let buf_tag = page_cache::BufferTag {
@@ -560,30 +570,27 @@ impl Connection {
dbnode: req.dbnode,
relnode: req.relnode,
forknum: req.forknum,
blknum: req.blkno
blknum: req.blkno,
};
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
Ok(p) => {
BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
page: p
})
},
Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
page: p,
}),
Err(e) => {
const ZERO_PAGE:[u8; 8192] = [0; 8192];
const ZERO_PAGE: [u8; 8192] = [0; 8192];
error!("get_page_at_lsn: {}", e);
BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: false,
n_blocks: 0,
page: Bytes::from_static(&ZERO_PAGE)
page: Bytes::from_static(&ZERO_PAGE),
})
}
};
self.write_message(&msg).await?
}
Some(FeMessage::ZenithCreateRequest(req)) => {
let tag = page_cache::RelTag {
@@ -597,8 +604,9 @@ impl Connection {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
n_blocks: 0,
}))
.await?
}
Some(FeMessage::ZenithExtendRequest(req)) => {
let tag = page_cache::RelTag {
@@ -612,14 +620,12 @@ impl Connection {
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
n_blocks: 0
})).await?
}
_ => {
n_blocks: 0,
}))
.await?
}
_ => {}
}
}
}
}

View File

@@ -7,11 +7,11 @@
// is started, it starts streaming from that LSN.
//
use bytes::{Buf, BytesMut};
use log::*;
use regex::Regex;
use std::env;
use std::fmt;
use regex::Regex;
use bytes::{BytesMut, Buf};
use log::*;
use s3::bucket::Bucket;
use s3::creds::Credentials;
@@ -22,12 +22,12 @@ use tokio::runtime;
use futures::future;
use crate::{PageServerConf, page_cache};
use crate::{page_cache, PageServerConf};
struct Storage {
region: Region,
credentials: Credentials,
bucket: String
bucket: String,
}
pub fn restore_main(conf: &PageServerConf) {
@@ -38,7 +38,9 @@ pub fn restore_main(conf: &PageServerConf) {
let result = restore_chunk(conf).await;
match result {
Ok(_) => { return; },
Ok(_) => {
return;
}
Err(err) => {
error!("S3 error: {}", err);
return;
@@ -56,7 +58,6 @@ pub fn restore_main(conf: &PageServerConf) {
// Load it all into the page cache.
//
async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
let backend = Storage {
region: Region::Custom {
region: env::var("S3_REGION").unwrap().into(),
@@ -67,8 +68,10 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
Some(&env::var("S3_SECRET").unwrap()),
None,
None,
None).unwrap(),
bucket: "zenith-testbucket".to_string()
None,
)
.unwrap(),
bucket: "zenith-testbucket".to_string(),
};
info!("Restoring from S3...");
@@ -77,7 +80,9 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
let bucket = Bucket::new_with_path_style(&backend.bucket, backend.region, backend.credentials)?;
// List out contents of directory
let results: Vec<s3::serde_types::ListBucketResult> = bucket.list("relationdata/".to_string(), Some("".to_string())).await?;
let results: Vec<s3::serde_types::ListBucketResult> = bucket
.list("relationdata/".to_string(), Some("".to_string()))
.await?;
// TODO: get that from backup
let sys_id: u64 = 42;
@@ -86,7 +91,6 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
for result in results {
for object in result.contents {
// Download every relation file, slurping them into memory
let key = object.key;
@@ -104,7 +108,9 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
slurp_futures.push(f);
}
Err(e) => { warn!("unrecognized file: {} ({})", relpath, e); }
Err(e) => {
warn!("unrecognized file: {} ({})", relpath, e);
}
};
}
}
@@ -127,29 +133,28 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
// From pg_tablespace_d.h
//
// FIXME: we'll probably need these elsewhere too, move to some common location
const DEFAULTTABLESPACE_OID:u32 = 1663;
const GLOBALTABLESPACE_OID:u32 = 1664;
const DEFAULTTABLESPACE_OID: u32 = 1663;
const GLOBALTABLESPACE_OID: u32 = 1664;
#[derive(Debug)]
struct FilePathError {
msg: String
msg: String,
}
impl FilePathError {
fn new(msg: &str) -> FilePathError {
FilePathError {
msg: msg.to_string()
msg: msg.to_string(),
}
}
}
impl From<core::num::ParseIntError> for FilePathError {
fn from(e: core::num::ParseIntError) -> Self {
return FilePathError { msg: format!("invalid filename: {}", e) }
return FilePathError {
msg: format!("invalid filename: {}", e),
};
}
}
impl fmt::Display for FilePathError {
@@ -158,7 +163,6 @@ impl fmt::Display for FilePathError {
}
}
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
@@ -166,7 +170,7 @@ fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
Some("fsm") => Ok(1),
Some("vm") => Ok(2),
Some("init") => Ok(3),
Some(_) => Err(FilePathError::new("invalid forkname"))
Some(_) => Err(FilePathError::new("invalid forkname")),
}
}
@@ -188,20 +192,29 @@ struct ParsedBaseImageFileName {
// <oid>_<fork name>.<segment number>
fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?_(?P<lsnhi>[[:xdigit:]]{8})(?P<lsnlo>[[:xdigit:]]{8})$").unwrap();
let caps = re.captures(fname).ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let caps = re
.captures(fname)
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let relnode_str = caps.name("relnode").unwrap().as_str();
let relnode = u32::from_str_radix(relnode_str, 10)?;
let forkname_match = caps.name("forkname");
let forkname = if forkname_match.is_none() { None } else { Some(forkname_match.unwrap().as_str()) };
let forkname = if forkname_match.is_none() {
None
} else {
Some(forkname_match.unwrap().as_str())
};
let forknum = forkname_to_forknum(forkname)?;
let segno_match = caps.name("segno");
let segno = if segno_match.is_none() { 0 } else { u32::from_str_radix(segno_match.unwrap().as_str(), 10)? };
let segno = if segno_match.is_none() {
0
} else {
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
};
let lsn_hi = u64::from_str_radix(caps.name("lsnhi").unwrap().as_str(), 16)?;
let lsn_lo = u64::from_str_radix(caps.name("lsnlo").unwrap().as_str(), 16)?;
@@ -211,7 +224,6 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
}
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
/*
* Relation data files can be in one of the following directories:
*
@@ -238,15 +250,20 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
relnode,
forknum,
segno,
lsn
lsn,
});
} else if let Some(dbpath) = path.strip_prefix("base/") {
let mut s = dbpath.split("/");
let dbnode_str = s.next().ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let dbnode_str = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let dbnode = u32::from_str_radix(dbnode_str, 10)?;
let fname = s.next().ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
if s.next().is_some() { return Err(FilePathError::new("invalid relation data file name")); };
let fname = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
if s.next().is_some() {
return Err(FilePathError::new("invalid relation data file name"));
};
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
@@ -256,9 +273,8 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
relnode,
forknum,
segno,
lsn
lsn,
});
} else if let Some(_) = path.strip_prefix("pg_tblspc/") {
// TODO
return Err(FilePathError::new("tablespaces not supported"));
@@ -270,8 +286,13 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
//
// Load a base file from S3, and insert it into the page cache
//
async fn slurp_base_file(conf: &PageServerConf, sys_id: u64, bucket: Bucket, s3path: String, parsed: ParsedBaseImageFileName)
{
async fn slurp_base_file(
conf: &PageServerConf,
sys_id: u64,
bucket: Bucket,
s3path: String,
parsed: ParsedBaseImageFileName,
) {
// FIXME: rust-s3 opens a new connection for each request. Should reuse
// the reqwest::Client object. But that requires changes to rust-s3 itself.
let (data, code) = bucket.get_object(s3path.clone()).await.unwrap();
@@ -282,18 +303,17 @@ async fn slurp_base_file(conf: &PageServerConf, sys_id: u64, bucket: Bucket, s3p
let mut bytes = BytesMut::from(data.as_slice()).freeze();
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024*1024*1024 / 8192);
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
blknum: blknum
blknum: blknum,
};
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));

View File

@@ -2,17 +2,17 @@ use crate::tui_event::{Event, Events};
use crate::tui_logger::TuiLogger;
use crate::tui_logger::TuiLoggerWidget;
use std::{error::Error, io};
use lazy_static::lazy_static;
use std::sync::Arc;
use std::{error::Error, io};
use termion::{event::Key, input::MouseTerminal, raw::IntoRawMode, screen::AlternateScreen};
use tui::backend::TermionBackend;
use tui::buffer::Buffer;
use tui::style::{Color, Style, Modifier};
use tui::layout::{Constraint, Direction, Layout, Rect};
use tui::style::{Color, Modifier, Style};
use tui::text::{Span, Spans, Text};
use tui::widgets::{Block, BorderType, Borders, Paragraph, Widget};
use tui::Terminal;
use tui::text::{Text, Span, Spans};
use tui::widgets::{Widget, Block, Borders, BorderType, Paragraph};
use tui::layout::{Layout, Direction, Constraint, Rect};
use lazy_static::lazy_static;
use slog;
use slog::Drain;
@@ -25,64 +25,69 @@ lazy_static! {
}
pub fn init_logging() -> slog_scope::GlobalLoggerGuard {
let pageservice_drain = slog::Filter::new(PAGESERVICE_DRAIN.as_ref(),
|record: &slog::Record| {
if record.level().is_at_least(slog::Level::Debug) && record.module().starts_with("pageserver::page_service") {
let pageservice_drain =
slog::Filter::new(PAGESERVICE_DRAIN.as_ref(), |record: &slog::Record| {
if record.level().is_at_least(slog::Level::Debug)
&& record.module().starts_with("pageserver::page_service")
{
return true;
}
return false;
}
).fuse();
})
.fuse();
let walredo_drain = slog::Filter::new(WALREDO_DRAIN.as_ref(),
|record: &slog::Record| {
if record.level().is_at_least(slog::Level::Debug) && record.module().starts_with("pageserver::walredo") {
let walredo_drain = slog::Filter::new(WALREDO_DRAIN.as_ref(), |record: &slog::Record| {
if record.level().is_at_least(slog::Level::Debug)
&& record.module().starts_with("pageserver::walredo")
{
return true;
}
return false;
})
.fuse();
let walreceiver_drain =
slog::Filter::new(WALRECEIVER_DRAIN.as_ref(), |record: &slog::Record| {
if record.level().is_at_least(slog::Level::Debug)
&& record.module().starts_with("pageserver::walreceiver")
{
return true;
}
return false;
}
).fuse();
})
.fuse();
let walreceiver_drain = slog::Filter::new(WALRECEIVER_DRAIN.as_ref(),
|record: &slog::Record| {
if record.level().is_at_least(slog::Level::Debug) && record.module().starts_with("pageserver::walreceiver") {
return true;
}
return false;
let catchall_drain = slog::Filter::new(CATCHALL_DRAIN.as_ref(), |record: &slog::Record| {
if record.level().is_at_least(slog::Level::Info) {
return true;
}
).fuse();
let catchall_drain = slog::Filter::new(CATCHALL_DRAIN.as_ref(),
|record: &slog::Record| {
if record.level().is_at_least(slog::Level::Info) {
return true;
}
if record.level().is_at_least(slog::Level::Debug) && record.module().starts_with("pageserver") {
return true;
}
return false;
if record.level().is_at_least(slog::Level::Debug)
&& record.module().starts_with("pageserver")
{
return true;
}
).fuse();
return false;
})
.fuse();
let drain = pageservice_drain;
let drain = slog::Duplicate::new(drain, walreceiver_drain).fuse();
let drain = slog::Duplicate::new(drain, walredo_drain).fuse();
let drain = slog::Duplicate::new(drain, catchall_drain).fuse();
let drain = slog_async::Async::new(drain).chan_size(1000).build().fuse();
let drain = slog::Filter::new(drain,
|record: &slog::Record| {
let drain = slog::Filter::new(drain, |record: &slog::Record| {
if record.level().is_at_least(slog::Level::Info) {
return true;
}
if record.level().is_at_least(slog::Level::Debug)
&& record.module().starts_with("pageserver")
{
return true;
}
if record.level().is_at_least(slog::Level::Info) {
return true;
}
if record.level().is_at_least(slog::Level::Debug) && record.module().starts_with("pageserver") {
return true;
}
return false;
}
).fuse();
return false;
})
.fuse();
let logger = slog::Logger::root(drain, slog::o!());
return slog_scope::set_global_logger(logger);
}
@@ -143,21 +148,27 @@ pub fn ui_main<'b>() -> Result<(), Box<dyn Error>> {
let top_top_right_chunk = c[0];
let top_bot_right_chunk = c[1];
f.render_widget(LogWidget::new(PAGESERVICE_DRAIN.as_ref(),"Page Service"),
top_top_left_chunk);
f.render_widget(
LogWidget::new(PAGESERVICE_DRAIN.as_ref(), "Page Service"),
top_top_left_chunk,
);
f.render_widget(LogWidget::new(WALREDO_DRAIN.as_ref(), "WAL Redo"),
top_bot_left_chunk);
f.render_widget(
LogWidget::new(WALREDO_DRAIN.as_ref(), "WAL Redo"),
top_bot_left_chunk,
);
f.render_widget(LogWidget::new(WALRECEIVER_DRAIN.as_ref(), "WAL Receiver"),
top_top_right_chunk);
f.render_widget(
LogWidget::new(WALRECEIVER_DRAIN.as_ref(), "WAL Receiver"),
top_top_right_chunk,
);
f.render_widget(MetricsWidget {}, top_bot_right_chunk);
f.render_widget(LogWidget::new(CATCHALL_DRAIN.as_ref(), "All Log")
.show_module(true),
bottom_chunk);
f.render_widget(
LogWidget::new(CATCHALL_DRAIN.as_ref(), "All Log").show_module(true),
bottom_chunk,
);
})?;
// If ther user presses 'q', quit.
@@ -177,7 +188,6 @@ pub fn ui_main<'b>() -> Result<(), Box<dyn Error>> {
Ok(())
}
struct LogWidget<'a> {
logger: &'a TuiLogger,
title: &'a str,
@@ -186,7 +196,11 @@ struct LogWidget<'a> {
impl<'a> LogWidget<'a> {
fn new(logger: &'a TuiLogger, title: &'a str) -> LogWidget<'a> {
LogWidget { logger, title, show_module: false }
LogWidget {
logger,
title,
show_module: false,
}
}
fn show_module(mut self, b: bool) -> LogWidget<'a> {
@@ -196,14 +210,14 @@ impl<'a> LogWidget<'a> {
}
impl<'a> Widget for LogWidget<'a> {
fn render(self, area: Rect, buf: &mut Buffer) {
let w = TuiLoggerWidget::default(self.logger)
.block(Block::default()
.borders(Borders::ALL)
.title(self.title)
.border_type(BorderType::Rounded))
.block(
Block::default()
.borders(Borders::ALL)
.title(self.title)
.border_type(BorderType::Rounded),
)
.show_module(true)
.style_error(Style::default().fg(Color::Red))
.style_warn(Style::default().fg(Color::Yellow))
@@ -213,14 +227,16 @@ impl<'a> Widget for LogWidget<'a> {
}
// Render a widget to show some metrics
struct MetricsWidget {
}
struct MetricsWidget {}
fn get_metric_u64<'a>(title: &'a str, value: u64) -> Spans<'a> {
Spans::from(vec![
Span::styled(format!("{:<20}", title), Style::default()),
Span::raw(": "),
Span::styled(value.to_string(), Style::default().add_modifier(Modifier::BOLD)),
Span::styled(
value.to_string(),
Style::default().add_modifier(Modifier::BOLD),
),
])
}
@@ -235,21 +251,16 @@ fn get_metric_str<'a>(title: &'a str, value: &'a str) -> Spans<'a> {
// FIXME: We really should define a datatype for LSNs, with Display trait and
// helper functions. There's one in tokio-postgres, but I don't think we want
// to rely on that.
fn format_lsn(lsn: u64) -> String
{
return format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff)
fn format_lsn(lsn: u64) -> String {
return format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff);
}
impl tui::widgets::Widget for MetricsWidget {
fn render(self, area: Rect, buf: &mut Buffer) {
let block = Block::default()
.borders(Borders::ALL)
.title("Page Cache Metrics")
.border_type(BorderType::Rounded);
.borders(Borders::ALL)
.title("Page Cache Metrics")
.border_type(BorderType::Rounded);
let inner_area = block.inner(area);
block.render(area, buf);
@@ -257,17 +268,30 @@ impl tui::widgets::Widget for MetricsWidget {
let mut lines: Vec<Spans> = Vec::new();
let page_cache_stats = crate::page_cache::get_stats();
let lsnrange = format!("{} - {}",
format_lsn(page_cache_stats.first_valid_lsn),
format_lsn(page_cache_stats.last_valid_lsn));
let last_valid_recordlsn_str =
format_lsn(page_cache_stats.last_record_lsn);
let lsnrange = format!(
"{} - {}",
format_lsn(page_cache_stats.first_valid_lsn),
format_lsn(page_cache_stats.last_valid_lsn)
);
let last_valid_recordlsn_str = format_lsn(page_cache_stats.last_record_lsn);
lines.push(get_metric_str("Valid LSN range", &lsnrange));
lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str));
lines.push(get_metric_u64("# of cache entries", page_cache_stats.num_entries));
lines.push(get_metric_u64("# of page images", page_cache_stats.num_page_images));
lines.push(get_metric_u64("# of WAL records", page_cache_stats.num_wal_records));
lines.push(get_metric_u64("# of GetPage@LSN calls", page_cache_stats.num_getpage_requests));
lines.push(get_metric_u64(
"# of cache entries",
page_cache_stats.num_entries,
));
lines.push(get_metric_u64(
"# of page images",
page_cache_stats.num_page_images,
));
lines.push(get_metric_u64(
"# of WAL records",
page_cache_stats.num_wal_records,
));
lines.push(get_metric_u64(
"# of GetPage@LSN calls",
page_cache_stats.num_getpage_requests,
));
let text = Text::from(lines);

View File

@@ -8,19 +8,19 @@
// Also, I didn't do any of the "hot log" stuff that gin66's implementation had, you can use an
// AsyncDrain to buffer and handle overflow if desired.
//
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::SystemTime;
use chrono::offset::Local;
use chrono::DateTime;
use slog;
use slog::{Drain, OwnedKVList, Record, Level};
use slog::{Drain, Level, OwnedKVList, Record};
use slog_async::AsyncRecord;
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::SystemTime;
use tui::buffer::Buffer;
use tui::layout::{Rect};
use tui::style::{Style, Modifier};
use tui::layout::Rect;
use tui::style::{Modifier, Style};
use tui::text::{Span, Spans};
use tui::widgets::{Block, Widget, Paragraph, Wrap};
use tui::widgets::{Block, Paragraph, Widget, Wrap};
// Size of the log ring buffer, in # of records
static BUFFER_SIZE: usize = 1000;
@@ -41,11 +41,7 @@ impl Drain for TuiLogger {
type Ok = ();
type Err = slog::Error;
fn log(&self,
record: &Record,
values: &OwnedKVList)
-> Result<Self::Ok, Self::Err> {
fn log(&self, record: &Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
let mut events = self.events.lock().unwrap();
let now = SystemTime::now();
@@ -129,7 +125,6 @@ impl<'b> TuiLoggerWidget<'b> {
}
impl<'b> Widget for TuiLoggerWidget<'b> {
fn render(mut self, area: Rect, buf: &mut Buffer) {
buf.set_style(area, self.style);
let list_area = match self.block.take() {
Some(b) => {
@@ -156,7 +151,6 @@ impl<'b> Widget for TuiLoggerWidget<'b> {
let events = self.logger.events.lock().unwrap();
for evt in events.iter() {
let (timestamp, rec) = evt;
rec.as_record_values(|rec, _kwlist| {
@@ -200,7 +194,7 @@ impl<'b> Widget for TuiLoggerWidget<'b> {
let text = tui::text::Text::from(lines);
Paragraph::new(text)
.wrap(Wrap { trim: true } )
.wrap(Wrap { trim: true })
.render(list_area, buf);
}
}

View File

@@ -13,45 +13,41 @@ use log::*;
const XLOG_BLCKSZ: u32 = 8192;
// FIXME: this is configurable in PostgreSQL, 16 MB is the default
const WAL_SEGMENT_SIZE: u64 = 16*1024*1024;
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;
// From PostgreSQL headers
#[repr(C)]
#[derive(Debug)]
struct XLogPageHeaderData
{
xlp_magic: u16, /* magic value for correctness checks */
xlp_info: u16, /* flag bits, see below */
xlp_tli: u32, /* TimeLineID of first record on page */
xlp_pageaddr: u64, /* XLOG address of this page */
xlp_rem_len: u32, /* total len of remaining data for record */
struct XLogPageHeaderData {
xlp_magic: u16, /* magic value for correctness checks */
xlp_info: u16, /* flag bits, see below */
xlp_tli: u32, /* TimeLineID of first record on page */
xlp_pageaddr: u64, /* XLOG address of this page */
xlp_rem_len: u32, /* total len of remaining data for record */
}
// FIXME: this assumes MAXIMUM_ALIGNOF 8. There are 4 padding bytes at end
#[allow(non_upper_case_globals)]
const SizeOfXLogShortPHD: usize = 2+2+4+8+4 + 4;
const SizeOfXLogShortPHD: usize = 2 + 2 + 4 + 8 + 4 + 4;
#[repr(C)]
#[derive(Debug)]
struct XLogLongPageHeaderData
{
std: XLogPageHeaderData, /* standard header fields */
xlp_sysid: u64, /* system identifier from pg_control */
xlp_seg_size: u32, /* just as a cross-check */
xlp_xlog_blcksz: u32, /* just as a cross-check */
struct XLogLongPageHeaderData {
std: XLogPageHeaderData, /* standard header fields */
xlp_sysid: u64, /* system identifier from pg_control */
xlp_seg_size: u32, /* just as a cross-check */
xlp_xlog_blcksz: u32, /* just as a cross-check */
}
// FIXME: this assumes MAXIMUM_ALIGNOF 8.
#[allow(non_upper_case_globals)]
const SizeOfXLogLongPHD: usize = (2+2+4+8+4) + 4 + 8 + 4 + 4;
const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4;
pub struct WalStreamDecoder {
lsn: u64,
startlsn: u64, // LSN where this record starts
startlsn: u64, // LSN where this record starts
contlen: u32,
padlen: u32,
@@ -65,7 +61,6 @@ pub struct WalStreamDecoder {
// FIXME: This isn't a proper rust stream
//
impl WalStreamDecoder {
pub fn new(lsn: u64) -> WalStreamDecoder {
WalStreamDecoder {
lsn: lsn,
@@ -86,7 +81,6 @@ impl WalStreamDecoder {
// Returns a tuple:
// (end LSN, record)
pub fn poll_decode(&mut self) -> Option<(u64, Bytes)> {
loop {
// parse and verify page boundaries as we go
if self.lsn % WAL_SEGMENT_SIZE == 0 {
@@ -115,9 +109,7 @@ impl WalStreamDecoder {
// TODO: verify the fields in the header
continue;
}
else if self.padlen > 0
{
} else if self.padlen > 0 {
if self.inputbuf.remaining() < self.padlen as usize {
return None;
}
@@ -126,9 +118,7 @@ impl WalStreamDecoder {
self.inputbuf.advance(self.padlen as usize);
self.lsn += self.padlen as u64;
self.padlen = 0;
}
else if self.contlen == 0
{
} else if self.contlen == 0 {
// need to have at least the xl_tot_len field
if self.inputbuf.remaining() < 4 {
@@ -139,8 +129,12 @@ impl WalStreamDecoder {
self.startlsn = self.lsn;
let xl_tot_len = self.inputbuf.get_u32_le();
if xl_tot_len < SizeOfXLogRecord {
error!("invalid xl_tot_len {} at {:X}/{:X}", xl_tot_len,
self.lsn >> 32, self.lsn & 0xffffffff);
error!(
"invalid xl_tot_len {} at {:X}/{:X}",
xl_tot_len,
self.lsn >> 32,
self.lsn & 0xffffffff
);
panic!();
}
self.lsn += 4;
@@ -151,11 +145,9 @@ impl WalStreamDecoder {
self.contlen = xl_tot_len - 4;
continue;
}
else
{
} else {
// we're continuing a record, possibly from previous page.
let pageleft:u32 = XLOG_BLCKSZ - (self.lsn % (XLOG_BLCKSZ as u64)) as u32;
let pageleft: u32 = XLOG_BLCKSZ - (self.lsn % (XLOG_BLCKSZ as u64)) as u32;
// read the rest of the record, or as much as fits on this page.
let n = min(self.contlen, pageleft) as usize;
@@ -176,8 +168,11 @@ impl WalStreamDecoder {
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
if is_xlog_switch_record(&recordbuf) {
trace!("saw xlog switch record at {:X}/{:X}",
(self.lsn >> 32), self.lsn & 0xffffffff);
trace!(
"saw xlog switch record at {:X}/{:X}",
(self.lsn >> 32),
self.lsn & 0xffffffff
);
self.padlen = (WAL_SEGMENT_SIZE - (self.lsn % WAL_SEGMENT_SIZE)) as u32;
}
@@ -195,24 +190,21 @@ impl WalStreamDecoder {
// deal with continuation records
// deal with xlog_switch records
}
#[allow(non_snake_case)]
fn decode_XLogPageHeaderData(&mut self) -> XLogPageHeaderData {
let buf = &mut self.inputbuf;
// FIXME: Assume little-endian
let hdr : XLogPageHeaderData = XLogPageHeaderData {
let hdr: XLogPageHeaderData = XLogPageHeaderData {
xlp_magic: buf.get_u16_le(),
xlp_info: buf.get_u16_le(),
xlp_tli: buf.get_u32_le(),
xlp_pageaddr: buf.get_u64_le(),
xlp_rem_len: buf.get_u32_le()
xlp_rem_len: buf.get_u32_le(),
};
// 4 bytes of padding, on 64-bit systems
buf.advance(4);
@@ -225,8 +217,7 @@ impl WalStreamDecoder {
#[allow(non_snake_case)]
fn decode_XLogLongPageHeaderData(&mut self) -> XLogLongPageHeaderData {
let hdr : XLogLongPageHeaderData = XLogLongPageHeaderData {
let hdr: XLogLongPageHeaderData = XLogLongPageHeaderData {
std: self.decode_XLogPageHeaderData(),
xlp_sysid: self.inputbuf.get_u64_le(),
xlp_seg_size: self.inputbuf.get_u32_le(),
@@ -238,30 +229,29 @@ impl WalStreamDecoder {
}
// FIXME:
const BLCKSZ:u16 = 8192;
const BLCKSZ: u16 = 8192;
//
// Constants from xlogrecord.h
//
const XLR_MAX_BLOCK_ID:u8 = 32;
const XLR_MAX_BLOCK_ID: u8 = 32;
const XLR_BLOCK_ID_DATA_SHORT:u8 = 255;
const XLR_BLOCK_ID_DATA_LONG:u8 = 254;
const XLR_BLOCK_ID_ORIGIN:u8 = 253;
const XLR_BLOCK_ID_TOPLEVEL_XID:u8 = 252;
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
const XLR_BLOCK_ID_DATA_LONG: u8 = 254;
const XLR_BLOCK_ID_ORIGIN: u8 = 253;
const XLR_BLOCK_ID_TOPLEVEL_XID: u8 = 252;
const BKPBLOCK_FORK_MASK:u8 = 0x0F;
const _BKPBLOCK_FLAG_MASK:u8 = 0xF0;
const BKPBLOCK_HAS_IMAGE:u8 = 0x10; /* block data is an XLogRecordBlockImage */
const BKPBLOCK_HAS_DATA:u8 = 0x20;
const BKPBLOCK_WILL_INIT:u8 = 0x40; /* redo will re-init the page */
const BKPBLOCK_SAME_REL:u8 = 0x80; /* RelFileNode omitted, same as previous */
const BKPBLOCK_FORK_MASK: u8 = 0x0F;
const _BKPBLOCK_FLAG_MASK: u8 = 0xF0;
const BKPBLOCK_HAS_IMAGE: u8 = 0x10; /* block data is an XLogRecordBlockImage */
const BKPBLOCK_HAS_DATA: u8 = 0x20;
const BKPBLOCK_WILL_INIT: u8 = 0x40; /* redo will re-init the page */
const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous */
/* Information stored in bimg_info */
const BKPIMAGE_HAS_HOLE:u8 = 0x01; /* page image has "hole" */
const BKPIMAGE_IS_COMPRESSED:u8 = 0x02; /* page image is compressed */
const BKPIMAGE_APPLY:u8 = 0x04; /* page image should be restored during replay */
const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
pub struct DecodedBkpBlock {
/* Is this block ref in use? */
@@ -278,8 +268,8 @@ pub struct DecodedBkpBlock {
flags: u8,
/* Information on full-page image, if any */
has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool,
//char *bkp_image;
hole_offset: u16,
@@ -290,22 +280,22 @@ pub struct DecodedBkpBlock {
/* Buffer holding the rmgr-specific data associated with this block */
has_data: bool,
//char *data;
data_len:u16,
data_len: u16,
}
#[allow(non_upper_case_globals)]
const SizeOfXLogRecord:u32 = 24;
const SizeOfXLogRecord: u32 = 24;
pub struct DecodedWALRecord {
pub lsn: u64, // LSN at the *end* of the record
pub record: Bytes, // raw XLogRecord
pub lsn: u64, // LSN at the *end* of the record
pub record: Bytes, // raw XLogRecord
pub blocks: Vec<DecodedBkpBlock>
pub blocks: Vec<DecodedBkpBlock>,
}
// From pg_control.h and rmgrlist.h
const XLOG_SWITCH:u8 = 0x40;
const RM_XLOG_ID:u8 = 0;
const XLOG_SWITCH: u8 = 0x40;
const RM_XLOG_ID: u8 = 0;
// Is this record an XLOG_SWITCH record? They need some special processing,
// so we need to check for that before the rest of the parsing.
@@ -320,7 +310,7 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
let _xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID;
@@ -330,8 +320,12 @@ fn is_xlog_switch_record(rec: &Bytes) -> bool {
// Routines to decode a WAL record and figure out which blocks are modified
//
pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
trace!("decoding record with LSN {:08X}/{:08X} ({} bytes)", lsn >> 32, lsn & 0xffff_ffff, rec.remaining());
trace!(
"decoding record with LSN {:08X}/{:08X} ({} bytes)",
lsn >> 32,
lsn & 0xffff_ffff,
rec.remaining()
);
let mut buf = rec.clone();
@@ -341,7 +335,7 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
let _xl_prev = buf.get_u64_le();
let _xl_info = buf.get_u8();
let _xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
let remaining = xl_tot_len - SizeOfXLogRecord;
@@ -365,31 +359,31 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
match block_id {
XLR_BLOCK_ID_DATA_SHORT => {
/* XLogRecordDataHeaderShort */
let main_data_len = buf.get_u8() as u32;
/* XLogRecordDataHeaderShort */
let main_data_len = buf.get_u8() as u32;
datatotal += main_data_len;
}
XLR_BLOCK_ID_DATA_LONG => {
/* XLogRecordDataHeaderShort */
let main_data_len = buf.get_u32();
/* XLogRecordDataHeaderShort */
let main_data_len = buf.get_u32();
datatotal += main_data_len;
}
XLR_BLOCK_ID_ORIGIN => {
XLR_BLOCK_ID_ORIGIN => {
// RepOriginId is uint16
buf.advance(2);
}
XLR_BLOCK_ID_TOPLEVEL_XID => {
XLR_BLOCK_ID_TOPLEVEL_XID => {
// TransactionId is uint32
buf.advance(4);
}
0 ..= XLR_MAX_BLOCK_ID => {
/* XLogRecordBlockHeader */
0..=XLR_MAX_BLOCK_ID => {
/* XLogRecordBlockHeader */
let mut blk = DecodedBkpBlock {
rnode_spcnode: 0,
rnode_dbnode: 0,
@@ -407,168 +401,157 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
bimg_info: 0,
has_data: false,
data_len: 0
data_len: 0,
};
let fork_flags: u8;
if block_id <= max_block_id {
if block_id <= max_block_id {
// TODO
//report_invalid_record(state,
// "out-of-order block_id %u at %X/%X",
// block_id,
// (uint32) (state->ReadRecPtr >> 32),
// (uint32) state->ReadRecPtr);
// goto err;
}
max_block_id = block_id;
//report_invalid_record(state,
// "out-of-order block_id %u at %X/%X",
// block_id,
// (uint32) (state->ReadRecPtr >> 32),
// (uint32) state->ReadRecPtr);
// goto err;
}
max_block_id = block_id;
fork_flags = buf.get_u8();
blk.forknum = fork_flags & BKPBLOCK_FORK_MASK;
blk.flags = fork_flags;
blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0;
blk.has_image = (fork_flags & BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
blk.will_init = (fork_flags & BKPBLOCK_WILL_INIT) != 0;
blk.data_len = buf.get_u16_le();
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
// TODO
/*
if (blk->has_data && blk->data_len == 0)
{
report_invalid_record(state,
"BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
if (!blk->has_data && blk->data_len != 0)
{
report_invalid_record(state,
"BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
(unsigned int) blk->data_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
*/
datatotal += blk.data_len as u32;
if blk.has_image {
if (blk->has_data && blk->data_len == 0)
{
report_invalid_record(state,
"BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
if (!blk->has_data && blk->data_len != 0)
{
report_invalid_record(state,
"BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
(unsigned int) blk->data_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
*/
datatotal += blk.data_len as u32;
if blk.has_image {
blk.bimg_len = buf.get_u16_le();
blk.hole_offset = buf.get_u16_le();
blk.bimg_info = buf.get_u8();
blk.apply_image = (blk.bimg_info & BKPIMAGE_APPLY) != 0;
blk.apply_image = (blk.bimg_info & BKPIMAGE_APPLY) != 0;
if blk.bimg_info & BKPIMAGE_IS_COMPRESSED != 0
{
if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0 {
blk.hole_length = buf.get_u16_le();
} else {
blk.hole_length = 0;
if blk.bimg_info & BKPIMAGE_IS_COMPRESSED != 0 {
if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0 {
blk.hole_length = buf.get_u16_le();
} else {
blk.hole_length = 0;
}
}
else {
blk.hole_length = BLCKSZ - blk.bimg_len;
} else {
blk.hole_length = BLCKSZ - blk.bimg_len;
}
datatotal += blk.bimg_len as u32;
datatotal += blk.bimg_len as u32;
/*
* cross-check that hole_offset > 0, hole_length > 0 and
* bimg_len < BLCKSZ if the HAS_HOLE flag is set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0 &&
(blk.hole_offset == 0 ||
blk.hole_length == 0 ||
blk.bimg_len == BLCKSZ)
{
/*
* cross-check that hole_offset > 0, hole_length > 0 and
* bimg_len < BLCKSZ if the HAS_HOLE flag is set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE != 0
&& (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == BLCKSZ)
{
// TODO
/*
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(unsigned int) blk->bimg_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(unsigned int) blk->bimg_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
/*
* cross-check that hole_offset == 0 and hole_length == 0 if
* the HAS_HOLE flag is not set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0 &&
(blk.hole_offset != 0 || blk.hole_length != 0)
{
/*
* cross-check that hole_offset == 0 and hole_length == 0 if
* the HAS_HOLE flag is not set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0
&& (blk.hole_offset != 0 || blk.hole_length != 0)
{
// TODO
/*
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
/*
* cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
* flag is set.
*/
if (blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0) &&
blk.bimg_len == BLCKSZ
{
/*
* cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
* flag is set.
*/
if (blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0) && blk.bimg_len == BLCKSZ {
// TODO
/*
report_invalid_record(state,
"BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
(unsigned int) blk->bimg_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
report_invalid_record(state,
"BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
(unsigned int) blk->bimg_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
/*
* cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
* IS_COMPRESSED flag is set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0 &&
blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0 &&
blk.bimg_len != BLCKSZ
{
/*
* cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
* IS_COMPRESSED flag is set.
*/
if blk.bimg_info & BKPIMAGE_HAS_HOLE == 0
&& blk.bimg_info & BKPIMAGE_IS_COMPRESSED == 0
&& blk.bimg_len != BLCKSZ
{
// TODO
/*
report_invalid_record(state,
"neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
(unsigned int) blk->data_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
report_invalid_record(state,
"neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
(unsigned int) blk->data_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
}
if fork_flags & BKPBLOCK_SAME_REL == 0
{
rnode_spcnode = buf.get_u32_le();
if fork_flags & BKPBLOCK_SAME_REL == 0 {
rnode_spcnode = buf.get_u32_le();
rnode_dbnode = buf.get_u32_le();
rnode_relnode = buf.get_u32_le();
//rnode = &blk->rnode;
//rnode = &blk->rnode;
got_rnode = true;
}
else
{
if !got_rnode
{
} else {
if !got_rnode {
// TODO
/*
report_invalid_record(state,
"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
report_invalid_record(state,
"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
*/
}
//blk->rnode = *rnode;
}
//blk->rnode = *rnode;
}
blk.rnode_spcnode = rnode_spcnode;
blk.rnode_dbnode = rnode_dbnode;
blk.rnode_relnode = rnode_relnode;
@@ -601,6 +584,6 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
return DecodedWALRecord {
lsn: lsn,
record: rec,
blocks: blocks
}
blocks: blocks,
};
}

View File

@@ -7,23 +7,22 @@
//
use log::*;
use tokio_stream::StreamExt;
use tokio::runtime;
use tokio::time::{sleep, Duration};
use tokio_stream::StreamExt;
use crate::waldecoder::WalStreamDecoder;
use crate::page_cache;
use crate::page_cache::BufferTag;
use crate::waldecoder::WalStreamDecoder;
use crate::PageServerConf;
use tokio_postgres::{connect_replication, NoTls, Error, ReplicationMode};
use postgres_protocol::message::backend::ReplicationMessage;
use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
//
// This is the entry point for the WAL receiver thread.
//
pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
info!("WAL receiver thread started: '{}'", wal_producer_connstr);
let runtime = runtime::Builder::new_current_thread()
@@ -31,26 +30,32 @@ pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
.build()
.unwrap();
runtime.block_on( async {
runtime.block_on(async {
loop {
let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await;
// TODO: print/log the error
info!("WAL streaming connection failed, retrying in 1 second...: {:?}", _res);
info!(
"WAL streaming connection failed, retrying in 1 second...: {:?}",
_res
);
sleep(Duration::from_secs(1)).await;
}
});
}
async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -> Result<(), Error> {
async fn walreceiver_main(
conf: PageServerConf,
wal_producer_connstr: &String,
) -> Result<(), Error> {
// Connect to the database in replication mode.
debug!("connecting to {}...", wal_producer_connstr);
let (mut rclient, connection) = connect_replication(
wal_producer_connstr.as_str(),
NoTls,
ReplicationMode::Physical
).await?;
ReplicationMode::Physical,
)
.await?;
debug!("connected!");
// The connection object performs the actual communication with the database,
@@ -65,7 +70,7 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
let end_of_wal = u64::from(identify_system.xlogpos());
let mut caught_up = false;
let sysid : u64 = identify_system.systemid().parse().unwrap();
let sysid: u64 = identify_system.systemid().parse().unwrap();
let pcache = page_cache::get_pagecahe(conf, sysid);
//
@@ -93,9 +98,13 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
startpoint += 8 - (startpoint % 8);
}
}
debug!("starting replication from {:X}/{:X}, server is at {:X}/{:X}...",
(startpoint >> 32), (startpoint & 0xffffffff),
(end_of_wal >> 32), (end_of_wal & 0xffffffff));
debug!(
"starting replication from {:X}/{:X}, server is at {:X}/{:X}...",
(startpoint >> 32),
(startpoint & 0xffffffff),
(end_of_wal >> 32),
(end_of_wal & 0xffffffff)
);
let startpoint = tokio_postgres::types::Lsn::from(startpoint);
let mut physical_stream = rclient
.start_physical_replication(None, startpoint, None)
@@ -105,23 +114,26 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
while let Some(replication_message) = physical_stream.next().await {
match replication_message? {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
let startlsn = xlog_data.wal_start();
let endlsn = startlsn + data.len() as u64;
trace!("received XLogData between {:X}/{:X} and {:X}/{:X}",
(startlsn >> 32), (startlsn & 0xffffffff),
(endlsn >> 32), (endlsn & 0xffffffff));
trace!(
"received XLogData between {:X}/{:X} and {:X}/{:X}",
(startlsn >> 32),
(startlsn & 0xffffffff),
(endlsn >> 32),
(endlsn & 0xffffffff)
);
waldecoder.feed_bytes(data);
loop {
if let Some((lsn, recdata)) = waldecoder.poll_decode() {
let decoded = crate::waldecoder::decode_wal_record(startlsn, recdata.clone());
let decoded =
crate::waldecoder::decode_wal_record(startlsn, recdata.clone());
// Put the WAL record to the page cache. We make a separate copy of
// it for every block it modifies. (The actual WAL record is kept in
@@ -133,13 +145,13 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
blknum: blk.blkno
blknum: blk.blkno,
};
let rec = page_cache::WALRecord {
lsn: lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone()
rec: recdata.clone(),
};
pcache.put_wal_record(tag, rec);
@@ -148,7 +160,6 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
pcache.advance_last_valid_lsn(lsn);
} else {
break;
}
@@ -163,7 +174,11 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
pcache.advance_last_valid_lsn(endlsn);
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {:X}/{:X}", (endlsn >> 32), (endlsn & 0xffffffff));
info!(
"caught up at LSN {:X}/{:X}",
(endlsn >> 32),
(endlsn & 0xffffffff)
);
caught_up = true;
}
}

View File

@@ -14,48 +14,49 @@
// TODO: Even though the postgres code runs in a separate process,
// it's not a secure sandbox.
//
use tokio::runtime::Runtime;
use tokio::process::{Command, Child, ChildStdin, ChildStdout};
use std::{path::PathBuf, process::Stdio};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncBufReadExt;
use tokio::time::timeout;
use std::io::Error;
use std::cell::RefCell;
use std::assert;
use std::sync::{Arc};
use std::fs;
use log::*;
use std::time::Instant;
use std::assert;
use std::cell::RefCell;
use std::fs;
use std::io::Error;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::{path::PathBuf, process::Stdio};
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::runtime::Runtime;
use tokio::time::timeout;
use bytes::{Bytes, BytesMut, BufMut};
use bytes::{BufMut, Bytes, BytesMut};
use crate::{PageServerConf, page_cache::BufferTag};
use crate::page_cache;
use crate::page_cache::CacheEntry;
use crate::page_cache::WALRecord;
use crate::page_cache;
use crate::{page_cache::BufferTag, PageServerConf};
static TIMEOUT: Duration = Duration::from_secs(20);
//
// Main entry point for the WAL applicator thread.
//
pub fn wal_redo_main(conf: PageServerConf, sys_id: u64)
{
pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) {
info!("WAL redo thread started {}", sys_id);
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
// Loop forever, handling requests as they come.
let walredo_channel_receiver = &pcache.walredo_receiver;
loop {
let mut process: WalRedoProcess;
let datadir = conf.data_dir.join(format!("wal-redo/{}", sys_id));
@@ -87,8 +88,12 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64)
}
}
fn handle_apply_request(pcache: &page_cache::PageCache, process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error>
{
fn handle_apply_request(
pcache: &page_cache::PageCache,
process: &WalRedoProcess,
runtime: &Runtime,
entry_rc: Arc<CacheEntry>,
) -> Result<(), Error> {
let tag = entry_rc.key.tag;
let lsn = entry_rc.key.lsn;
let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
@@ -104,16 +109,22 @@ fn handle_apply_request(pcache: &page_cache::PageCache, process: &WalRedoProcess
let result;
debug!("applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
nrecords, duration.as_millis(),
lsn >> 32, lsn & 0xffff_ffff);
debug!(
"applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
nrecords,
duration.as_millis(),
lsn >> 32,
lsn & 0xffff_ffff
);
if let Err(e) = apply_result {
error!("could not apply WAL records: {}", e);
result = Err(e);
} else {
entry.page_image = Some(apply_result.unwrap());
pcache.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
pcache
.num_page_images
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
result = Ok(());
}
@@ -130,7 +141,6 @@ struct WalRedoProcess {
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
@@ -138,22 +148,23 @@ impl WalRedoProcess {
// and PG_LIB_DIR so that WalRedo would start right postgres. We may later
// switch to setting same things in pageserver config file.
fn launch(datadir: &PathBuf, runtime: &Runtime) -> Result<WalRedoProcess, Error> {
// Create empty data directory for wal-redo postgres deleting old one.
fs::remove_dir_all(datadir.to_str().unwrap()).ok();
let initdb = runtime.block_on(Command::new("initdb")
.args(&["-D", datadir.to_str().unwrap()])
.arg("-N")
.status()
).expect("failed to execute initdb");
let initdb = runtime
.block_on(
Command::new("initdb")
.args(&["-D", datadir.to_str().unwrap()])
.arg("-N")
.status(),
)
.expect("failed to execute initdb");
if !initdb.success() {
panic!("initdb failed");
}
// Start postgres itself
let mut child =
Command::new("postgres")
let mut child = Command::new("postgres")
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
@@ -162,7 +173,10 @@ impl WalRedoProcess {
.spawn()
.expect("postgres --wal-redo command failed to start");
info!("launched WAL redo postgres process on {}", datadir.to_str().unwrap());
info!(
"launched WAL redo postgres process on {}",
datadir.to_str().unwrap()
);
let stdin = child.stdin.take().expect("failed to open child's stdin");
let stderr = child.stderr.take().expect("failed to open child's stderr");
@@ -200,12 +214,16 @@ impl WalRedoProcess {
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
fn apply_wal_records(&self, runtime: &Runtime, tag: BufferTag, base_img: Option<Bytes>, records: Vec<WALRecord>) -> Result<Bytes, Error>
{
fn apply_wal_records(
&self,
runtime: &Runtime,
tag: BufferTag,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
) -> Result<Bytes, Error> {
let mut stdin = self.stdin.borrow_mut();
let mut stdout = self.stdout.borrow_mut();
return runtime.block_on(async {
//
// This async block sends all the commands to the process.
//
@@ -216,16 +234,26 @@ impl WalRedoProcess {
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(TIMEOUT, stdin.write_all(&build_begin_redo_for_block_msg(tag))).await??;
timeout(
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
)
.await??;
if base_img.is_some() {
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, base_img.unwrap()))).await??;
timeout(
TIMEOUT,
stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
)
.await??;
}
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
stdin.write_all(&build_apply_record_msg(r.lsn, r.rec)).await?;
stdin
.write_all(&build_apply_record_msg(r.lsn, r.rec))
.await?;
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
@@ -246,7 +274,7 @@ impl WalRedoProcess {
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
//debug!("got response for {}", tag.blknum);
Ok::<[u8;8192], Error>(buf)
Ok::<[u8; 8192], Error>(buf)
};
// Kill the process. This closes its stdin, which should signal the process
@@ -262,9 +290,8 @@ impl WalRedoProcess {
}
}
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes
{
let len = 4 + 5*4;
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5 * 4;
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8('B' as u8);
@@ -280,11 +307,10 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes
return buf.freeze();
}
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes
{
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
assert!(base_img.len() == 8192);
let len = 4 + 5*4 + base_img.len();
let len = 4 + 5 * 4 + base_img.len();
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8('P' as u8);
@@ -302,7 +328,6 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes
}
fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes {
let len = 4 + 8 + rec.len();
let mut buf = BytesMut::with_capacity(1 + len);
@@ -317,7 +342,7 @@ fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes {
}
fn build_get_page_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5*4;
let len = 4 + 5 * 4;
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8('G' as u8);