Support several postgres instances on top of a single pageserver.

Each postgres instance will use its own page cache with associated data
structures. The postgres system_id is used to distinguish instances.
That also means that a backup should have a valid system_id stashed
somewhere. For now I put '42' as the sys_id during S3 restore, but
that ought to be fixed.
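
A minimal sketch of the per-instance cache lookup this adds (simplified from the page_cache.rs hunk below; the committed function is spelled get_pagecahe and additionally spawns a WAL redo thread the first time a system_id is seen):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use lazy_static::lazy_static;

// Stand-in for the real PageCache, which holds the page map, LSN
// bookkeeping and the walredo channel.
pub struct PageCache;

fn init_page_cache() -> PageCache {
    PageCache
}

lazy_static! {
    // One page cache per postgres instance, keyed by system_id.
    static ref PAGECACHES: Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
}

// Look up, or lazily create, the page cache for the given system_id.
pub fn get_pagecache(sys_id: u64) -> Arc<PageCache> {
    let mut pcaches = PAGECACHES.lock().unwrap();
    pcaches
        .entry(sys_id)
        .or_insert_with(|| Arc::new(init_page_cache()))
        .clone()
}
```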

This commit also introduces a new way of starting WAL receivers:
postgres can initiate such a connection by sending a 'callmemaybe $url'
command to the page_service -- that will start the appropriate wal-redo
and wal-receiver threads. This way the page server may start without
a priori knowledge of compute node addresses.
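
For illustration, a compute node (or the control plane acting for it, as in the tests below) can register itself over an ordinary libpq connection. This is only a sketch assuming the postgres crate; both connection strings are placeholders:

```rust
use postgres::{Client, NoTls};

fn main() -> Result<(), postgres::Error> {
    // Placeholder addresses; in the tests the control plane knows both
    // because it started the nodes itself.
    let pageserver_connstr = "host=127.0.0.1 port=65200 dbname=no_db user=no_user";
    let compute_connstr = "host=127.0.0.1 port=65432 user=zenith";

    // Any regular connection to the page_service will do; the command is
    // handled by process_query() in page_service.rs.
    let mut pageserver = Client::connect(pageserver_connstr, NoTls)?;

    // Ask the pageserver to "call us back": it spawns a WAL receiver thread
    // for this compute node, and a WAL redo thread the first time the node's
    // system_id shows up in the page cache.
    pageserver.simple_query(&format!("callmemaybe {}", compute_connstr))?;
    Ok(())
}
```
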
Stas Kelvich
2021-04-02 21:08:34 +03:00
parent 6eabe17e98
commit 2c308da4d2
12 changed files with 393 additions and 243 deletions

View File

@@ -27,4 +27,4 @@ jobs:
- name: Run test
run: |
cargo test --test test_pageserver -- --nocapture --test-threads=1
cargo test --test test_pageserver -- --nocapture --test-threads=1

View File

@@ -7,6 +7,7 @@ use std::{fs::File, str::FromStr};
use std::io;
use std::path::PathBuf;
use std::thread;
use std::fs;
use clap::{App, Arg};
use daemonize::Daemonize;
@@ -20,7 +21,6 @@ use pageserver::page_service;
use pageserver::restore_s3;
use pageserver::tui;
use pageserver::walreceiver;
use pageserver::walredo;
use pageserver::PageServerConf;
fn main() -> Result<(), io::Error> {
@@ -61,7 +61,7 @@ fn main() -> Result<(), io::Error> {
data_dir: PathBuf::from("./"),
daemonize: false,
interactive: false,
wal_producer_connstr: String::from_str("host=127.0.0.1 port=65432 user=zenith").unwrap(),
wal_producer_connstr: None,
listen_addr: "127.0.0.1:5430".parse().unwrap(),
skip_recovery: false,
};
@@ -90,7 +90,7 @@ fn main() -> Result<(), io::Error> {
}
if let Some(addr) = arg_matches.value_of("wal_producer") {
conf.wal_producer_connstr = String::from_str(addr).unwrap();
conf.wal_producer_connstr = Some(String::from_str(addr).unwrap());
}
if let Some(addr) = arg_matches.value_of("listen") {
@@ -148,43 +148,51 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
info!("starting...");
// Initialize the WAL applicator
let walredo_thread = thread::Builder::new()
.name("WAL redo thread".into())
.spawn(|| {
walredo::wal_applicator_main();
})
.unwrap();
threads.push(walredo_thread);
// Before opening up for connections, restore the latest base backup from S3.
// (We don't persist anything to local disk at the moment, so we need to do
// this at every startup)
if !conf.skip_recovery {
restore_s3::restore_main();
restore_s3::restore_main(&conf);
}
// Launch the WAL receiver thread. It will try to connect to the WAL safekeeper,
// and stream the WAL. If the connection is lost, it will reconnect on its own.
// We just fire and forget it here.
let conf1 = conf.clone();
let walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(|| {
// thread code
walreceiver::thread_main(conf1);
})
.unwrap();
threads.push(walreceiver_thread);
// Create directory for wal-redo datadirs
match fs::create_dir(conf.data_dir.join("wal-redo")) {
Ok(_) => {},
Err(e) => match e.kind() {
io::ErrorKind::AlreadyExists => {}
_ => {
panic!("Failed to create wal-redo data directory: {}", e);
}
}
}
// Launch the WAL receiver thread if pageserver was started with --wal-producer
// option. It will try to connect to the WAL safekeeper, and stream the WAL. If
// the connection is lost, it will reconnect on its own. We just fire and forget
// it here.
//
// All other wal receivers are started on demand by "callmemaybe" command
// sent to pageserver.
let conf_copy = conf.clone();
if let Some(wal_producer) = conf.wal_producer_connstr {
let conf = conf_copy.clone();
let walreceiver_thread = thread::Builder::new()
.name("static WAL receiver thread".into())
.spawn(move || {
walreceiver::thread_main(conf, &wal_producer);
})
.unwrap();
threads.push(walreceiver_thread);
}
// GetPage@LSN requests are served by another thread. (It uses async I/O,
// but the code in page_service sets up its own thread pool for that)
let conf2 = conf.clone();
let conf = conf_copy.clone();
let page_server_thread = thread::Builder::new()
.name("Page Service thread".into())
.spawn(|| {
// thread code
page_service::thread_main(conf2);
page_service::thread_main(conf);
})
.unwrap();
threads.push(page_server_thread);

View File

@@ -7,7 +7,7 @@
// local installations.
//
use std::fs::{self, OpenOptions};
use std::{fs::{self, OpenOptions}, rc::Rc};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str;
@@ -45,7 +45,7 @@ pub struct StorageControlPlane {
impl StorageControlPlane {
// postgres <-> page_server
pub fn one_page_server(pg_connstr: String) -> StorageControlPlane {
pub fn one_page_server() -> StorageControlPlane {
let mut cplane = StorageControlPlane {
wal_acceptors: Vec::new(),
page_servers: Vec::new(),
@@ -53,7 +53,6 @@ impl StorageControlPlane {
let pserver = PageServerNode {
page_service_addr: "127.0.0.1:65200".parse().unwrap(),
wal_producer_connstr: pg_connstr,
data_dir: TEST_WORKDIR.join("pageserver")
};
pserver.init();
@@ -73,26 +72,25 @@ impl StorageControlPlane {
fn get_wal_acceptor_conn_info() {}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let addr = &self.page_servers[0].page_service_addr;
pub fn simple_query_storage(&self, db: &str, user: &str, sql: &str) -> Vec<tokio_postgres::SimpleQueryMessage> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.page_server_addr().ip(),
self.page_server_addr().port(),
db,
user
addr.ip(),
addr.port(),
"no_db",
"no_user",
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql);
println!("Pageserver query: '{}'", sql);
client.simple_query(sql).unwrap()
}
}
pub struct PageServerNode {
page_service_addr: SocketAddr,
wal_producer_connstr: String,
data_dir: PathBuf,
}
@@ -119,17 +117,15 @@ impl PageServerNode {
}
pub fn start(&self) {
println!("Starting pageserver at '{}', wal_producer='{}'", self.page_service_addr, self.wal_producer_connstr);
println!("Starting pageserver at '{}'", self.page_service_addr);
let status = Command::new(CARGO_BIN_DIR.join("pageserver"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-w", self.wal_producer_connstr.as_str()])
.args(&["-l", self.page_service_addr.to_string().as_str()])
.arg("-d")
.arg("--skip-recovery")
.env_clear()
.env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
.env("PGDATA", self.data_dir.join("wal_redo_pgdata")) // postres-wal-redo pgdata
.status()
.expect("failed to start pageserver");
@@ -172,19 +168,22 @@ impl WalAcceptorNode {}
//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
pub struct ComputeControlPlane<'a> {
pg_bin_dir: PathBuf,
work_dir: PathBuf,
last_assigned_port: u16,
nodes: Vec<PostgresNode>,
storage_cplane: &'a StorageControlPlane,
nodes: Vec<Rc<PostgresNode>>,
}
impl ComputeControlPlane {
pub fn local() -> ComputeControlPlane {
impl ComputeControlPlane<'_> {
pub fn local(storage_cplane : &StorageControlPlane) -> ComputeControlPlane {
ComputeControlPlane {
pg_bin_dir: PG_BIN_DIR.to_path_buf(),
work_dir: TEST_WORKDIR.to_path_buf(),
last_assigned_port: 65431,
storage_cplane: storage_cplane,
nodes: Vec::new(),
}
}
@@ -196,7 +195,7 @@ impl ComputeControlPlane {
port
}
pub fn new_vanilla_node(&mut self) -> &PostgresNode {
pub fn new_vanilla_node<'a>(&mut self) -> &Rc<PostgresNode> {
// allocate new node entry with generated port
let node_id = self.nodes.len() + 1;
let node = PostgresNode {
@@ -206,7 +205,7 @@ impl ComputeControlPlane {
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(),
};
self.nodes.push(node);
self.nodes.push(Rc::new(node));
let node = self.nodes.last().unwrap();
// initialize data directory
@@ -260,7 +259,7 @@ impl ComputeControlPlane {
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(),
};
self.nodes.push(node);
self.nodes.push(Rc::new(node));
let node = self.nodes.last().unwrap();
// initialize data directory w/o files
@@ -297,6 +296,20 @@ impl ComputeControlPlane {
node
}
pub fn new_node(&mut self) -> Rc<PostgresNode> {
let storage_cplane = self.storage_cplane;
let node = self.new_vanilla_node();
let pserver = storage_cplane.page_server_addr();
// Configure that node to take pages from pageserver
node.append_conf("postgresql.conf", format!("\
page_server_connstring = 'host={} port={}'\n\
", pserver.ip(), pserver.port()).as_str());
node.clone()
}
}
///////////////////////////////////////////////////////////////////////////////
@@ -339,8 +352,11 @@ impl PostgresNode {
}
}
pub fn start(&self) {
println!("Started postgres node at '{}'", self.connstr());
pub fn start(&self, storage_cplane: &StorageControlPlane) {
let _res = storage_cplane
.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl("start", true);
}
@@ -349,11 +365,11 @@ impl PostgresNode {
}
pub fn stop(&self) {
self.pg_ctl("stop", true);
self.pg_ctl("-m immediate stop", true);
}
pub fn connstr(&self) -> String {
format!("user={} host={} port={}", self.whoami(), self.ip, self.port)
format!("host={} port={} user={}", self.ip, self.port, self.whoami())
}
// XXX: cache that in control plane
@@ -370,7 +386,6 @@ impl PostgresNode {
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
// XXX: user!
let connstring = format!(
"host={} port={} dbname={} user={}",
self.ip,

View File

@@ -19,18 +19,18 @@ pub mod xlog_utils;
mod tui_logger;
#[allow(dead_code)]
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct PageServerConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub interactive: bool,
pub wal_producer_connstr: String,
pub wal_producer_connstr: Option<String>,
pub listen_addr: SocketAddr,
pub skip_recovery: bool,
}
#[allow(dead_code)]
#[derive(Debug,Clone)]
#[derive(Debug, Clone)]
pub struct WalAcceptorConf {
pub data_dir: PathBuf,
pub no_sync: bool,

View File

@@ -7,25 +7,27 @@
//
use core::ops::Bound::Included;
use std::convert::TryInto;
use std::{convert::TryInto, ops::AddAssign};
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::{Arc,Condvar, Mutex};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::thread;
// use tokio::sync::RwLock;
use bytes::Bytes;
use lazy_static::lazy_static;
use rand::Rng;
use log::*;
use crate::{PageServerConf, walredo};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Sender, Receiver};
// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(20);
static TIMEOUT: Duration = Duration::from_secs(60);
pub struct PageCache {
shared: Mutex<PageCacheShared>,
@@ -49,6 +51,7 @@ pub struct PageCache {
pub last_record_lsn: AtomicU64,
}
#[derive(Clone)]
pub struct PageCacheStats {
pub num_entries: u64,
pub num_page_images: u64,
@@ -59,6 +62,21 @@ pub struct PageCacheStats {
pub last_record_lsn: u64,
}
impl AddAssign for PageCacheStats {
fn add_assign(&mut self, other: Self) {
*self = Self {
num_entries: self.num_entries + other.num_entries,
num_page_images: self.num_page_images + other.num_page_images,
num_wal_records: self.num_wal_records + other.num_wal_records,
num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests,
first_valid_lsn: self.first_valid_lsn + other.first_valid_lsn,
last_valid_lsn: self.last_valid_lsn + other.last_valid_lsn,
last_record_lsn: self.last_record_lsn + other.last_record_lsn,
}
}
}
//
// Shared data structure, holding page cache and related auxiliary information
//
@@ -94,8 +112,31 @@ struct PageCacheShared {
}
lazy_static! {
pub static ref PAGECACHE : PageCache = init_page_cache();
pub static ref PAGECACHES : Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
}
pub fn get_pagecahe(conf: PageServerConf, sys_id : u64) -> Arc<PageCache> {
let mut pcaches = PAGECACHES.lock().unwrap();
if !pcaches.contains_key(&sys_id) {
pcaches.insert(sys_id, Arc::new(init_page_cache()));
// Initialize the WAL redo thread
//
// For now the join_handle is not saved anywhere and we won't try to restart
// the thread if it dies. We may later stop these threads after some inactivity period
// and restart them on demand.
let _walredo_thread = thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
walredo::wal_redo_main(conf, sys_id);
})
.unwrap();
}
pcaches.get(&sys_id).unwrap().clone()
}
fn init_page_cache() -> PageCache
{
// Initialize the channel between the page cache and the WAL applicator
@@ -208,14 +249,16 @@ pub struct WALRecord {
// Public interface functions
impl PageCache {
//
// GetPage@LSN
//
// Returns an 8k page image
//
pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>>
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>>
{
PAGECACHE.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -224,14 +267,14 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
let entry_rc: Arc<CacheEntry>;
{
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
let mut waited = false;
while lsn > shared.last_valid_lsn {
// TODO: Wait for the WAL receiver to catch up
waited = true;
trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn);
let wait_result = PAGECACHE.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap();
let wait_result = self.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap();
shared = wait_result.0;
if wait_result.1.timed_out() {
@@ -283,7 +326,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
assert!(!entry_content.apply_pending);
entry_content.apply_pending = true;
let s = &PAGECACHE.walredo_sender;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
}
@@ -323,10 +366,10 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
// Returns an old page image (if any), and a vector of WAL records to apply
// over it.
//
pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>)
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>)
{
// Scan the BTreeMap backwards, starting from the given entry.
let shared = PAGECACHE.shared.lock().unwrap();
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
let minkey = CacheKey {
@@ -377,7 +420,7 @@ pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option<Bytes>, Vec<WALR
//
// Adds a WAL record to the page cache
//
pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord)
{
let key = CacheKey {
tag: tag,
@@ -387,7 +430,7 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec);
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
let rel_tag = RelTag {
spcnode: tag.spcnode,
@@ -403,19 +446,19 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord)
trace!("put_wal_record lsn: {}", key.lsn);
let oldentry = shared.pagecache.insert(key, Arc::new(entry));
PAGECACHE.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_entries.fetch_add(1, Ordering::Relaxed);
if !oldentry.is_none() {
error!("overwriting WAL record in page cache");
}
PAGECACHE.num_wal_records.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
// Memorize a full image of a page version
//
pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes)
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes)
{
let key = CacheKey {
tag: tag,
@@ -425,39 +468,39 @@ pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes)
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().page_image = Some(img);
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
let pagecache = &mut shared.pagecache;
let oldentry = pagecache.insert(key, Arc::new(entry));
PAGECACHE.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_entries.fetch_add(1, Ordering::Relaxed);
assert!(oldentry.is_none());
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
PAGECACHE.num_page_images.fetch_add(1, Ordering::Relaxed);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
//
pub fn advance_last_valid_lsn(lsn: u64)
pub fn advance_last_valid_lsn(&self, lsn: u64)
{
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
shared.last_valid_lsn = lsn;
PAGECACHE.valid_lsn_condvar.notify_all();
self.valid_lsn_condvar.notify_all();
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
}
//
// NOTE: this updates last_valid_lsn as well.
//
pub fn advance_last_record_lsn(lsn: u64)
pub fn advance_last_record_lsn(&self, lsn: u64)
{
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
@@ -465,16 +508,16 @@ pub fn advance_last_record_lsn(lsn: u64)
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
PAGECACHE.valid_lsn_condvar.notify_all();
self.valid_lsn_condvar.notify_all();
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
}
//
pub fn _advance_first_valid_lsn(lsn: u64)
pub fn _advance_first_valid_lsn(&self, lsn: u64)
{
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.first_valid_lsn);
@@ -484,12 +527,12 @@ pub fn _advance_first_valid_lsn(lsn: u64)
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
shared.first_valid_lsn = lsn;
PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
}
pub fn init_valid_lsn(lsn: u64)
pub fn init_valid_lsn(&self, lsn: u64)
{
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0);
@@ -498,14 +541,15 @@ pub fn init_valid_lsn(lsn: u64)
shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed);
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
PAGECACHE.last_record_lsn.store(lsn, Ordering::Relaxed);
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
pub fn get_last_record_lsn() -> u64
pub fn get_last_valid_lsn(&self) -> u64
{
let shared = PAGECACHE.shared.lock().unwrap();
let shared = self.shared.lock().unwrap();
return shared.last_record_lsn;
}
@@ -517,7 +561,7 @@ pub fn get_last_record_lsn() -> u64
// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version)
//
//
pub fn _test_get_page_at_lsn()
pub fn _test_get_page_at_lsn(&self)
{
// for quick testing of the get_page_at_lsn() function.
//
@@ -527,7 +571,7 @@ pub fn _test_get_page_at_lsn()
let mut tag: Option<BufferTag> = None;
{
let shared = PAGECACHE.shared.lock().unwrap();
let shared = self.shared.lock().unwrap();
let pagecache = &shared.pagecache;
if pagecache.is_empty() {
@@ -548,7 +592,7 @@ pub fn _test_get_page_at_lsn()
}
info!("testing GetPage@LSN for block {}", tag.unwrap().blknum);
match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) {
Ok(_img) => {
// This prints out the whole page image.
//println!("{:X?}", img);
@@ -563,9 +607,9 @@ pub fn _test_get_page_at_lsn()
// FIXME: Shouldn't relation size also be tracked with an LSN?
// If a replica is lagging behind, it needs to get the size as it was on
// the replica's current replay LSN.
pub fn relsize_inc(rel: &RelTag, to: Option<u32>)
pub fn relsize_inc(&self, rel: &RelTag, to: Option<u32>)
{
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
if let Some(to) = to {
@@ -575,29 +619,51 @@ pub fn relsize_inc(rel: &RelTag, to: Option<u32>)
}
}
pub fn relsize_get(rel: &RelTag) -> u32
pub fn relsize_get(&self, rel: &RelTag) -> u32
{
let mut shared = PAGECACHE.shared.lock().unwrap();
let mut shared = self.shared.lock().unwrap();
let entry = shared.relsize_cache.entry(*rel).or_insert(0);
*entry
}
pub fn relsize_exist(rel: &RelTag) -> bool
pub fn relsize_exist(&self, rel: &RelTag) -> bool
{
let shared = PAGECACHE.shared.lock().unwrap();
let shared = self.shared.lock().unwrap();
let relsize_cache = &shared.relsize_cache;
relsize_cache.contains_key(rel)
}
pub fn get_stats() -> PageCacheStats
pub fn get_stats(&self) -> PageCacheStats
{
PageCacheStats {
num_entries: PAGECACHE.num_entries.load(Ordering::Relaxed),
num_page_images: PAGECACHE.num_page_images.load(Ordering::Relaxed),
num_wal_records: PAGECACHE.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: PAGECACHE.num_getpage_requests.load(Ordering::Relaxed),
first_valid_lsn: PAGECACHE.first_valid_lsn.load(Ordering::Relaxed),
last_valid_lsn: PAGECACHE.last_valid_lsn.load(Ordering::Relaxed),
last_record_lsn: PAGECACHE.last_record_lsn.load(Ordering::Relaxed),
num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed),
last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed),
last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
}
}
}
pub fn get_stats() -> PageCacheStats
{
let pcaches = PAGECACHES.lock().unwrap();
let mut stats = PageCacheStats {
num_entries: 0,
num_page_images: 0,
num_wal_records: 0,
num_getpage_requests: 0,
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
};
pcaches.iter().for_each(|(_sys_id, pcache)| {
stats += pcache.get_stats();
});
stats
}

View File

@@ -15,11 +15,13 @@ use tokio::runtime;
use tokio::task;
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, Bytes, BytesMut};
use std::io;
use std::{io};
use std::thread;
use log::*;
use crate::page_cache;
use crate::PageServerConf;
use crate::walreceiver;
type Result<T> = std::result::Result<T, io::Error>;
@@ -221,43 +223,38 @@ pub fn thread_main(conf: PageServerConf) {
info!("Starting page server on {}", conf.listen_addr);
runtime.block_on(async {
let _unused = page_service_main(conf.listen_addr.to_string().as_str()).await;
let listener = TcpListener::bind(conf.listen_addr).await.unwrap();
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
let mut conn_handler = Connection::new(conf.clone(), socket);
task::spawn(async move {
if let Err(err) = conn_handler.run().await {
error!("error: {}", err);
}
});
}
});
}
async fn page_service_main(listen_address: &str) {
let listener = TcpListener::bind(listen_address).await.unwrap();
loop {
let (socket, peer_addr) = listener.accept().await.unwrap();
debug!("accepted connection from {}", peer_addr);
let mut conn_handler = Connection::new(socket);
task::spawn(async move {
if let Err(err) = conn_handler.run().await {
error!("error: {}", err);
}
});
}
}
#[derive(Debug)]
struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
init_done: bool
init_done: bool,
conf: PageServerConf,
}
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
pub fn new(conf: PageServerConf, socket: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(10 * 1024),
init_done: false
init_done: false,
conf: conf
}
}
@@ -433,10 +430,40 @@ impl Connection {
async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> {
trace!("got query {:?}", q.body);
if q.body.starts_with(b"pagestream") {
self.handle_pagerequests().await
} else if q.body.starts_with(b"controlfile") {
if q.body.starts_with(b"controlfile") {
self.handle_controlfile().await
} else if q.body.starts_with(b"pagestream ") {
let (_l,r) = q.body.split_at("pagestream ".len());
let mut r = r.to_vec();
r.pop();
let sysid = String::from_utf8(r).unwrap().trim().to_string();
let sysid: u64 = sysid.parse().unwrap(); // XXX
self.handle_pagerequests(sysid).await
} else if q.body.starts_with(b"callmemaybe ") {
let (_l,r) = q.body.split_at("callmemaybe ".len());
let mut r = r.to_vec();
r.pop();
let connstr = String::from_utf8(r).unwrap().trim().to_string();
let conf_copy = self.conf.clone();
let _walreceiver_thread = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
walreceiver::thread_main(conf_copy,&connstr);
})
.unwrap();
// generic ack:
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message(&BeMessage::ReadyForQuery).await
} else if q.body.starts_with(b"status") {
self.write_message_noflush(&BeMessage::RowDescription).await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
@@ -456,10 +483,9 @@ impl Connection {
self.write_message_noflush(&BeMessage::ControlFile).await?;
self.write_message_noflush(&BeMessage::CommandComplete).await?;
self.write_message(&BeMessage::ReadyForQuery).await
}
async fn handle_pagerequests(&mut self) -> Result<()> {
async fn handle_pagerequests(&mut self, sysid: u64) -> Result<()> {
/* switch client to COPYBOTH */
self.stream.write_u8(b'W').await?;
@@ -468,13 +494,15 @@ impl Connection {
self.stream.write_i16(0).await?; /* numAttributes */
self.stream.flush().await?;
let pcache = page_cache::get_pagecahe(self.conf.clone(), sysid);
loop {
let message = self.read_message().await?;
// XXX: None seems to appear a lot in the log.
// Do we have conditions for a busy-loop here?
if let Some(m) = &message {
info!("query: {:?}", m);
info!("query({}): {:?}", sysid, m);
};
if message.is_none() {
@@ -492,7 +520,7 @@ impl Connection {
forknum: req.forknum,
};
let exist = page_cache::relsize_exist(&tag);
let exist = pcache.relsize_exist(&tag);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: exist,
@@ -520,7 +548,7 @@ impl Connection {
forknum: req.forknum,
};
let n_blocks = page_cache::relsize_get(&tag);
let n_blocks = pcache.relsize_get(&tag);
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
@@ -535,27 +563,25 @@ impl Connection {
forknum: req.forknum,
blknum: req.blkno
};
let lsn = req.lsn;
let msg;
{
let p = page_cache::get_page_at_lsn(buf_tag, lsn);
if p.is_ok() {
msg = ZenithReadResponse {
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
Ok(p) => {
BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
page: p.unwrap()
};
} else {
page: p
})
},
Err(e) => {
const ZERO_PAGE:[u8; 8192] = [0; 8192];
msg = ZenithReadResponse {
error!("get_page_at_lsn: {}", e);
BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: false,
n_blocks: 0,
page: Bytes::from_static(&ZERO_PAGE)
}
})
}
}
let msg = BeMessage::ZenithReadResponse(msg);
};
self.write_message(&msg).await?
@@ -568,7 +594,7 @@ impl Connection {
forknum: req.forknum,
};
page_cache::relsize_inc(&tag, None);
pcache.relsize_inc(&tag, None);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,
@@ -583,7 +609,7 @@ impl Connection {
forknum: req.forknum,
};
page_cache::relsize_inc(&tag, Some(req.blkno));
pcache.relsize_inc(&tag, Some(req.blkno));
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
ok: true,

View File

@@ -22,7 +22,7 @@ use tokio::runtime;
use futures::future;
use crate::page_cache;
use crate::{PageServerConf, page_cache};
struct Storage {
region: Region,
@@ -30,12 +30,12 @@ struct Storage {
bucket: String
}
pub fn restore_main() {
pub fn restore_main(conf: &PageServerConf) {
// Create a new thread pool
let runtime = runtime::Runtime::new().unwrap();
runtime.block_on(async {
let result = restore_chunk().await;
let result = restore_chunk(conf).await;
match result {
Ok(_) => { return; },
@@ -55,7 +55,7 @@ pub fn restore_main() {
//
// Load it all into the page cache.
//
async fn restore_chunk() -> Result<(), S3Error> {
async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
let backend = Storage {
region: Region::Custom {
@@ -79,6 +79,8 @@ async fn restore_chunk() -> Result<(), S3Error> {
// List out contents of directory
let results: Vec<s3::serde_types::ListBucketResult> = bucket.list("relationdata/".to_string(), Some("".to_string())).await?;
// TODO: get that from backup
let sys_id: u64 = 42;
let mut oldest_lsn = 0;
let mut slurp_futures: Vec<_> = Vec::new();
@@ -98,7 +100,7 @@ async fn restore_chunk() -> Result<(), S3Error> {
oldest_lsn = p.lsn;
}
let b = bucket.clone();
let f = slurp_base_file(b, key.to_string(), p);
let f = slurp_base_file(conf, sys_id, b, key.to_string(), p);
slurp_futures.push(f);
}
@@ -110,7 +112,9 @@ async fn restore_chunk() -> Result<(), S3Error> {
if oldest_lsn == 0 {
panic!("no base backup found");
}
page_cache::init_valid_lsn(oldest_lsn);
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
pcache.init_valid_lsn(oldest_lsn);
info!("{} files to restore...", slurp_futures.len());
@@ -266,7 +270,7 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
//
// Load a base file from S3, and insert it into the page cache
//
async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImageFileName)
async fn slurp_base_file(conf: &PageServerConf, sys_id: u64, bucket: Bucket, s3path: String, parsed: ParsedBaseImageFileName)
{
// FIXME: rust-s3 opens a new connection for each request. Should reuse
// the reqwest::Client object. But that requires changes to rust-s3 itself.
@@ -280,6 +284,8 @@ async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImage
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024*1024*1024 / 8192);
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
@@ -290,7 +296,7 @@ async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImage
blknum: blknum
};
page_cache::put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
blknum += 1;
}

View File

@@ -22,9 +22,9 @@ use postgres_protocol::message::backend::ReplicationMessage;
//
// This is the entry point for the WAL receiver thread.
//
pub fn thread_main(conf: PageServerConf) {
pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
info!("WAL receiver thread started");
info!("WAL receiver thread started: '{}'", wal_producer_connstr);
let runtime = runtime::Builder::new_current_thread()
.enable_all()
@@ -33,7 +33,7 @@ pub fn thread_main(conf: PageServerConf) {
runtime.block_on( async {
loop {
let _res = walreceiver_main(&conf).await;
let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await;
// TODO: print/log the error
info!("WAL streaming connection failed, retrying in 1 second...: {:?}", _res);
@@ -42,12 +42,12 @@ pub fn thread_main(conf: PageServerConf) {
});
}
async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -> Result<(), Error> {
// Connect to the database in replication mode.
debug!("connecting to {}...", conf.wal_producer_connstr);
debug!("connecting to {}...", wal_producer_connstr);
let (mut rclient, connection) = connect_replication(
conf.wal_producer_connstr.as_str(),
wal_producer_connstr.as_str(),
NoTls,
ReplicationMode::Physical
).await?;
@@ -63,13 +63,16 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
let _identify_system = rclient.identify_system().await?;
let sysid : u64 = _identify_system.systemid().parse().unwrap();
let pcache = page_cache::get_pagecahe(conf, sysid);
//
// Start streaming the WAL, from where we left off previously.
//
let mut startpoint = page_cache::get_last_record_lsn();
let mut startpoint = pcache.get_last_valid_lsn();
if startpoint == 0 {
// FIXME: Or should we just error out?
page_cache::init_valid_lsn(u64::from(_identify_system.xlogpos()));
pcache.init_valid_lsn(u64::from(_identify_system.xlogpos()));
startpoint = u64::from(_identify_system.xlogpos());
} else {
// There might be some padding after the last full record, skip it.
@@ -126,12 +129,12 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
rec: recdata.clone()
};
page_cache::put_wal_record(tag, rec);
pcache.put_wal_record(tag, rec);
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
page_cache::advance_last_record_lsn(lsn);
pcache.advance_last_valid_lsn(lsn);
} else {
break;
@@ -144,7 +147,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
// better reflect that, because GetPage@LSN requests might also point in the
// middle of a record, if the request LSN was taken from the server's current
// flush ptr.
page_cache::advance_last_valid_lsn(endlsn);
pcache.advance_last_valid_lsn(endlsn);
}
ReplicationMessage::PrimaryKeepAlive(_keepalive) => {

View File

@@ -16,7 +16,7 @@
//
use tokio::runtime::Runtime;
use tokio::process::{Command, Child, ChildStdin, ChildStdout};
use std::process::Stdio;
use std::{path::PathBuf, process::Stdio};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncBufReadExt;
use tokio::time::timeout;
@@ -24,13 +24,14 @@ use std::io::Error;
use std::cell::RefCell;
use std::assert;
use std::sync::{Arc};
use std::fs;
use log::*;
use std::time::Instant;
use std::time::Duration;
use bytes::{Bytes, BytesMut, BufMut};
use crate::page_cache::BufferTag;
use crate::{PageServerConf, page_cache::BufferTag};
use crate::page_cache::CacheEntry;
use crate::page_cache::WALRecord;
use crate::page_cache;
@@ -40,25 +41,28 @@ static TIMEOUT: Duration = Duration::from_secs(20);
//
// Main entry point for the WAL applicator thread.
//
pub fn wal_applicator_main()
pub fn wal_redo_main(conf: PageServerConf, sys_id: u64)
{
info!("WAL redo thread started");
info!("WAL redo thread started {}", sys_id);
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let pcache = page_cache::get_pagecahe(conf.clone(), sys_id);
// Loop forever, handling requests as they come.
let walredo_channel_receiver = &page_cache::PAGECACHE.walredo_receiver;
let walredo_channel_receiver = &pcache.walredo_receiver;
loop {
let mut process: WalRedoProcess;
let datadir = conf.data_dir.join(format!("wal-redo/{}", sys_id));
info!("launching WAL redo postgres process");
info!("launching WAL redo postgres process {}", sys_id);
{
let _guard = runtime.enter();
process = WalRedoProcess::launch().unwrap();
process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
}
// Pretty arbitrarily, reuse the same Postgres process for 100 requests.
@@ -66,10 +70,9 @@ pub fn wal_applicator_main()
// using up all shared buffers in Postgres's shared buffer cache; we don't
// want to write any pages to disk in the WAL redo process.
for _i in 1..100 {
let request = walredo_channel_receiver.recv().unwrap();
let result = handle_apply_request(&process, &runtime, request);
let result = handle_apply_request(&pcache, &process, &runtime, request);
if result.is_err() {
// On error, kill the process.
break;
@@ -84,11 +87,11 @@ pub fn wal_applicator_main()
}
}
fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error>
fn handle_apply_request(pcache: &page_cache::PageCache, process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error>
{
let tag = entry_rc.key.tag;
let lsn = entry_rc.key.lsn;
let (base_img, records) = page_cache::collect_records_for_apply(entry_rc.as_ref());
let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
let mut entry = entry_rc.content.lock().unwrap();
entry.apply_pending = false;
@@ -110,7 +113,7 @@ fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: A
result = Err(e);
} else {
entry.page_image = Some(apply_result.unwrap());
page_cache::PAGECACHE.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
pcache.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
result = Ok(());
}
@@ -128,19 +131,39 @@ struct WalRedoProcess {
impl WalRedoProcess {
fn launch() -> Result<WalRedoProcess, Error> {
//
// Start postgres binary in special WAL redo mode.
//
//
// Start postgres binary in special WAL redo mode.
//
// Tests that run the pageserver binary set the proper PG_BIN_DIR
// and PG_LIB_DIR so that WalRedo starts the right postgres. We may later
// switch to setting the same things in the pageserver config file.
fn launch(datadir: &PathBuf, runtime: &Runtime) -> Result<WalRedoProcess, Error> {
// Create an empty data directory for wal-redo postgres, deleting the old one.
fs::remove_dir_all(datadir.to_str().unwrap()).ok();
let initdb = runtime.block_on(Command::new("initdb")
.args(&["-D", datadir.to_str().unwrap()])
.arg("-N")
.status()
).expect("failed to execute initdb");
if !initdb.success() {
panic!("initdb failed");
}
// Start postgres itself
let mut child =
Command::new("postgres")
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env("PGDATA", datadir.to_str().unwrap())
.spawn()
.expect("postgres --wal-redo command failed to start");
info!("launched WAL redo postgres process on {}", datadir.to_str().unwrap());
let stdin = child.stdin.take().expect("failed to open child's stdin");
let stderr = child.stderr.take().expect("failed to open child's stderr");
let stdout = child.stdout.take().expect("failed to open child's stdout");
@@ -159,7 +182,7 @@ impl WalRedoProcess {
if res.unwrap() == 0 {
break;
}
debug!("{}", line.trim());
debug!("wal-redo-postgres: {}", line.trim());
line.clear();
}
Ok::<(), Error>(())

View File

@@ -1,21 +1,6 @@
use pageserver::control_plane::ComputeControlPlane;
#[test]
fn test_actions() {
let mut cplane = ComputeControlPlane::local();
let node = cplane.new_vanilla_node();
node.start();
node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
let count: i64 = node
.safe_psql("postgres", "SELECT count(*) FROM t")
.first()
.unwrap()
.get(0);
assert_eq!(count, 100000);
}
#[test]

View File

@@ -11,19 +11,9 @@ use pageserver::control_plane::StorageControlPlane;
// Handcrafted cases with wal records that are (were) problematic for redo.
#[test]
fn test_redo_cases() {
// Allocate postgres instance, but don't start
let mut compute_cplane = ComputeControlPlane::local();
// Create compute node without files, only datadir structure
let node = compute_cplane.new_minimal_node();
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server(node.connstr());
let pageserver_addr = storage_cplane.page_server_addr();
// Configure that node to take pages from pageserver
node.append_conf("postgresql.conf", format!("\
page_server_connstring = 'host={} port={}'\n\
", pageserver_addr.ip(), pageserver_addr.port()).as_str());
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// Request info needed to build control file
storage_cplane.simple_query_storage("postgres", node.whoami().as_str(), "controlfile");
@@ -31,7 +21,8 @@ fn test_redo_cases() {
node.setup_controlfile();
// start postgres
node.start();
let node = compute_cplane.new_node();
node.start(&storage_cplane);
println!("await pageserver connection...");
sleep(Duration::from_secs(3));
@@ -61,30 +52,57 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node
#[test]
fn test_regress() {
// Allocate postgres instance, but don't start
let mut compute_cplane = ComputeControlPlane::local();
let node = compute_cplane.new_vanilla_node();
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server(node.connstr());
let pageserver_addr = storage_cplane.page_server_addr();
// Configure that node to take pages from pageserver
node.append_conf("postgresql.conf", format!("\
page_server_connstring = 'host={} port={}'\n\
", pageserver_addr.ip(), pageserver_addr.port()).as_str());
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// start postgres
node.start();
let node = compute_cplane.new_node();
node.start(&storage_cplane);
println!("await pageserver connection...");
sleep(Duration::from_secs(3));
//////////////////////////////////////////////////////////////////
pageserver::control_plane::regress_check(node);
pageserver::control_plane::regress_check(&node);
}
// Runs recovery with minio
// Run two postgres instances on one pageserver
#[test]
fn test_pageserver_recovery() {}
fn test_pageserver_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// Allocate postgres instance, but don't start
let node1 = compute_cplane.new_node();
let node2 = compute_cplane.new_node();
node1.start(&storage_cplane);
node2.start(&storage_cplane);
// XXX: add some extension func to postgres to check walsender conn
// XXX: or better just drop that
println!("await pageserver connection...");
sleep(Duration::from_secs(3));
// check node1
node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'");
let count: i64 = node1
.safe_psql("postgres", "SELECT sum(key) FROM t")
.first()
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 5000050000);
// check node2
node2.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)");
node2.safe_psql("postgres", "INSERT INTO t SELECT generate_series(100000,200000), 'payload'");
let count: i64 = node2
.safe_psql("postgres", "SELECT sum(key) FROM t")
.first()
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 15000150000);
}