Support several postgres instances on top of a single pageserver.

Each postgres instance now uses its own page cache with the associated
data structures; the postgres system_id is used to distinguish
instances. That also means that a backup must have a valid system_id
stashed somewhere. For now I put '42' as the sys_id during S3 restore,
but that ought to be fixed.
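
A simplified sketch of the per-instance bookkeeping (cf. get_pagecahe()
in page_cache.rs below; the stub type and simplified init here are not
the literal code): a lazily populated map from system_id to page cache.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;

// Stub standing in for the real PageCache from page_cache.rs.
struct PageCache;

lazy_static! {
    // One page cache per postgres instance, keyed by system_id.
    static ref PAGECACHES: Mutex<HashMap<u64, Arc<PageCache>>> =
        Mutex::new(HashMap::new());
}

// The first request for a sys_id creates its cache (the real code also
// spawns a WAL redo thread for it); later requests share the same Arc.
fn get_pagecache(sys_id: u64) -> Arc<PageCache> {
    let mut pcaches = PAGECACHES.lock().unwrap();
    pcaches
        .entry(sys_id)
        .or_insert_with(|| Arc::new(PageCache))
        .clone()
}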

This commit also introduces a new way of starting WAL receivers:
postgres can initiate such a connection by sending a 'callmemaybe $url'
command to the page_service -- that starts the appropriate wal-redo
and wal-receiver threads. This way the page server can start without
a priori knowledge of compute node addresses.
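
For illustration, the handshake from the compute side amounts to opening
an ordinary connection to the page service and issuing the command; a
minimal sketch with the postgres crate (addresses and user are made up):

use postgres::{Client, NoTls};

fn main() -> Result<(), postgres::Error> {
    // Connect to the pageserver's page service (hypothetical address).
    let mut client = Client::connect("host=127.0.0.1 port=5430 user=zenith", NoTls)?;

    // Ask the pageserver to call us back: it spawns a WAL receiver that
    // connects to the given postgres instance and streams its WAL.
    client.simple_query("callmemaybe host=127.0.0.1 port=65431 user=zenith")?;
    Ok(())
}

This mirrors what the tests do via page_server_psql() in control_plane.rs.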
Stas Kelvich
2021-04-02 21:08:34 +03:00
parent 6eabe17e98
commit 2c308da4d2
12 changed files with 393 additions and 243 deletions


@@ -27,4 +27,4 @@ jobs:
- name: Run test - name: Run test
run: | run: |
cargo test --test test_pageserver -- --nocapture --test-threads=1 cargo test --test test_pageserver -- --nocapture --test-threads=1


@@ -7,6 +7,7 @@ use std::{fs::File, str::FromStr};
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
use std::thread; use std::thread;
use std::fs;
use clap::{App, Arg}; use clap::{App, Arg};
use daemonize::Daemonize; use daemonize::Daemonize;
@@ -20,7 +21,6 @@ use pageserver::page_service;
use pageserver::restore_s3; use pageserver::restore_s3;
use pageserver::tui; use pageserver::tui;
use pageserver::walreceiver; use pageserver::walreceiver;
use pageserver::walredo;
use pageserver::PageServerConf; use pageserver::PageServerConf;
fn main() -> Result<(), io::Error> { fn main() -> Result<(), io::Error> {
@@ -61,7 +61,7 @@ fn main() -> Result<(), io::Error> {
data_dir: PathBuf::from("./"), data_dir: PathBuf::from("./"),
daemonize: false, daemonize: false,
interactive: false, interactive: false,
wal_producer_connstr: String::from_str("host=127.0.0.1 port=65432 user=zenith").unwrap(), wal_producer_connstr: None,
listen_addr: "127.0.0.1:5430".parse().unwrap(), listen_addr: "127.0.0.1:5430".parse().unwrap(),
skip_recovery: false, skip_recovery: false,
}; };
@@ -90,7 +90,7 @@ fn main() -> Result<(), io::Error> {
} }
if let Some(addr) = arg_matches.value_of("wal_producer") { if let Some(addr) = arg_matches.value_of("wal_producer") {
conf.wal_producer_connstr = String::from_str(addr).unwrap(); conf.wal_producer_connstr = Some(String::from_str(addr).unwrap());
} }
if let Some(addr) = arg_matches.value_of("listen") { if let Some(addr) = arg_matches.value_of("listen") {
@@ -148,43 +148,51 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
info!("starting..."); info!("starting...");
// Initialize the WAL applicator
let walredo_thread = thread::Builder::new()
.name("WAL redo thread".into())
.spawn(|| {
walredo::wal_applicator_main();
})
.unwrap();
threads.push(walredo_thread);
// Before opening up for connections, restore the latest base backup from S3. // Before opening up for connections, restore the latest base backup from S3.
// (We don't persist anything to local disk at the moment, so we need to do // (We don't persist anything to local disk at the moment, so we need to do
// this at every startup) // this at every startup)
if !conf.skip_recovery { if !conf.skip_recovery {
restore_s3::restore_main(); restore_s3::restore_main(&conf);
} }
// Launch the WAL receiver thread. It will try to connect to the WAL safekeeper, // Create directory for wal-redo datadirs
// and stream the WAL. If the connection is lost, it will reconnect on its own. match fs::create_dir(conf.data_dir.join("wal-redo")) {
// We just fire and forget it here. Ok(_) => {},
let conf1 = conf.clone(); Err(e) => match e.kind() {
let walreceiver_thread = thread::Builder::new() io::ErrorKind::AlreadyExists => {}
.name("WAL receiver thread".into()) _ => {
.spawn(|| { panic!("Failed to create wal-redo data directory: {}", e);
// thread code }
walreceiver::thread_main(conf1); }
}) }
.unwrap();
threads.push(walreceiver_thread); // Launch the WAL receiver thread if pageserver was started with --wal-producer
// option. It will try to connect to the WAL safekeeper, and stream the WAL. If
// the connection is lost, it will reconnect on its own. We just fire and forget
// it here.
//
// All other wal receivers are started on demand by the "callmemaybe" command
// sent to pageserver.
let conf_copy = conf.clone();
if let Some(wal_producer) = conf.wal_producer_connstr {
let conf = conf_copy.clone();
let walreceiver_thread = thread::Builder::new()
.name("static WAL receiver thread".into())
.spawn(move || {
walreceiver::thread_main(conf, &wal_producer);
})
.unwrap();
threads.push(walreceiver_thread);
}
// GetPage@LSN requests are served by another thread. (It uses async I/O, // GetPage@LSN requests are served by another thread. (It uses async I/O,
// but the code in page_service sets up it own thread pool for that) // but the code in page_service sets up it own thread pool for that)
let conf2 = conf.clone(); let conf = conf_copy.clone();
let page_server_thread = thread::Builder::new() let page_server_thread = thread::Builder::new()
.name("Page Service thread".into()) .name("Page Service thread".into())
.spawn(|| { .spawn(|| {
// thread code // thread code
page_service::thread_main(conf2); page_service::thread_main(conf);
}) })
.unwrap(); .unwrap();
threads.push(page_server_thread); threads.push(page_server_thread);


@@ -7,7 +7,7 @@
// local installations. // local installations.
// //
use std::fs::{self, OpenOptions}; use std::{fs::{self, OpenOptions}, rc::Rc};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Command; use std::process::Command;
use std::str; use std::str;
@@ -45,7 +45,7 @@ pub struct StorageControlPlane {
impl StorageControlPlane { impl StorageControlPlane {
// postgres <-> page_server // postgres <-> page_server
pub fn one_page_server(pg_connstr: String) -> StorageControlPlane { pub fn one_page_server() -> StorageControlPlane {
let mut cplane = StorageControlPlane { let mut cplane = StorageControlPlane {
wal_acceptors: Vec::new(), wal_acceptors: Vec::new(),
page_servers: Vec::new(), page_servers: Vec::new(),
@@ -53,7 +53,6 @@ impl StorageControlPlane {
let pserver = PageServerNode { let pserver = PageServerNode {
page_service_addr: "127.0.0.1:65200".parse().unwrap(), page_service_addr: "127.0.0.1:65200".parse().unwrap(),
wal_producer_connstr: pg_connstr,
data_dir: TEST_WORKDIR.join("pageserver") data_dir: TEST_WORKDIR.join("pageserver")
}; };
pserver.init(); pserver.init();
@@ -73,26 +72,25 @@ impl StorageControlPlane {
fn get_wal_acceptor_conn_info() {} fn get_wal_acceptor_conn_info() {}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let addr = &self.page_servers[0].page_service_addr;
pub fn simple_query_storage(&self, db: &str, user: &str, sql: &str) -> Vec<tokio_postgres::SimpleQueryMessage> {
let connstring = format!( let connstring = format!(
"host={} port={} dbname={} user={}", "host={} port={} dbname={} user={}",
self.page_server_addr().ip(), addr.ip(),
self.page_server_addr().port(), addr.port(),
db, "no_db",
user "no_user",
); );
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql); println!("Pageserver query: '{}'", sql);
client.simple_query(sql).unwrap() client.simple_query(sql).unwrap()
} }
} }
pub struct PageServerNode { pub struct PageServerNode {
page_service_addr: SocketAddr, page_service_addr: SocketAddr,
wal_producer_connstr: String,
data_dir: PathBuf, data_dir: PathBuf,
} }
@@ -119,17 +117,15 @@ impl PageServerNode {
} }
pub fn start(&self) { pub fn start(&self) {
println!("Starting pageserver at '{}', wal_producer='{}'", self.page_service_addr, self.wal_producer_connstr); println!("Starting pageserver at '{}'", self.page_service_addr);
let status = Command::new(CARGO_BIN_DIR.join("pageserver")) let status = Command::new(CARGO_BIN_DIR.join("pageserver"))
.args(&["-D", self.data_dir.to_str().unwrap()]) .args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-w", self.wal_producer_connstr.as_str()])
.args(&["-l", self.page_service_addr.to_string().as_str()]) .args(&["-l", self.page_service_addr.to_string().as_str()])
.arg("-d") .arg("-d")
.arg("--skip-recovery") .arg("--skip-recovery")
.env_clear() .env_clear()
.env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary .env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
.env("PGDATA", self.data_dir.join("wal_redo_pgdata")) // postres-wal-redo pgdata
.status() .status()
.expect("failed to start pageserver"); .expect("failed to start pageserver");
@@ -172,19 +168,22 @@ impl WalAcceptorNode {}
// //
// ComputeControlPlane // ComputeControlPlane
// //
pub struct ComputeControlPlane { pub struct ComputeControlPlane<'a> {
pg_bin_dir: PathBuf, pg_bin_dir: PathBuf,
work_dir: PathBuf, work_dir: PathBuf,
last_assigned_port: u16, last_assigned_port: u16,
nodes: Vec<PostgresNode>, storage_cplane: &'a StorageControlPlane,
nodes: Vec<Rc<PostgresNode>>,
} }
impl ComputeControlPlane { impl ComputeControlPlane<'_> {
pub fn local() -> ComputeControlPlane {
pub fn local(storage_cplane : &StorageControlPlane) -> ComputeControlPlane {
ComputeControlPlane { ComputeControlPlane {
pg_bin_dir: PG_BIN_DIR.to_path_buf(), pg_bin_dir: PG_BIN_DIR.to_path_buf(),
work_dir: TEST_WORKDIR.to_path_buf(), work_dir: TEST_WORKDIR.to_path_buf(),
last_assigned_port: 65431, last_assigned_port: 65431,
storage_cplane: storage_cplane,
nodes: Vec::new(), nodes: Vec::new(),
} }
} }
@@ -196,7 +195,7 @@ impl ComputeControlPlane {
port port
} }
pub fn new_vanilla_node(&mut self) -> &PostgresNode { pub fn new_vanilla_node<'a>(&mut self) -> &Rc<PostgresNode> {
// allocate new node entry with generated port // allocate new node entry with generated port
let node_id = self.nodes.len() + 1; let node_id = self.nodes.len() + 1;
let node = PostgresNode { let node = PostgresNode {
@@ -206,7 +205,7 @@ impl ComputeControlPlane {
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(), pg_bin_dir: self.pg_bin_dir.clone(),
}; };
self.nodes.push(node); self.nodes.push(Rc::new(node));
let node = self.nodes.last().unwrap(); let node = self.nodes.last().unwrap();
// initialize data directory // initialize data directory
@@ -260,7 +259,7 @@ impl ComputeControlPlane {
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(), pg_bin_dir: self.pg_bin_dir.clone(),
}; };
self.nodes.push(node); self.nodes.push(Rc::new(node));
let node = self.nodes.last().unwrap(); let node = self.nodes.last().unwrap();
// initialize data directory w/o files // initialize data directory w/o files
@@ -297,6 +296,20 @@ impl ComputeControlPlane {
node node
} }
pub fn new_node(&mut self) -> Rc<PostgresNode> {
let storage_cplane = self.storage_cplane;
let node = self.new_vanilla_node();
let pserver = storage_cplane.page_server_addr();
// Configure that node to take pages from pageserver
node.append_conf("postgresql.conf", format!("\
page_server_connstring = 'host={} port={}'\n\
", pserver.ip(), pserver.port()).as_str());
node.clone()
}
} }
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@@ -339,8 +352,11 @@ impl PostgresNode {
} }
} }
pub fn start(&self) { pub fn start(&self, storage_cplane: &StorageControlPlane) {
println!("Started postgres node at '{}'", self.connstr()); let _res = storage_cplane
.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl("start", true); self.pg_ctl("start", true);
} }
@@ -349,11 +365,11 @@ impl PostgresNode {
} }
pub fn stop(&self) { pub fn stop(&self) {
self.pg_ctl("stop", true); self.pg_ctl("-m immediate stop", true);
} }
pub fn connstr(&self) -> String { pub fn connstr(&self) -> String {
format!("user={} host={} port={}", self.whoami(), self.ip, self.port) format!("host={} port={} user={}", self.ip, self.port, self.whoami())
} }
// XXX: cache that in control plane // XXX: cache that in control plane
@@ -370,7 +386,6 @@ impl PostgresNode {
} }
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> { pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
// XXX: user!
let connstring = format!( let connstring = format!(
"host={} port={} dbname={} user={}", "host={} port={} dbname={} user={}",
self.ip, self.ip,


@@ -19,18 +19,18 @@ pub mod xlog_utils;
mod tui_logger; mod tui_logger;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Clone)] #[derive(Debug, Clone)]
pub struct PageServerConf { pub struct PageServerConf {
pub data_dir: PathBuf, pub data_dir: PathBuf,
pub daemonize: bool, pub daemonize: bool,
pub interactive: bool, pub interactive: bool,
pub wal_producer_connstr: String, pub wal_producer_connstr: Option<String>,
pub listen_addr: SocketAddr, pub listen_addr: SocketAddr,
pub skip_recovery: bool, pub skip_recovery: bool,
} }
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug,Clone)] #[derive(Debug, Clone)]
pub struct WalAcceptorConf { pub struct WalAcceptorConf {
pub data_dir: PathBuf, pub data_dir: PathBuf,
pub no_sync: bool, pub no_sync: bool,


@@ -7,25 +7,27 @@
// //
use core::ops::Bound::Included; use core::ops::Bound::Included;
use std::convert::TryInto; use std::{convert::TryInto, ops::AddAssign};
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::{Arc,Condvar, Mutex};
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Duration; use std::time::Duration;
use std::thread;
// use tokio::sync::RwLock;
use bytes::Bytes; use bytes::Bytes;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use rand::Rng; use rand::Rng;
use log::*; use log::*;
use crate::{PageServerConf, walredo};
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use crossbeam_channel::{Sender, Receiver}; use crossbeam_channel::{Sender, Receiver};
// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call. // Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(20); static TIMEOUT: Duration = Duration::from_secs(60);
pub struct PageCache { pub struct PageCache {
shared: Mutex<PageCacheShared>, shared: Mutex<PageCacheShared>,
@@ -49,6 +51,7 @@ pub struct PageCache {
pub last_record_lsn: AtomicU64, pub last_record_lsn: AtomicU64,
} }
#[derive(Clone)]
pub struct PageCacheStats { pub struct PageCacheStats {
pub num_entries: u64, pub num_entries: u64,
pub num_page_images: u64, pub num_page_images: u64,
@@ -59,6 +62,21 @@ pub struct PageCacheStats {
pub last_record_lsn: u64, pub last_record_lsn: u64,
} }
impl AddAssign for PageCacheStats {
fn add_assign(&mut self, other: Self) {
*self = Self {
num_entries: self.num_entries + other.num_entries,
num_page_images: self.num_page_images + other.num_page_images,
num_wal_records: self.num_wal_records + other.num_wal_records,
num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests,
first_valid_lsn: self.first_valid_lsn + other.first_valid_lsn,
last_valid_lsn: self.last_valid_lsn + other.last_valid_lsn,
last_record_lsn: self.last_record_lsn + other.last_record_lsn,
}
}
}
// //
// Shared data structure, holding page cache and related auxiliary information // Shared data structure, holding page cache and related auxiliary information
// //
@@ -94,8 +112,31 @@ struct PageCacheShared {
} }
lazy_static! { lazy_static! {
pub static ref PAGECACHE : PageCache = init_page_cache(); pub static ref PAGECACHES : Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
} }
pub fn get_pagecahe(conf: PageServerConf, sys_id : u64) -> Arc<PageCache> {
let mut pcaches = PAGECACHES.lock().unwrap();
if !pcaches.contains_key(&sys_id) {
pcaches.insert(sys_id, Arc::new(init_page_cache()));
// Initialize the WAL redo thread
//
// Currently the join_handle is not saved anywhere and we won't restart the thread
// if it dies. We may later stop these threads after some inactivity period
// and restart them on demand.
let _walredo_thread = thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
walredo::wal_redo_main(conf, sys_id);
})
.unwrap();
}
pcaches.get(&sys_id).unwrap().clone()
}
fn init_page_cache() -> PageCache fn init_page_cache() -> PageCache
{ {
// Initialize the channel between the page cache and the WAL applicator // Initialize the channel between the page cache and the WAL applicator
@@ -208,14 +249,16 @@ pub struct WALRecord {
// Public interface functions // Public interface functions
impl PageCache {
// //
// GetPage@LSN // GetPage@LSN
// //
// Returns an 8k page image // Returns an 8k page image
// //
pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>>
{ {
PAGECACHE.num_getpage_requests.fetch_add(1, Ordering::Relaxed); self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
// Look up cache entry. If it's a page image, return that. If it's a WAL record, // Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records. // ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -224,14 +267,14 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
let entry_rc: Arc<CacheEntry>; let entry_rc: Arc<CacheEntry>;
{ {
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
let mut waited = false; let mut waited = false;
while lsn > shared.last_valid_lsn { while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up // TODO: Wait for the WAL receiver to catch up
waited = true; waited = true;
trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn); trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn);
let wait_result = PAGECACHE.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap(); let wait_result = self.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap();
shared = wait_result.0; shared = wait_result.0;
if wait_result.1.timed_out() { if wait_result.1.timed_out() {
@@ -283,7 +326,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
assert!(!entry_content.apply_pending); assert!(!entry_content.apply_pending);
entry_content.apply_pending = true; entry_content.apply_pending = true;
let s = &PAGECACHE.walredo_sender; let s = &self.walredo_sender;
s.send(entry_rc.clone())?; s.send(entry_rc.clone())?;
} }
@@ -323,10 +366,10 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
// Returns an old page image (if any), and a vector of WAL records to apply // Returns an old page image (if any), and a vector of WAL records to apply
// over it. // over it.
// //
pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>)
{ {
// Scan the BTreeMap backwards, starting from the given entry. // Scan the BTreeMap backwards, starting from the given entry.
let shared = PAGECACHE.shared.lock().unwrap(); let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache; let pagecache = &shared.pagecache;
let minkey = CacheKey { let minkey = CacheKey {
@@ -377,7 +420,7 @@ pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option<Bytes>, Vec<WALR
// //
// Adds a WAL record to the page cache // Adds a WAL record to the page cache
// //
pub fn put_wal_record(tag: BufferTag, rec: WALRecord) pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord)
{ {
let key = CacheKey { let key = CacheKey {
tag: tag, tag: tag,
@@ -387,7 +430,7 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
let entry = CacheEntry::new(key.clone()); let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec); entry.content.lock().unwrap().wal_record = Some(rec);
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
let rel_tag = RelTag { let rel_tag = RelTag {
spcnode: tag.spcnode, spcnode: tag.spcnode,
@@ -403,19 +446,19 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
trace!("put_wal_record lsn: {}", key.lsn); trace!("put_wal_record lsn: {}", key.lsn);
let oldentry = shared.pagecache.insert(key, Arc::new(entry)); let oldentry = shared.pagecache.insert(key, Arc::new(entry));
PAGECACHE.num_entries.fetch_add(1, Ordering::Relaxed); self.num_entries.fetch_add(1, Ordering::Relaxed);
if !oldentry.is_none() { if !oldentry.is_none() {
error!("overwriting WAL record in page cache"); error!("overwriting WAL record in page cache");
} }
PAGECACHE.num_wal_records.fetch_add(1, Ordering::Relaxed); self.num_wal_records.fetch_add(1, Ordering::Relaxed);
} }
// //
// Memorize a full image of a page version // Memorize a full image of a page version
// //
pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes) pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes)
{ {
let key = CacheKey { let key = CacheKey {
tag: tag, tag: tag,
@@ -425,39 +468,39 @@ pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes)
let entry = CacheEntry::new(key.clone()); let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().page_image = Some(img); entry.content.lock().unwrap().page_image = Some(img);
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
let pagecache = &mut shared.pagecache; let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, Arc::new(entry)); let oldentry = pagecache.insert(key, Arc::new(entry));
PAGECACHE.num_entries.fetch_add(1, Ordering::Relaxed); self.num_entries.fetch_add(1, Ordering::Relaxed);
assert!(oldentry.is_none()); assert!(oldentry.is_none());
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}", //debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
PAGECACHE.num_page_images.fetch_add(1, Ordering::Relaxed); self.num_page_images.fetch_add(1, Ordering::Relaxed);
} }
// //
pub fn advance_last_valid_lsn(lsn: u64) pub fn advance_last_valid_lsn(&self, lsn: u64)
{ {
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
// Can't move backwards. // Can't move backwards.
assert!(lsn >= shared.last_valid_lsn); assert!(lsn >= shared.last_valid_lsn);
shared.last_valid_lsn = lsn; shared.last_valid_lsn = lsn;
PAGECACHE.valid_lsn_condvar.notify_all(); self.valid_lsn_condvar.notify_all();
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); self.last_valid_lsn.store(lsn, Ordering::Relaxed);
} }
// //
// NOTE: this updates last_valid_lsn as well. // NOTE: this updates last_valid_lsn as well.
// //
pub fn advance_last_record_lsn(lsn: u64) pub fn advance_last_record_lsn(&self, lsn: u64)
{ {
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
// Can't move backwards. // Can't move backwards.
assert!(lsn >= shared.last_valid_lsn); assert!(lsn >= shared.last_valid_lsn);
@@ -465,16 +508,16 @@ pub fn advance_last_record_lsn(lsn: u64)
shared.last_valid_lsn = lsn; shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn; shared.last_record_lsn = lsn;
PAGECACHE.valid_lsn_condvar.notify_all(); self.valid_lsn_condvar.notify_all();
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); self.last_valid_lsn.store(lsn, Ordering::Relaxed);
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); self.last_valid_lsn.store(lsn, Ordering::Relaxed);
} }
// //
pub fn _advance_first_valid_lsn(lsn: u64) pub fn _advance_first_valid_lsn(&self, lsn: u64)
{ {
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
// Can't move backwards. // Can't move backwards.
assert!(lsn >= shared.first_valid_lsn); assert!(lsn >= shared.first_valid_lsn);
@@ -484,12 +527,12 @@ pub fn _advance_first_valid_lsn(lsn: u64)
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
shared.first_valid_lsn = lsn; shared.first_valid_lsn = lsn;
PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed); self.first_valid_lsn.store(lsn, Ordering::Relaxed);
} }
pub fn init_valid_lsn(lsn: u64) pub fn init_valid_lsn(&self, lsn: u64)
{ {
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
assert!(shared.first_valid_lsn == 0); assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0); assert!(shared.last_valid_lsn == 0);
@@ -498,14 +541,15 @@ pub fn init_valid_lsn(lsn: u64)
shared.first_valid_lsn = lsn; shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn; shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn; shared.last_record_lsn = lsn;
PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed);
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); self.first_valid_lsn.store(lsn, Ordering::Relaxed);
PAGECACHE.last_record_lsn.store(lsn, Ordering::Relaxed); self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
} }
pub fn get_last_record_lsn() -> u64 pub fn get_last_valid_lsn(&self) -> u64
{ {
let shared = PAGECACHE.shared.lock().unwrap(); let shared = self.shared.lock().unwrap();
return shared.last_record_lsn; return shared.last_record_lsn;
} }
@@ -517,7 +561,7 @@ pub fn get_last_record_lsn() -> u64
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
// //
// //
pub fn _test_get_page_at_lsn() pub fn _test_get_page_at_lsn(&self)
{ {
// for quick testing of the get_page_at_lsn() function. // for quick testing of the get_page_at_lsn() function.
// //
@@ -527,7 +571,7 @@ pub fn _test_get_page_at_lsn()
let mut tag: Option<BufferTag> = None; let mut tag: Option<BufferTag> = None;
{ {
let shared = PAGECACHE.shared.lock().unwrap(); let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache; let pagecache = &shared.pagecache;
if pagecache.is_empty() { if pagecache.is_empty() {
@@ -548,7 +592,7 @@ pub fn _test_get_page_at_lsn()
} }
info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
Ok(_img) => { Ok(_img) => {
// This prints out the whole page image. // This prints out the whole page image.
//println!("{:X?}", img); //println!("{:X?}", img);
@@ -563,9 +607,9 @@ pub fn _test_get_page_at_lsn()
// FIXME: Shouldn't relation size also be tracked with an LSN? // FIXME: Shouldn't relation size also be tracked with an LSN?
// If a replica is lagging behind, it needs to get the size as it was on // If a replica is lagging behind, it needs to get the size as it was on
// the replica's current replay LSN. // the replica's current replay LSN.
pub fn relsize_inc(rel: &RelTag, to: Option<u32>) pub fn relsize_inc(&self, rel: &RelTag, to: Option<u32>)
{ {
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0); let entry = shared.relsize_cache.entry(*rel).or_insert(0);
if let Some(to) = to { if let Some(to) = to {
@@ -575,29 +619,51 @@ pub fn relsize_inc(rel: &RelTag, to: Option<u32>)
} }
} }
pub fn relsize_get(rel: &RelTag) -> u32 pub fn relsize_get(&self, rel: &RelTag) -> u32
{ {
let mut shared = PAGECACHE.shared.lock().unwrap(); let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0); let entry = shared.relsize_cache.entry(*rel).or_insert(0);
*entry *entry
} }
pub fn relsize_exist(rel: &RelTag) -> bool pub fn relsize_exist(&self, rel: &RelTag) -> bool
{ {
let shared = PAGECACHE.shared.lock().unwrap(); let shared = self.shared.lock().unwrap();
let relsize_cache = &shared.relsize_cache; let relsize_cache = &shared.relsize_cache;
relsize_cache.contains_key(rel) relsize_cache.contains_key(rel)
} }
pub fn get_stats() -> PageCacheStats pub fn get_stats(&self) -> PageCacheStats
{ {
PageCacheStats { PageCacheStats {
num_entries: PAGECACHE.num_entries.load(Ordering::Relaxed), num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: PAGECACHE.num_page_images.load(Ordering::Relaxed), num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: PAGECACHE.num_wal_records.load(Ordering::Relaxed), num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: PAGECACHE.num_getpage_requests.load(Ordering::Relaxed), num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
first_valid_lsn: PAGECACHE.first_valid_lsn.load(Ordering::Relaxed), first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed),
last_valid_lsn: PAGECACHE.last_valid_lsn.load(Ordering::Relaxed), last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed),
last_record_lsn: PAGECACHE.last_record_lsn.load(Ordering::Relaxed), last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
} }
} }
}
pub fn get_stats() -> PageCacheStats
{
let pcaches = PAGECACHES.lock().unwrap();
let mut stats = PageCacheStats {
num_entries: 0,
num_page_images: 0,
num_wal_records: 0,
num_getpage_requests: 0,
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
};
pcaches.iter().for_each(|(_sys_id, pcache)| {
stats += pcache.get_stats();
});
stats
}


@@ -15,11 +15,13 @@ use tokio::runtime;
use tokio::task; use tokio::task;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use std::io; use std::{io};
use std::thread;
use log::*; use log::*;
use crate::page_cache; use crate::page_cache;
use crate::PageServerConf; use crate::PageServerConf;
use crate::walreceiver;
type Result<T> = std::result::Result<T, io::Error>; type Result<T> = std::result::Result<T, io::Error>;
@@ -221,43 +223,38 @@ pub fn thread_main(conf: PageServerConf) {
info!("Starting page server on {}", conf.listen_addr); info!("Starting page server on {}", conf.listen_addr);
runtime.block_on(async { runtime.block_on(async {
let _unused = page_service_main(conf.listen_addr.to_string().as_str()).await; let listener = TcpListener::bind(conf.listen_addr).await.unwrap();
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
let mut conn_handler = Connection::new(conf.clone(), socket);
task::spawn(async move {
if let Err(err) = conn_handler.run().await {
error!("error: {}", err);
}
});
}
}); });
} }
async fn page_service_main(listen_address: &str) {
let listener = TcpListener::bind(listen_address).await.unwrap();
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
let mut conn_handler = Connection::new(socket);
task::spawn(async move {
if let Err(err) = conn_handler.run().await {
error!("error: {}", err);
}
});
}
}
#[derive(Debug)] #[derive(Debug)]
struct Connection { struct Connection {
stream: BufWriter<TcpStream>, stream: BufWriter<TcpStream>,
buffer: BytesMut, buffer: BytesMut,
init_done: bool init_done: bool,
conf: PageServerConf,
} }
impl Connection { impl Connection {
pub fn new(socket: TcpStream) -> Connection { pub fn new(conf: PageServerConf, socket: TcpStream) -> Connection {
Connection { Connection {
stream: BufWriter::new(socket), stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(10 * 1024), buffer: BytesMut::with_capacity(10 * 1024),
init_done: false init_done: false,
conf: conf
} }
} }
@@ -433,10 +430,40 @@ impl Connection {
async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> { async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> {
trace!("got query {:?}", q.body); trace!("got query {:?}", q.body);
if q.body.starts_with(b"pagestream") { if q.body.starts_with(b"controlfile") {
self.handle_pagerequests().await
} else if q.body.starts_with(b"controlfile") {
self.handle_controlfile().await self.handle_controlfile().await
} else if q.body.starts_with(b"pagestream ") {
let (_l,r) = q.body.split_at("pagestream ".len());
let mut r = r.to_vec();
r.pop();
let sysid = String::from_utf8(r).unwrap().trim().to_string();
let sysid: u64 = sysid.parse().unwrap(); // XXX
self.handle_pagerequests(sysid).await
} else if q.body.starts_with(b"callmemaybe ") {
let (_l,r) = q.body.split_at("callmemaybe ".len());
let mut r = r.to_vec();
r.pop();
let connstr = String::from_utf8(r).unwrap().trim().to_string();
let conf_copy = self.conf.clone();
let _walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
walreceiver::thread_main(conf_copy,&connstr);
})
.unwrap();
// generic ack:
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message(&BeMessage::ReadyForQuery).await
} else if q.body.starts_with(b"status") { } else if q.body.starts_with(b"status") {
self.write_message_noflush(&BeMessage::RowDescription).await?; self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::DataRow).await?; self.write_message_noflush(&BeMessage::DataRow).await?;
@@ -456,10 +483,9 @@ impl Connection {
self.write_message_noflush(&BeMessage::ControlFile).await?; self.write_message_noflush(&BeMessage::ControlFile).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?; self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message(&BeMessage::ReadyForQuery).await self.write_message(&BeMessage::ReadyForQuery).await
} }
async fn handle_pagerequests(&mut self) -> Result<()> { async fn handle_pagerequests(&mut self, sysid: u64) -> Result<()> {
/* switch client to COPYBOTH */ /* switch client to COPYBOTH */
self.stream.write_u8(b'W').await?; self.stream.write_u8(b'W').await?;
@@ -468,13 +494,15 @@ impl Connection {
self.stream.write_i16(0).await?; /* numAttributes */ self.stream.write_i16(0).await?; /* numAttributes */
self.stream.flush().await?; self.stream.flush().await?;
let pcache = page_cache::get_pagecahe(self.conf.clone(), sysid);
loop { loop {
let message = self.read_message().await?; let message = self.read_message().await?;
// XXX: none seems to appear a lot in log. // XXX: none seems to appear a lot in log.
// Do we have conditions for busy-loop here? // Do we have conditions for busy-loop here?
if let Some(m) = &message { if let Some(m) = &message {
info!("query: {:?}", m); info!("query({}): {:?}", sysid, m);
}; };
if message.is_none() { if message.is_none() {
@@ -492,7 +520,7 @@ impl Connection {
forknum: req.forknum, forknum: req.forknum,
}; };
let exist = page_cache::relsize_exist(&tag); let exist = pcache.relsize_exist(&tag);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: exist, ok: exist,
@@ -520,7 +548,7 @@ impl Connection {
forknum: req.forknum, forknum: req.forknum,
}; };
let n_blocks = page_cache::relsize_get(&tag); let n_blocks = pcache.relsize_get(&tag);
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true, ok: true,
@@ -535,27 +563,25 @@ impl Connection {
forknum: req.forknum, forknum: req.forknum,
blknum: req.blkno blknum: req.blkno
}; };
let lsn = req.lsn;
let msg; let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
{ Ok(p) => {
let p = page_cache::get_page_at_lsn(buf_tag, lsn); BeMessage::ZenithReadResponse(ZenithReadResponse {
if p.is_ok() {
msg = ZenithReadResponse {
ok: true, ok: true,
n_blocks: 0, n_blocks: 0,
page: p.unwrap() page: p
}; })
} else { },
Err(e) => {
const ZERO_PAGE:[u8; 8192] = [0; 8192]; const ZERO_PAGE:[u8; 8192] = [0; 8192];
msg = ZenithReadResponse { error!("get_page_at_lsn: {}", e);
BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: false, ok: false,
n_blocks: 0, n_blocks: 0,
page: Bytes::from_static(&ZERO_PAGE) page: Bytes::from_static(&ZERO_PAGE)
} })
} }
} };
let msg = BeMessage::ZenithReadResponse(msg);
self.write_message(&msg).await? self.write_message(&msg).await?
@@ -568,7 +594,7 @@ impl Connection {
forknum: req.forknum, forknum: req.forknum,
}; };
page_cache::relsize_inc(&tag, None); pcache.relsize_inc(&tag, None);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true, ok: true,
@@ -583,7 +609,7 @@ impl Connection {
forknum: req.forknum, forknum: req.forknum,
}; };
page_cache::relsize_inc(&tag, Some(req.blkno)); pcache.relsize_inc(&tag, Some(req.blkno));
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true, ok: true,


@@ -22,7 +22,7 @@ use tokio::runtime;
use futures::future; use futures::future;
use crate::page_cache; use crate::{PageServerConf, page_cache};
struct Storage { struct Storage {
region: Region, region: Region,
@@ -30,12 +30,12 @@ struct Storage {
bucket: String bucket: String
} }
pub fn restore_main() { pub fn restore_main(conf: &PageServerConf) {
// Create a new thread pool // Create a new thread pool
let runtime = runtime::Runtime::new().unwrap(); let runtime = runtime::Runtime::new().unwrap();
runtime.block_on(async { runtime.block_on(async {
let result = restore_chunk().await; let result = restore_chunk(conf).await;
match result { match result {
Ok(_) => { return; }, Ok(_) => { return; },
@@ -55,7 +55,7 @@ pub fn restore_main() {
// //
// Load it all into the page cache. // Load it all into the page cache.
// //
async fn restore_chunk() -> Result<(), S3Error> { async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
let backend = Storage { let backend = Storage {
region: Region::Custom { region: Region::Custom {
@@ -79,6 +79,8 @@ async fn restore_chunk() -> Result<(), S3Error> {
// List out contents of directory // List out contents of directory
let results: Vec<s3::serde_types::ListBucketResult> = bucket.list("relationdata/".to_string(), Some("".to_string())).await?; let results: Vec<s3::serde_types::ListBucketResult> = bucket.list("relationdata/".to_string(), Some("".to_string())).await?;
// TODO: get that from backup
let sys_id: u64 = 42;
let mut oldest_lsn = 0; let mut oldest_lsn = 0;
let mut slurp_futures: Vec<_> = Vec::new(); let mut slurp_futures: Vec<_> = Vec::new();
@@ -98,7 +100,7 @@ async fn restore_chunk() -> Result<(), S3Error> {
oldest_lsn = p.lsn; oldest_lsn = p.lsn;
} }
let b = bucket.clone(); let b = bucket.clone();
let f = slurp_base_file(b, key.to_string(), p); let f = slurp_base_file(conf, sys_id, b, key.to_string(), p);
slurp_futures.push(f); slurp_futures.push(f);
} }
@@ -110,7 +112,9 @@ async fn restore_chunk() -> Result<(), S3Error> {
if oldest_lsn == 0 { if oldest_lsn == 0 {
panic!("no base backup found"); panic!("no base backup found");
} }
page_cache::init_valid_lsn(oldest_lsn);
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
pcache.init_valid_lsn(oldest_lsn);
info!("{} files to restore...", slurp_futures.len()); info!("{} files to restore...", slurp_futures.len());
@@ -266,7 +270,7 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
// //
// Load a base file from S3, and insert it into the page cache // Load a base file from S3, and insert it into the page cache
// //
async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImageFileName) async fn slurp_base_file(conf: &PageServerConf, sys_id: u64, bucket: Bucket, s3path: String, parsed: ParsedBaseImageFileName)
{ {
// FIXME: rust-s3 opens a new connection for each request. Should reuse // FIXME: rust-s3 opens a new connection for each request. Should reuse
// the reqwest::Client object. But that requires changes to rust-s3 itself. // the reqwest::Client object. But that requires changes to rust-s3 itself.
@@ -280,6 +284,8 @@ async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImage
// FIXME: use constants (BLCKSZ) // FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024*1024*1024 / 8192); let mut blknum: u32 = parsed.segno * (1024*1024*1024 / 8192);
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
while bytes.remaining() >= 8192 { while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag { let tag = page_cache::BufferTag {
@@ -290,7 +296,7 @@ async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImage
blknum: blknum blknum: blknum
}; };
page_cache::put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192)); pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
blknum += 1; blknum += 1;
} }


@@ -22,9 +22,9 @@ use postgres_protocol::message::backend::ReplicationMessage;
// //
// This is the entry point for the WAL receiver thread. // This is the entry point for the WAL receiver thread.
// //
pub fn thread_main(conf: PageServerConf) { pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
info!("WAL receiver thread started"); info!("WAL receiver thread started: '{}'", wal_producer_connstr);
let runtime = runtime::Builder::new_current_thread() let runtime = runtime::Builder::new_current_thread()
.enable_all() .enable_all()
@@ -33,7 +33,7 @@ pub fn thread_main(conf: PageServerConf) {
runtime.block_on( async { runtime.block_on( async {
loop { loop {
let _res = walreceiver_main(&conf).await; let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await;
// TODO: print/log the error // TODO: print/log the error
info!("WAL streaming connection failed, retrying in 1 second...: {:?}", _res); info!("WAL streaming connection failed, retrying in 1 second...: {:?}", _res);
@@ -42,12 +42,12 @@ pub fn thread_main(conf: PageServerConf) {
}); });
} }
async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -> Result<(), Error> {
// Connect to the database in replication mode. // Connect to the database in replication mode.
debug!("connecting to {}...", conf.wal_producer_connstr); debug!("connecting to {}...", wal_producer_connstr);
let (mut rclient, connection) = connect_replication( let (mut rclient, connection) = connect_replication(
conf.wal_producer_connstr.as_str(), wal_producer_connstr.as_str(),
NoTls, NoTls,
ReplicationMode::Physical ReplicationMode::Physical
).await?; ).await?;
@@ -63,13 +63,16 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
let _identify_system = rclient.identify_system().await?; let _identify_system = rclient.identify_system().await?;
let sysid : u64 = _identify_system.systemid().parse().unwrap();
let pcache = page_cache::get_pagecahe(conf, sysid);
// //
// Start streaming the WAL, from where we left off previously. // Start streaming the WAL, from where we left off previously.
// //
let mut startpoint = page_cache::get_last_record_lsn(); let mut startpoint = pcache.get_last_valid_lsn();
if startpoint == 0 { if startpoint == 0 {
// FIXME: Or should we just error out? // FIXME: Or should we just error out?
page_cache::init_valid_lsn(u64::from(_identify_system.xlogpos())); pcache.init_valid_lsn(u64::from(_identify_system.xlogpos()));
startpoint = u64::from(_identify_system.xlogpos()); startpoint = u64::from(_identify_system.xlogpos());
} else { } else {
// There might be some padding after the last full record, skip it. // There might be some padding after the last full record, skip it.
@@ -126,12 +129,12 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
rec: recdata.clone() rec: recdata.clone()
}; };
page_cache::put_wal_record(tag, rec); pcache.put_wal_record(tag, rec);
} }
// Now that this record has been handled, let the page cache know that // Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN // it is up-to-date to this LSN
page_cache::advance_last_record_lsn(lsn); pcache.advance_last_valid_lsn(lsn);
} else { } else {
break; break;
@@ -144,7 +147,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
// better reflect that, because GetPage@LSN requests might also point in the // better reflect that, because GetPage@LSN requests might also point in the
// middle of a record, if the request LSN was taken from the server's current // middle of a record, if the request LSN was taken from the server's current
// flush ptr. // flush ptr.
page_cache::advance_last_valid_lsn(endlsn); pcache.advance_last_valid_lsn(endlsn);
} }
ReplicationMessage::PrimaryKeepAlive(_keepalive) => { ReplicationMessage::PrimaryKeepAlive(_keepalive) => {


@@ -16,7 +16,7 @@
// //
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::process::{Command, Child, ChildStdin, ChildStdout}; use tokio::process::{Command, Child, ChildStdin, ChildStdout};
use std::process::Stdio; use std::{path::PathBuf, process::Stdio};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncBufReadExt; use tokio::io::AsyncBufReadExt;
use tokio::time::timeout; use tokio::time::timeout;
@@ -24,13 +24,14 @@ use std::io::Error;
use std::cell::RefCell; use std::cell::RefCell;
use std::assert; use std::assert;
use std::sync::{Arc}; use std::sync::{Arc};
use std::fs;
use log::*; use log::*;
use std::time::Instant; use std::time::Instant;
use std::time::Duration; use std::time::Duration;
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{Bytes, BytesMut, BufMut};
use crate::page_cache::BufferTag; use crate::{PageServerConf, page_cache::BufferTag};
use crate::page_cache::CacheEntry; use crate::page_cache::CacheEntry;
use crate::page_cache::WALRecord; use crate::page_cache::WALRecord;
use crate::page_cache; use crate::page_cache;
@@ -40,25 +41,28 @@ static TIMEOUT: Duration = Duration::from_secs(20);
// //
// Main entry point for the WAL applicator thread. // Main entry point for the WAL applicator thread.
// //
pub fn wal_applicator_main() pub fn wal_redo_main(conf: PageServerConf, sys_id: u64)
{ {
info!("WAL redo thread started"); info!("WAL redo thread started {}", sys_id);
// We block on waiting for requests on the walredo request channel, but // We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the // use async I/O to communicate with the child process. Initialize the
// runtime for the async part. // runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
// Loop forever, handling requests as they come. // Loop forever, handling requests as they come.
let walredo_channel_receiver = &page_cache::PAGECACHE.walredo_receiver; let walredo_channel_receiver = &pcache.walredo_receiver;
loop { loop {
let mut process: WalRedoProcess; let mut process: WalRedoProcess;
let datadir = conf.data_dir.join(format!("wal-redo/{}", sys_id));
info!("launching WAL redo postgres process"); info!("launching WAL redo postgres process {}", sys_id);
{ {
let _guard = runtime.enter(); let _guard = runtime.enter();
process = WalRedoProcess::launch().unwrap(); process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
} }
// Pretty arbitrarily, reuse the same Postgres process for 100 requests. // Pretty arbitrarily, reuse the same Postgres process for 100 requests.
@@ -66,10 +70,9 @@ pub fn wal_applicator_main()
// using up all shared buffers in Postgres's shared buffer cache; we don't // using up all shared buffers in Postgres's shared buffer cache; we don't
// want to write any pages to disk in the WAL redo process. // want to write any pages to disk in the WAL redo process.
for _i in 1..100 { for _i in 1..100 {
let request = walredo_channel_receiver.recv().unwrap(); let request = walredo_channel_receiver.recv().unwrap();
let result = handle_apply_request(&process, &runtime, request); let result = handle_apply_request(&pcache, &process, &runtime, request);
if result.is_err() { if result.is_err() {
// On error, kill the process. // On error, kill the process.
break; break;
@@ -84,11 +87,11 @@ pub fn wal_applicator_main()
} }
} }
fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error> fn handle_apply_request(pcache: &page_cache::PageCache, process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error>
{ {
let tag = entry_rc.key.tag; let tag = entry_rc.key.tag;
let lsn = entry_rc.key.lsn; let lsn = entry_rc.key.lsn;
let (base_img, records) = page_cache::collect_records_for_apply(entry_rc.as_ref()); let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
let mut entry = entry_rc.content.lock().unwrap(); let mut entry = entry_rc.content.lock().unwrap();
entry.apply_pending = false; entry.apply_pending = false;
@@ -110,7 +113,7 @@ fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: A
result = Err(e); result = Err(e);
} else { } else {
entry.page_image = Some(apply_result.unwrap()); entry.page_image = Some(apply_result.unwrap());
page_cache::PAGECACHE.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed); pcache.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
result = Ok(()); result = Ok(());
} }
@@ -128,19 +131,39 @@ struct WalRedoProcess {
impl WalRedoProcess { impl WalRedoProcess {
fn launch() -> Result<WalRedoProcess, Error> { //
// // Start postgres binary in special WAL redo mode.
// Start postgres binary in special WAL redo mode. //
// // Tests that run the pageserver binary set the proper PG_BIN_DIR
// and PG_LIB_DIR so that WalRedo starts the right postgres. We may later
// switch to setting the same things in the pageserver config file.
fn launch(datadir: &PathBuf, runtime: &Runtime) -> Result<WalRedoProcess, Error> {
// Create an empty data directory for wal-redo postgres, deleting the old one.
fs::remove_dir_all(datadir.to_str().unwrap()).ok();
let initdb = runtime.block_on(Command::new("initdb")
.args(&["-D", datadir.to_str().unwrap()])
.arg("-N")
.status()
).expect("failed to execute initdb");
if !initdb.success() {
panic!("initdb failed");
}
// Start postgres itself
let mut child = let mut child =
Command::new("postgres") Command::new("postgres")
.arg("--wal-redo") .arg("--wal-redo")
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.env("PGDATA", datadir.to_str().unwrap())
.spawn() .spawn()
.expect("postgres --wal-redo command failed to start"); .expect("postgres --wal-redo command failed to start");
info!("launched WAL redo postgres process on {}", datadir.to_str().unwrap());
let stdin = child.stdin.take().expect("failed to open child's stdin"); let stdin = child.stdin.take().expect("failed to open child's stdin");
let stderr = child.stderr.take().expect("failed to open child's stderr"); let stderr = child.stderr.take().expect("failed to open child's stderr");
let stdout = child.stdout.take().expect("failed to open child's stdout"); let stdout = child.stdout.take().expect("failed to open child's stdout");
@@ -159,7 +182,7 @@ impl WalRedoProcess {
if res.unwrap() == 0 { if res.unwrap() == 0 {
break; break;
} }
debug!("{}", line.trim()); debug!("wal-redo-postgres: {}", line.trim());
line.clear(); line.clear();
} }
Ok::<(), Error>(()) Ok::<(), Error>(())


@@ -1,21 +1,6 @@
use pageserver::control_plane::ComputeControlPlane;
#[test] #[test]
fn test_actions() { fn test_actions() {
let mut cplane = ComputeControlPlane::local();
let node = cplane.new_vanilla_node();
node.start();
node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
let count: i64 = node
.safe_psql("postgres", "SELECT count(*) FROM t")
.first()
.unwrap()
.get(0);
assert_eq!(count, 100000);
} }
#[test] #[test]


@@ -11,19 +11,9 @@ use pageserver::control_plane::StorageControlPlane;
// Handcrafted cases with wal records that are (were) problematic for redo. // Handcrafted cases with wal records that are (were) problematic for redo.
#[test] #[test]
fn test_redo_cases() { fn test_redo_cases() {
// Allocate postgres instance, but don't start
let mut compute_cplane = ComputeControlPlane::local();
// Create compute node without files, only datadir structure
let node = compute_cplane.new_minimal_node();
// Start pageserver that reads WAL directly from that postgres // Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server(node.connstr()); let storage_cplane = StorageControlPlane::one_page_server();
let pageserver_addr = storage_cplane.page_server_addr(); let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// Configure that node to take pages from pageserver
node.append_conf("postgresql.conf", format!("\
page_server_connstring = 'host={} port={}'\n\
", pageserver_addr.ip(), pageserver_addr.port()).as_str());
// Request info needed to build control file // Request info needed to build control file
storage_cplane.simple_query_storage("postgres", node.whoami().as_str(), "controlfile"); storage_cplane.simple_query_storage("postgres", node.whoami().as_str(), "controlfile");
@@ -31,7 +21,8 @@ fn test_redo_cases() {
node.setup_controlfile(); node.setup_controlfile();
// start postgres // start postgres
node.start(); let node = compute_cplane.new_node();
node.start(&storage_cplane);
println!("await pageserver connection..."); println!("await pageserver connection...");
sleep(Duration::from_secs(3)); sleep(Duration::from_secs(3));
@@ -61,30 +52,57 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node // Runs pg_regress on a compute node
#[test] #[test]
fn test_regress() { fn test_regress() {
// Allocate postgres instance, but don't start
let mut compute_cplane = ComputeControlPlane::local();
let node = compute_cplane.new_vanilla_node();
// Start pageserver that reads WAL directly from that postgres // Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server(node.connstr()); let storage_cplane = StorageControlPlane::one_page_server();
let pageserver_addr = storage_cplane.page_server_addr(); let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// Configure that node to take pages from pageserver
node.append_conf("postgresql.conf", format!("\
page_server_connstring = 'host={} port={}'\n\
", pageserver_addr.ip(), pageserver_addr.port()).as_str());
// start postgres // start postgres
node.start(); let node = compute_cplane.new_node();
node.start(&storage_cplane);
println!("await pageserver connection..."); println!("await pageserver connection...");
sleep(Duration::from_secs(3)); sleep(Duration::from_secs(3));
////////////////////////////////////////////////////////////////// pageserver::control_plane::regress_check(&node);
pageserver::control_plane::regress_check(node);
} }
// Runs recovery with minio // Run two postgres instances on one pageserver
#[test] #[test]
fn test_pageserver_recovery() {} fn test_pageserver_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// Allocate postgres instance, but don't start
let node1 = compute_cplane.new_node();
let node2 = compute_cplane.new_node();
node1.start(&storage_cplane);
node2.start(&storage_cplane);
// XXX: add some extension func to postgres to check walsender conn
// XXX: or better just drop that
println!("await pageserver connection...");
sleep(Duration::from_secs(3));
// check node1
node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
let count: i64 = node1
.safe_psql("postgres", "SELECT sum(key) FROM t")
.first()
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 5000050000);
// check node2
node2.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
node2.safe_psql("postgres", "INSERT INTO t SELECT generate_series(100000,200000), 'payload'");
let count: i64 = node2
.safe_psql("postgres", "SELECT sum(key) FROM t")
.first()
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 15000150000);
}