mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-29 00:00:38 +00:00
Compare commits
3 Commits
conrad/pro
...
compute_no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a267dfa41f | ||
|
|
1b9eb9430c | ||
|
|
9a4fbf365c |
@@ -21,6 +21,8 @@ use std::{
|
|||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use postgres::{Client, NoTls};
|
use postgres::{Client, NoTls};
|
||||||
|
|
||||||
|
use postgres;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
// postgres would be there if it was build by 'make postgres' here in the repo
|
// postgres would be there if it was build by 'make postgres' here in the repo
|
||||||
pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
|
pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
|
||||||
@@ -57,7 +59,7 @@ pub struct StorageControlPlane {
|
|||||||
|
|
||||||
impl StorageControlPlane {
|
impl StorageControlPlane {
|
||||||
// postgres <-> page_server
|
// postgres <-> page_server
|
||||||
pub fn one_page_server() -> StorageControlPlane {
|
pub fn one_page_server(froms3: bool) -> StorageControlPlane {
|
||||||
let mut cplane = StorageControlPlane {
|
let mut cplane = StorageControlPlane {
|
||||||
wal_acceptors: Vec::new(),
|
wal_acceptors: Vec::new(),
|
||||||
page_servers: Vec::new(),
|
page_servers: Vec::new(),
|
||||||
@@ -68,7 +70,11 @@ impl StorageControlPlane {
|
|||||||
data_dir: TEST_WORKDIR.join("pageserver"),
|
data_dir: TEST_WORKDIR.join("pageserver"),
|
||||||
};
|
};
|
||||||
pserver.init();
|
pserver.init();
|
||||||
pserver.start();
|
if froms3 {
|
||||||
|
pserver.start_froms3();
|
||||||
|
} else {
|
||||||
|
pserver.start();
|
||||||
|
}
|
||||||
|
|
||||||
cplane.page_servers.push(pserver);
|
cplane.page_servers.push(pserver);
|
||||||
cplane
|
cplane
|
||||||
@@ -186,6 +192,27 @@ impl PageServerNode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn start_froms3(&self) {
|
||||||
|
println!("Starting pageserver at '{}'", self.page_service_addr);
|
||||||
|
|
||||||
|
let status = Command::new(BIN_DIR.join("pageserver"))
|
||||||
|
.args(&["-D", self.data_dir.to_str().unwrap()])
|
||||||
|
.args(&["-l", self.page_service_addr.to_string().as_str()])
|
||||||
|
.arg("-d")
|
||||||
|
.env_clear()
|
||||||
|
.env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
|
||||||
|
.env("S3_ENDPOINT", "https://127.0.0.1:9000")
|
||||||
|
.env("S3_REGION", "us-east-1")
|
||||||
|
.env("S3_ACCESSKEY", "minioadmin")
|
||||||
|
.env("S3_SECRET", "minioadmin")
|
||||||
|
.status()
|
||||||
|
.expect("failed to start pageserver");
|
||||||
|
|
||||||
|
if !status.success() {
|
||||||
|
panic!("pageserver start failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn stop(&self) {
|
pub fn stop(&self) {
|
||||||
let pidfile = self.data_dir.join("pageserver.pid");
|
let pidfile = self.data_dir.join("pageserver.pid");
|
||||||
let pid = fs::read_to_string(pidfile).unwrap();
|
let pid = fs::read_to_string(pidfile).unwrap();
|
||||||
@@ -352,7 +379,7 @@ impl ComputeControlPlane<'_> {
|
|||||||
// Init compute node without files, only datadir structure
|
// Init compute node without files, only datadir structure
|
||||||
// use initdb --compute-node flag and GUC 'computenode_mode'
|
// use initdb --compute-node flag and GUC 'computenode_mode'
|
||||||
// to distinguish the node
|
// to distinguish the node
|
||||||
pub fn new_minimal_node(&mut self) -> &PostgresNode {
|
pub fn new_minimal_node<'a>(&mut self) -> &Arc<PostgresNode> {
|
||||||
// allocate new node entry with generated port
|
// allocate new node entry with generated port
|
||||||
let node_id = self.nodes.len() + 1;
|
let node_id = self.nodes.len() + 1;
|
||||||
let node = PostgresNode {
|
let node = PostgresNode {
|
||||||
@@ -365,7 +392,7 @@ impl ComputeControlPlane<'_> {
|
|||||||
self.nodes.push(Arc::new(node));
|
self.nodes.push(Arc::new(node));
|
||||||
let node = self.nodes.last().unwrap();
|
let node = self.nodes.last().unwrap();
|
||||||
|
|
||||||
// initialize data directory w/o files
|
// initialize data directory
|
||||||
fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
|
fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
|
||||||
let initdb_path = self.pg_bin_dir.join("initdb");
|
let initdb_path = self.pg_bin_dir.join("initdb");
|
||||||
println!("initdb_path: {}", initdb_path.to_str().unwrap());
|
println!("initdb_path: {}", initdb_path.to_str().unwrap());
|
||||||
@@ -383,6 +410,11 @@ impl ComputeControlPlane<'_> {
|
|||||||
panic!("initdb failed");
|
panic!("initdb failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// // allow local replication connections
|
||||||
|
// node.append_conf("pg_hba.conf", format!("\
|
||||||
|
// host replication all {}/32 sspi include_realm=1 map=regress\n\
|
||||||
|
// ", node.ip).as_str());
|
||||||
|
|
||||||
// listen for selected port
|
// listen for selected port
|
||||||
node.append_conf(
|
node.append_conf(
|
||||||
"postgresql.conf",
|
"postgresql.conf",
|
||||||
@@ -397,16 +429,37 @@ impl ComputeControlPlane<'_> {
|
|||||||
listen_addresses = '{address}'\n\
|
listen_addresses = '{address}'\n\
|
||||||
port = {port}\n\
|
port = {port}\n\
|
||||||
computenode_mode = true\n\
|
computenode_mode = true\n\
|
||||||
",
|
",
|
||||||
address = node.ip,
|
address = node.ip,
|
||||||
port = node.port
|
port = node.port
|
||||||
)
|
)
|
||||||
.as_str(),
|
.as_str(),
|
||||||
);
|
);
|
||||||
|
|
||||||
node
|
node
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn new_node_wo_data(&mut self) -> Arc<PostgresNode> {
|
||||||
|
let storage_cplane = self.storage_cplane;
|
||||||
|
let node = self.new_minimal_node();
|
||||||
|
|
||||||
|
let pserver = storage_cplane.page_server_addr();
|
||||||
|
|
||||||
|
// Configure that node to take pages from pageserver
|
||||||
|
node.append_conf(
|
||||||
|
"postgresql.conf",
|
||||||
|
format!(
|
||||||
|
"\
|
||||||
|
page_server_connstring = 'host={} port={}'\n\
|
||||||
|
",
|
||||||
|
pserver.ip(),
|
||||||
|
pserver.port()
|
||||||
|
)
|
||||||
|
.as_str(),
|
||||||
|
);
|
||||||
|
|
||||||
|
node.clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn new_node(&mut self) -> Arc<PostgresNode> {
|
pub fn new_node(&mut self) -> Arc<PostgresNode> {
|
||||||
let storage_cplane = self.storage_cplane;
|
let storage_cplane = self.storage_cplane;
|
||||||
let node = self.new_vanilla_node();
|
let node = self.new_vanilla_node();
|
||||||
@@ -575,27 +628,133 @@ impl PostgresNode {
|
|||||||
self.pgdata.to_str()
|
self.pgdata.to_str()
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Create stub controlfile and respective xlog to start computenode */
|
// Request from pageserver stub controlfile, respective xlog
|
||||||
pub fn setup_controlfile(&self) {
|
// and a bunch of files needed to start computenode
|
||||||
let filepath = format!("{}/global/pg_control", self.pgdata.to_str().unwrap());
|
//
|
||||||
|
// NOTE this "file" request is a crutch.
|
||||||
|
// It asks pageserver to write requested page to the provided filepath
|
||||||
|
// and thus only works locally.
|
||||||
|
// TODO receive pages via some libpq protocol.
|
||||||
|
// The problem I've met is that nonrelfiles are not valid utf8 and cannot be
|
||||||
|
// handled by simple_query(). that expects test.
|
||||||
|
// And reqular query() uses prepared queries.
|
||||||
|
|
||||||
{
|
// TODO pass sysid as parameter
|
||||||
File::create(filepath).unwrap();
|
pub fn setup_compute_node(&self, sysid: u64, storage_cplane: &StorageControlPlane) {
|
||||||
|
let mut query;
|
||||||
|
//Request pg_control from pageserver
|
||||||
|
query = format!(
|
||||||
|
"file {}/global/pg_control,{},{},{},{},{},{},{}",
|
||||||
|
self.pgdata.to_str().unwrap(),
|
||||||
|
sysid as u64, //sysid
|
||||||
|
1664, //tablespace
|
||||||
|
0, //dboid
|
||||||
|
0, //reloid
|
||||||
|
42, //forknum pg_control
|
||||||
|
0, //blkno
|
||||||
|
0 //lsn
|
||||||
|
);
|
||||||
|
storage_cplane.page_server_psql(query.as_str());
|
||||||
|
|
||||||
|
//Request pg_xact and pg_multixact from pageserver
|
||||||
|
//We need them for initial pageserver startup and authentication
|
||||||
|
//TODO figure out which block number we really need
|
||||||
|
query = format!(
|
||||||
|
"file {}/pg_xact/0000,{},{},{},{},{},{},{}",
|
||||||
|
self.pgdata.to_str().unwrap(),
|
||||||
|
sysid as u64, //sysid
|
||||||
|
0, //tablespace
|
||||||
|
0, //dboid
|
||||||
|
0, //reloid
|
||||||
|
44, //forknum
|
||||||
|
0, //blkno
|
||||||
|
0 //lsn
|
||||||
|
);
|
||||||
|
storage_cplane.page_server_psql(query.as_str());
|
||||||
|
|
||||||
|
query = format!(
|
||||||
|
"file {}/pg_multixact/offsets/0000,{},{},{},{},{},{},{}",
|
||||||
|
self.pgdata.to_str().unwrap(),
|
||||||
|
sysid as u64, //sysid
|
||||||
|
0, //tablespace
|
||||||
|
0, //dboid
|
||||||
|
0, //reloid
|
||||||
|
45, //forknum
|
||||||
|
0, //blkno
|
||||||
|
0 //lsn
|
||||||
|
);
|
||||||
|
storage_cplane.page_server_psql(query.as_str());
|
||||||
|
|
||||||
|
query = format!(
|
||||||
|
"file {}/pg_multixact/members/0000,{},{},{},{},{},{},{}",
|
||||||
|
self.pgdata.to_str().unwrap(),
|
||||||
|
sysid as u64, //sysid
|
||||||
|
0, //tablespace
|
||||||
|
0, //dboid
|
||||||
|
0, //reloid
|
||||||
|
46, //forknum
|
||||||
|
0, //blkno
|
||||||
|
0 //lsn
|
||||||
|
);
|
||||||
|
storage_cplane.page_server_psql(query.as_str());
|
||||||
|
|
||||||
|
//Request a few shared catalogs needed for authentication
|
||||||
|
//Without them we cannot setup connection with pageserver to request further pages
|
||||||
|
let reloids = [1260, 1261, 1262, 2396];
|
||||||
|
for reloid in reloids.iter() {
|
||||||
|
//FIXME request all blocks from file, not just 10
|
||||||
|
for blkno in 0..10 {
|
||||||
|
query = format!(
|
||||||
|
"file {}/global/{},{},{},{},{},{},{},{}",
|
||||||
|
self.pgdata.to_str().unwrap(),
|
||||||
|
reloid, //suse it as filename
|
||||||
|
sysid as u64, //sysid
|
||||||
|
1664, //tablespace
|
||||||
|
0, //dboid
|
||||||
|
reloid, //reloid
|
||||||
|
0, //forknum
|
||||||
|
blkno, //blkno
|
||||||
|
0 //lsn
|
||||||
|
);
|
||||||
|
storage_cplane.page_server_psql(query.as_str());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fs::create_dir(format!("{}/base/13006", self.pgdata.to_str().unwrap())).unwrap();
|
||||||
|
fs::create_dir(format!("{}/base/13007", self.pgdata.to_str().unwrap())).unwrap();
|
||||||
|
|
||||||
|
//FIXME figure out what wal file we need to successfully start
|
||||||
|
let walfilepath = format!(
|
||||||
|
"{}/pg_wal/000000010000000000000001",
|
||||||
|
self.pgdata.to_str().unwrap()
|
||||||
|
);
|
||||||
|
fs::copy(
|
||||||
|
"/home/anastasia/zenith/zenith/tmp_check/pgdata/pg_wal/000000010000000000000001",
|
||||||
|
walfilepath,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
println!("before resetwal ");
|
||||||
|
|
||||||
let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
|
let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
|
||||||
|
|
||||||
|
// Now it does nothing, just prints existing content of pg_control.
|
||||||
|
// TODO update values with most recent lsn, xid, oid requested from pageserver
|
||||||
let pg_resetwal = Command::new(pg_resetwal_path)
|
let pg_resetwal = Command::new(pg_resetwal_path)
|
||||||
.args(&["-D", self.pgdata.to_str().unwrap()])
|
.args(&["-D", self.pgdata.to_str().unwrap()])
|
||||||
.arg("-f")
|
.arg("-n") //dry run
|
||||||
// TODO probably we will have to modify pg_resetwal
|
//.arg("-f")
|
||||||
// .arg("--compute-node")
|
//.args(&["--next-transaction-id", "100500"])
|
||||||
|
//.args(&["--next-oid", "17000"])
|
||||||
|
//.args(&["--next-transaction-id", "100500"])
|
||||||
.status()
|
.status()
|
||||||
.expect("failed to execute pg_resetwal");
|
.expect("failed to execute pg_resetwal");
|
||||||
|
|
||||||
if !pg_resetwal.success() {
|
if !pg_resetwal.success() {
|
||||||
panic!("pg_resetwal failed");
|
panic!("pg_resetwal failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
println!("setup done");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
|
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
|
||||||
@@ -613,6 +772,28 @@ impl PostgresNode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn push_to_s3(&self) {
|
||||||
|
println!("Push to s3 node at '{}'", self.pgdata.to_str().unwrap());
|
||||||
|
|
||||||
|
let zenith_push_path = self.pg_bin_dir.join("zenith_push");
|
||||||
|
println!("zenith_push_path: {}", zenith_push_path.to_str().unwrap());
|
||||||
|
|
||||||
|
let status = Command::new(zenith_push_path)
|
||||||
|
.args(&["-D", self.pgdata.to_str().unwrap()])
|
||||||
|
.env_clear()
|
||||||
|
.env("S3_ENDPOINT", "https://127.0.0.1:9000")
|
||||||
|
.env("S3_REGION", "us-east-1")
|
||||||
|
.env("S3_ACCESSKEY", "minioadmin")
|
||||||
|
.env("S3_SECRET", "minioadmin")
|
||||||
|
// .env("S3_BUCKET", "zenith-testbucket")
|
||||||
|
.status()
|
||||||
|
.expect("failed to push node to s3");
|
||||||
|
|
||||||
|
if !status.success() {
|
||||||
|
panic!("zenith_push failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
pub fn pg_bench() {}
|
pub fn pg_bench() {}
|
||||||
pub fn pg_regress() {}
|
pub fn pg_regress() {}
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
mod control_plane;
|
mod control_plane;
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use control_plane::ComputeControlPlane;
|
use control_plane::ComputeControlPlane;
|
||||||
use control_plane::StorageControlPlane;
|
use control_plane::StorageControlPlane;
|
||||||
@@ -8,17 +10,21 @@ use control_plane::StorageControlPlane;
|
|||||||
// -- restart + seqscan won't read deleted stuff
|
// -- restart + seqscan won't read deleted stuff
|
||||||
// -- pageserver api endpoint to check all rels
|
// -- pageserver api endpoint to check all rels
|
||||||
|
|
||||||
// Handcrafted cases with wal records that are (were) problematic for redo.
|
//Handcrafted cases with wal records that are (were) problematic for redo.
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore]
|
||||||
fn test_redo_cases() {
|
fn test_redo_cases() {
|
||||||
// Start pageserver that reads WAL directly from that postgres
|
// Start pageserver that reads WAL directly from that postgres
|
||||||
let storage_cplane = StorageControlPlane::one_page_server();
|
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||||
|
|
||||||
// start postgres
|
// start postgres
|
||||||
let node = compute_cplane.new_node();
|
let node = compute_cplane.new_node();
|
||||||
node.start(&storage_cplane);
|
node.start(&storage_cplane);
|
||||||
|
|
||||||
|
println!("await pageserver connection...");
|
||||||
|
sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
// check basic work with table
|
// check basic work with table
|
||||||
node.safe_psql(
|
node.safe_psql(
|
||||||
"postgres",
|
"postgres",
|
||||||
@@ -26,7 +32,7 @@ fn test_redo_cases() {
|
|||||||
);
|
);
|
||||||
node.safe_psql(
|
node.safe_psql(
|
||||||
"postgres",
|
"postgres",
|
||||||
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
|
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
|
||||||
);
|
);
|
||||||
let count: i64 = node
|
let count: i64 = node
|
||||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||||
@@ -34,9 +40,9 @@ fn test_redo_cases() {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.get(0);
|
.get(0);
|
||||||
println!("sum = {}", count);
|
println!("sum = {}", count);
|
||||||
assert_eq!(count, 5000050000);
|
assert_eq!(count, 5050);
|
||||||
|
|
||||||
// check 'create table as'
|
//check 'create table as'
|
||||||
node.safe_psql("postgres", "CREATE TABLE t2 AS SELECT * FROM t");
|
node.safe_psql("postgres", "CREATE TABLE t2 AS SELECT * FROM t");
|
||||||
let count: i64 = node
|
let count: i64 = node
|
||||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||||
@@ -44,28 +50,33 @@ fn test_redo_cases() {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.get(0);
|
.get(0);
|
||||||
println!("sum = {}", count);
|
println!("sum = {}", count);
|
||||||
assert_eq!(count, 5000050000);
|
assert_eq!(count, 5050);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs pg_regress on a compute node
|
// Runs pg_regress on a compute node
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore]
|
||||||
fn test_regress() {
|
fn test_regress() {
|
||||||
// Start pageserver that reads WAL directly from that postgres
|
// Start pageserver that reads WAL directly from that postgres
|
||||||
let storage_cplane = StorageControlPlane::one_page_server();
|
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||||
|
|
||||||
// start postgres
|
// start postgres
|
||||||
let node = compute_cplane.new_node();
|
let node = compute_cplane.new_node();
|
||||||
node.start(&storage_cplane);
|
node.start(&storage_cplane);
|
||||||
|
|
||||||
|
println!("await pageserver connection...");
|
||||||
|
sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
control_plane::regress_check(&node);
|
control_plane::regress_check(&node);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run two postgres instances on one pageserver
|
// Run two postgres instances on one pageserver
|
||||||
#[test]
|
#[test]
|
||||||
|
#[ignore]
|
||||||
fn test_pageserver_multitenancy() {
|
fn test_pageserver_multitenancy() {
|
||||||
// Start pageserver that reads WAL directly from that postgres
|
// Start pageserver that reads WAL directly from that postgres
|
||||||
let storage_cplane = StorageControlPlane::one_page_server();
|
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||||
|
|
||||||
// Allocate postgres instance, but don't start
|
// Allocate postgres instance, but don't start
|
||||||
@@ -74,6 +85,11 @@ fn test_pageserver_multitenancy() {
|
|||||||
node1.start(&storage_cplane);
|
node1.start(&storage_cplane);
|
||||||
node2.start(&storage_cplane);
|
node2.start(&storage_cplane);
|
||||||
|
|
||||||
|
// XXX: add some extension func to postgres to check walsender conn
|
||||||
|
// XXX: or better just drop that
|
||||||
|
println!("await pageserver connection...");
|
||||||
|
sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
// check node1
|
// check node1
|
||||||
node1.safe_psql(
|
node1.safe_psql(
|
||||||
"postgres",
|
"postgres",
|
||||||
@@ -81,7 +97,7 @@ fn test_pageserver_multitenancy() {
|
|||||||
);
|
);
|
||||||
node1.safe_psql(
|
node1.safe_psql(
|
||||||
"postgres",
|
"postgres",
|
||||||
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
|
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
|
||||||
);
|
);
|
||||||
let count: i64 = node1
|
let count: i64 = node1
|
||||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||||
@@ -89,7 +105,7 @@ fn test_pageserver_multitenancy() {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.get(0);
|
.get(0);
|
||||||
println!("sum = {}", count);
|
println!("sum = {}", count);
|
||||||
assert_eq!(count, 5000050000);
|
assert_eq!(count, 5050);
|
||||||
|
|
||||||
// check node2
|
// check node2
|
||||||
node2.safe_psql(
|
node2.safe_psql(
|
||||||
@@ -98,7 +114,7 @@ fn test_pageserver_multitenancy() {
|
|||||||
);
|
);
|
||||||
node2.safe_psql(
|
node2.safe_psql(
|
||||||
"postgres",
|
"postgres",
|
||||||
"INSERT INTO t SELECT generate_series(100000,200000), 'payload'",
|
"INSERT INTO t SELECT generate_series(100,200), 'payload'",
|
||||||
);
|
);
|
||||||
let count: i64 = node2
|
let count: i64 = node2
|
||||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||||
@@ -106,5 +122,89 @@ fn test_pageserver_multitenancy() {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.get(0);
|
.get(0);
|
||||||
println!("sum = {}", count);
|
println!("sum = {}", count);
|
||||||
assert_eq!(count, 15000150000);
|
assert_eq!(count, 15150);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
// Start pageserver using s3 base image
|
||||||
|
//
|
||||||
|
// Requires working minio with hardcoded setup:
|
||||||
|
// .env("S3_ENDPOINT", "https://127.0.0.1:9000")
|
||||||
|
// .env("S3_REGION", "us-east-1")
|
||||||
|
// .env("S3_ACCESSKEY", "minioadmin")
|
||||||
|
// .env("S3_SECRET", "minioadmin")
|
||||||
|
// .env("S3_BUCKET", "zenith-testbucket")
|
||||||
|
// TODO use env variables in test
|
||||||
|
fn test_pageserver_recovery() {
|
||||||
|
//This test expects that image is already uploaded to s3
|
||||||
|
//To upload it use zenith_push before test (see node.push_to_s3() for details)
|
||||||
|
let storage_cplane = StorageControlPlane::one_page_server(true);
|
||||||
|
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||||
|
|
||||||
|
//Wait while daemon uploads pages from s3
|
||||||
|
sleep(Duration::from_secs(15));
|
||||||
|
|
||||||
|
let node_restored = compute_cplane.new_node_wo_data();
|
||||||
|
|
||||||
|
//TODO 6947041219207877724 is a hardcoded sysid for my cluster. Get it somewhere
|
||||||
|
node_restored.setup_compute_node(6947041219207877724, &storage_cplane);
|
||||||
|
|
||||||
|
node_restored.start(&storage_cplane);
|
||||||
|
|
||||||
|
let rows = node_restored.safe_psql("postgres", "SELECT relname from pg_class;");
|
||||||
|
|
||||||
|
assert_eq!(rows.len(), 395);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
//Scenario for future test. Not implemented yet
|
||||||
|
fn test_pageserver_node_switch() {
|
||||||
|
//Create pageserver
|
||||||
|
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||||
|
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||||
|
|
||||||
|
//Create reqular node
|
||||||
|
let node = compute_cplane.new_node();
|
||||||
|
node.start(&storage_cplane);
|
||||||
|
|
||||||
|
node.safe_psql(
|
||||||
|
"postgres",
|
||||||
|
"CREATE TABLE t(key int primary key, value text)",
|
||||||
|
);
|
||||||
|
node.safe_psql(
|
||||||
|
"postgres",
|
||||||
|
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
|
||||||
|
);
|
||||||
|
let count: i64 = node
|
||||||
|
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||||
|
.first()
|
||||||
|
.unwrap()
|
||||||
|
.get(0);
|
||||||
|
println!("sum = {}", count);
|
||||||
|
assert_eq!(count, 5050);
|
||||||
|
|
||||||
|
//Push all node files to s3
|
||||||
|
//TODO upload them directly to pageserver
|
||||||
|
node.push_to_s3();
|
||||||
|
//Upload data from s3 to pageserver
|
||||||
|
//storage_cplane.upload_from_s3() //Not implemented yet
|
||||||
|
|
||||||
|
//Shut down the node
|
||||||
|
node.stop();
|
||||||
|
|
||||||
|
//Create new node without files
|
||||||
|
let node_restored = compute_cplane.new_node_wo_data();
|
||||||
|
|
||||||
|
// Setup minimal set of files needed to start node and setup pageserver connection
|
||||||
|
// TODO 6947041219207877724 is a hardcoded sysid. Get it from node
|
||||||
|
node_restored.setup_compute_node(6947041219207877724, &storage_cplane);
|
||||||
|
|
||||||
|
//Start compute node without files
|
||||||
|
node_restored.start(&storage_cplane);
|
||||||
|
|
||||||
|
//Ensure that is has table created on initial node
|
||||||
|
let rows = node_restored.safe_psql("postgres", "SELECT key from t;");
|
||||||
|
assert_eq!(rows.len(), 5050);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -159,6 +159,7 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
|
|||||||
// Before opening up for connections, restore the latest base backup from S3.
|
// Before opening up for connections, restore the latest base backup from S3.
|
||||||
// (We don't persist anything to local disk at the moment, so we need to do
|
// (We don't persist anything to local disk at the moment, so we need to do
|
||||||
// this at every startup)
|
// this at every startup)
|
||||||
|
// TODO move it to a separate function
|
||||||
if !conf.skip_recovery {
|
if !conf.skip_recovery {
|
||||||
restore_s3::restore_main(&conf);
|
restore_s3::restore_main(&conf);
|
||||||
}
|
}
|
||||||
|
|||||||
218
pageserver/src/controlfile.rs
Normal file
218
pageserver/src/controlfile.rs
Normal file
@@ -0,0 +1,218 @@
|
|||||||
|
#![allow(non_camel_case_types)]
|
||||||
|
#![allow(non_snake_case)]
|
||||||
|
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::prelude::*;
|
||||||
|
use std::io::SeekFrom;
|
||||||
|
|
||||||
|
use bytes::{Buf, Bytes};
|
||||||
|
|
||||||
|
use log::*;
|
||||||
|
|
||||||
|
type XLogRecPtr = u64;
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
/*
|
||||||
|
* Body of CheckPoint XLOG records. This is declared here because we keep
|
||||||
|
* a copy of the latest one in pg_control for possible disaster recovery.
|
||||||
|
* Changing this struct requires a PG_CONTROL_VERSION bump.
|
||||||
|
*/
|
||||||
|
pub struct CheckPoint {
|
||||||
|
pub redo: XLogRecPtr, /* next RecPtr available when we began to
|
||||||
|
* create CheckPoint (i.e. REDO start point) */
|
||||||
|
pub ThisTimeLineID: u32, /* current TLI */
|
||||||
|
pub PrevTimeLineID: u32, /* previous TLI, if this record begins a new
|
||||||
|
* timeline (equals ThisTimeLineID otherwise) */
|
||||||
|
pub fullPageWrites: bool, /* current full_page_writes */
|
||||||
|
pub nextXid: u64, /* next free transaction ID */
|
||||||
|
pub nextOid: u32, /* next free OID */
|
||||||
|
pub nextMulti: u32, /* next free MultiXactId */
|
||||||
|
pub nextMultiOffset: u32, /* next free MultiXact offset */
|
||||||
|
pub oldestXid: u32, /* cluster-wide minimum datfrozenxid */
|
||||||
|
pub oldestXidDB: u32, /* database with minimum datfrozenxid */
|
||||||
|
pub oldestMulti: u32, /* cluster-wide minimum datminmxid */
|
||||||
|
pub oldestMultiDB: u32, /* database with minimum datminmxid */
|
||||||
|
pub time: u64, /* time stamp of checkpoint */
|
||||||
|
pub oldestCommitTsXid: u32, /* oldest Xid with valid commit
|
||||||
|
* timestamp */
|
||||||
|
pub newestCommitTsXid: u32, /* newest Xid with valid commit
|
||||||
|
* timestamp */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Oldest XID still running. This is only needed to initialize hot standby
|
||||||
|
* mode from an online checkpoint, so we only bother calculating this for
|
||||||
|
* online checkpoints and only when wal_level is replica. Otherwise it's
|
||||||
|
* set to InvalidTransactionId.
|
||||||
|
*/
|
||||||
|
pub oldestActiveXid: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ControlFileDataZenith {
|
||||||
|
pub system_identifier: u64,
|
||||||
|
pg_control_version: u32, /* PG_CONTROL_VERSION */
|
||||||
|
catalog_version_no: u32, /* see catversion.h */
|
||||||
|
|
||||||
|
state: i32, /* see enum above */
|
||||||
|
time: i64, /* time stamp of last pg_control update */
|
||||||
|
pub checkPoint: XLogRecPtr,
|
||||||
|
checkPointCopy: CheckPoint, /* copy of last check point record */
|
||||||
|
unloggedLSN: XLogRecPtr, /* current fake LSN value, for unlogged rels */
|
||||||
|
minRecoveryPoint: XLogRecPtr,
|
||||||
|
minRecoveryPointTLI: u32,
|
||||||
|
backupStartPoint: XLogRecPtr,
|
||||||
|
backupEndPoint: XLogRecPtr,
|
||||||
|
backupEndRequired: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ControlFileDataZenith {
|
||||||
|
pub fn new() -> ControlFileDataZenith {
|
||||||
|
ControlFileDataZenith {
|
||||||
|
system_identifier: 0,
|
||||||
|
pg_control_version: 0,
|
||||||
|
catalog_version_no: 0,
|
||||||
|
state: 0,
|
||||||
|
time: 0,
|
||||||
|
checkPoint: 0,
|
||||||
|
checkPointCopy: {
|
||||||
|
CheckPoint {
|
||||||
|
redo: 0,
|
||||||
|
ThisTimeLineID: 0,
|
||||||
|
PrevTimeLineID: 0,
|
||||||
|
fullPageWrites: false,
|
||||||
|
nextXid: 0,
|
||||||
|
nextOid: 0,
|
||||||
|
nextMulti: 0,
|
||||||
|
nextMultiOffset: 0,
|
||||||
|
oldestXid: 0,
|
||||||
|
oldestXidDB: 0,
|
||||||
|
oldestMulti: 0,
|
||||||
|
oldestMultiDB: 0,
|
||||||
|
time: 0,
|
||||||
|
oldestCommitTsXid: 0,
|
||||||
|
newestCommitTsXid: 0,
|
||||||
|
oldestActiveXid: 0,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
unloggedLSN: 0,
|
||||||
|
minRecoveryPoint: 0,
|
||||||
|
minRecoveryPointTLI: 0,
|
||||||
|
backupStartPoint: 0,
|
||||||
|
backupEndPoint: 0,
|
||||||
|
backupEndRequired: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_pg_control(mut buf: Bytes) -> ControlFileDataZenith {
|
||||||
|
info!("decode pg_control");
|
||||||
|
|
||||||
|
let controlfile: ControlFileDataZenith = ControlFileDataZenith {
|
||||||
|
system_identifier: buf.get_u64_le(),
|
||||||
|
pg_control_version: buf.get_u32_le(),
|
||||||
|
catalog_version_no: buf.get_u32_le(),
|
||||||
|
state: buf.get_i32_le(),
|
||||||
|
time: {
|
||||||
|
buf.advance(4);
|
||||||
|
buf.get_i64_le()
|
||||||
|
},
|
||||||
|
checkPoint: buf.get_u64_le(),
|
||||||
|
checkPointCopy: {
|
||||||
|
CheckPoint {
|
||||||
|
redo: buf.get_u64_le(),
|
||||||
|
ThisTimeLineID: buf.get_u32_le(),
|
||||||
|
PrevTimeLineID: buf.get_u32_le(),
|
||||||
|
fullPageWrites: buf.get_u8() != 0,
|
||||||
|
nextXid: {
|
||||||
|
buf.advance(7);
|
||||||
|
buf.get_u64_le()
|
||||||
|
},
|
||||||
|
nextOid: buf.get_u32_le(),
|
||||||
|
nextMulti: buf.get_u32_le(),
|
||||||
|
nextMultiOffset: buf.get_u32_le(),
|
||||||
|
oldestXid: buf.get_u32_le(),
|
||||||
|
oldestXidDB: buf.get_u32_le(),
|
||||||
|
oldestMulti: buf.get_u32_le(),
|
||||||
|
oldestMultiDB: buf.get_u32_le(),
|
||||||
|
time: {
|
||||||
|
buf.advance(4);
|
||||||
|
buf.get_u64_le()
|
||||||
|
},
|
||||||
|
oldestCommitTsXid: buf.get_u32_le(),
|
||||||
|
newestCommitTsXid: buf.get_u32_le(),
|
||||||
|
oldestActiveXid: buf.get_u32_le(),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
unloggedLSN: buf.get_u64_le(),
|
||||||
|
minRecoveryPoint: buf.get_u64_le(),
|
||||||
|
minRecoveryPointTLI: buf.get_u32_le(),
|
||||||
|
backupStartPoint: {
|
||||||
|
buf.advance(4);
|
||||||
|
buf.get_u64_le()
|
||||||
|
},
|
||||||
|
backupEndPoint: buf.get_u64_le(),
|
||||||
|
backupEndRequired: buf.get_u8() != 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
return controlfile;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_controlfile(b: Bytes) {
|
||||||
|
let controlfile = decode_pg_control(b);
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"controlfile {:X}/{:X}",
|
||||||
|
controlfile.checkPoint >> 32,
|
||||||
|
controlfile.checkPoint
|
||||||
|
);
|
||||||
|
info!("controlfile {:?}", controlfile);
|
||||||
|
}
|
||||||
|
|
||||||
|
const MAX_MAPPINGS: usize = 62;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct RelMapping {
|
||||||
|
mapoid: u32, /* OID of a catalog */
|
||||||
|
mapfilenode: u32, /* its filenode number */
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RelMapFile {
|
||||||
|
magic: i32, /* always RELMAPPER_FILEMAGIC */
|
||||||
|
num_mappings: i32, /* number of valid RelMapping entries */
|
||||||
|
mappings: [u8; MAX_MAPPINGS * 8],
|
||||||
|
crc: u32, /* CRC of all above */
|
||||||
|
pad: i32, /* to make the struct size be 512 exactly */
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_filemapping(mut buf: Bytes) -> RelMapFile {
|
||||||
|
info!("decode filemap");
|
||||||
|
|
||||||
|
let file: RelMapFile = RelMapFile {
|
||||||
|
magic: buf.get_i32_le(), /* always RELMAPPER_FILEMAGIC */
|
||||||
|
num_mappings: buf.get_i32_le(), /* number of valid RelMapping entries */
|
||||||
|
mappings: {
|
||||||
|
let mut arr = [0 as u8; MAX_MAPPINGS * 8];
|
||||||
|
buf.copy_to_slice(&mut arr);
|
||||||
|
arr
|
||||||
|
},
|
||||||
|
crc: buf.get_u32_le(), /* CRC of all above */
|
||||||
|
pad: buf.get_i32_le(),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("decode filemap {:?}", file);
|
||||||
|
file
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_buf_to_file(filepath: String, buf: Bytes, blkno: u32) {
|
||||||
|
info!("write_buf_to_file {}", filepath.clone());
|
||||||
|
|
||||||
|
let mut buffer = File::create(filepath.clone()).unwrap();
|
||||||
|
buffer.seek(SeekFrom::Start(8192 * blkno as u64)).unwrap();
|
||||||
|
|
||||||
|
buffer.write_all(&buf).unwrap();
|
||||||
|
|
||||||
|
info!("DONE write_buf_to_file {}", filepath);
|
||||||
|
}
|
||||||
@@ -1,8 +1,11 @@
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
pub mod controlfile;
|
||||||
pub mod page_cache;
|
pub mod page_cache;
|
||||||
pub mod page_service;
|
pub mod page_service;
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub mod pg_constants;
|
||||||
pub mod restore_s3;
|
pub mod restore_s3;
|
||||||
pub mod tui;
|
pub mod tui;
|
||||||
pub mod tui_event;
|
pub mod tui_event;
|
||||||
|
|||||||
@@ -8,20 +8,21 @@
|
|||||||
|
|
||||||
use core::ops::Bound::Included;
|
use core::ops::Bound::Included;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
|
use std::{convert::TryInto, ops::AddAssign};
|
||||||
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::sync::atomic::AtomicU64;
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::{Arc, Condvar, Mutex};
|
use std::sync::{Arc, Condvar, Mutex};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::{convert::TryInto, ops::AddAssign};
|
|
||||||
// use tokio::sync::RwLock;
|
// use tokio::sync::RwLock;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::*;
|
use log::*;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
|
||||||
use crate::{walredo, PageServerConf};
|
use crate::{controlfile, walredo, PageServerConf};
|
||||||
|
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::unbounded;
|
||||||
use crossbeam_channel::{Receiver, Sender};
|
use crossbeam_channel::{Receiver, Sender};
|
||||||
@@ -107,6 +108,8 @@ struct PageCacheShared {
|
|||||||
first_valid_lsn: u64,
|
first_valid_lsn: u64,
|
||||||
last_valid_lsn: u64,
|
last_valid_lsn: u64,
|
||||||
last_record_lsn: u64,
|
last_record_lsn: u64,
|
||||||
|
|
||||||
|
controldata: controlfile::ControlFileDataZenith,
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
@@ -146,6 +149,7 @@ fn init_page_cache() -> PageCache {
|
|||||||
first_valid_lsn: 0,
|
first_valid_lsn: 0,
|
||||||
last_valid_lsn: 0,
|
last_valid_lsn: 0,
|
||||||
last_record_lsn: 0,
|
last_record_lsn: 0,
|
||||||
|
controldata: controlfile::ControlFileDataZenith::new(),
|
||||||
}),
|
}),
|
||||||
valid_lsn_condvar: Condvar::new(),
|
valid_lsn_condvar: Condvar::new(),
|
||||||
|
|
||||||
@@ -176,7 +180,7 @@ fn init_page_cache() -> PageCache {
|
|||||||
// stored directly in the cache entry in that you still need to run the WAL redo
|
// stored directly in the cache entry in that you still need to run the WAL redo
|
||||||
// routine to generate the page image.
|
// routine to generate the page image.
|
||||||
//
|
//
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
|
||||||
pub struct CacheKey {
|
pub struct CacheKey {
|
||||||
pub tag: BufferTag,
|
pub tag: BufferTag,
|
||||||
pub lsn: u64,
|
pub lsn: u64,
|
||||||
@@ -215,7 +219,7 @@ impl CacheEntry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
|
#[derive(Eq, PartialEq, Hash, Clone, Copy, Debug)]
|
||||||
pub struct RelTag {
|
pub struct RelTag {
|
||||||
pub spcnode: u32,
|
pub spcnode: u32,
|
||||||
pub dbnode: u32,
|
pub dbnode: u32,
|
||||||
@@ -223,7 +227,7 @@ pub struct RelTag {
|
|||||||
pub forknum: u8,
|
pub forknum: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
|
||||||
pub struct BufferTag {
|
pub struct BufferTag {
|
||||||
pub spcnode: u32,
|
pub spcnode: u32,
|
||||||
pub dbnode: u32,
|
pub dbnode: u32,
|
||||||
@@ -242,25 +246,145 @@ pub struct WALRecord {
|
|||||||
// Public interface functions
|
// Public interface functions
|
||||||
|
|
||||||
impl PageCache {
|
impl PageCache {
|
||||||
|
pub fn get_nonrel_page(&self, tag: BufferTag, _reqlsn: u64) -> Result<Bytes, Box<dyn Error>> {
|
||||||
|
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
// Now we don't have versioning for non-rel pages.
|
||||||
|
// Also at bootstrap we don't know lsn for some files.
|
||||||
|
// So always request the very latest version
|
||||||
|
// let lsn = reqlsn;
|
||||||
|
|
||||||
|
let lsn = u64::MAX;
|
||||||
|
|
||||||
|
let minkey = CacheKey { tag: tag, lsn: 0 };
|
||||||
|
// Look up to the largest lsn
|
||||||
|
let maxkey = CacheKey { tag: tag, lsn: lsn };
|
||||||
|
|
||||||
|
let entry_rc: Arc<CacheEntry>;
|
||||||
|
{
|
||||||
|
let shared = self.shared.lock().unwrap();
|
||||||
|
|
||||||
|
let pagecache = &shared.pagecache;
|
||||||
|
info!("got pagecache {}", pagecache.len());
|
||||||
|
|
||||||
|
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
|
||||||
|
|
||||||
|
let entry_opt = entries.next_back();
|
||||||
|
|
||||||
|
if entry_opt.is_none() {
|
||||||
|
return Err(format!(
|
||||||
|
"not found non-rel page with LSN {} for {}/{}/{}.{} blk {}",
|
||||||
|
lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"found non-rel page with LSN {} for {}/{}/{}.{} blk {}",
|
||||||
|
lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
|
||||||
|
);
|
||||||
|
|
||||||
|
let (_key, entry) = entry_opt.unwrap();
|
||||||
|
entry_rc = entry.clone();
|
||||||
|
|
||||||
|
// Now that we have a reference to the cache entry, drop the lock on the map.
|
||||||
|
// It's important to do this before waiting on the condition variable below,
|
||||||
|
// and better to do it as soon as possible to maximize concurrency.
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock the cache entry and dig the page image out of it.
|
||||||
|
let page_img: Bytes;
|
||||||
|
{
|
||||||
|
let entry_content = entry_rc.content.lock().unwrap();
|
||||||
|
|
||||||
|
if let Some(img) = &entry_content.page_image {
|
||||||
|
assert!(!entry_content.apply_pending);
|
||||||
|
page_img = img.clone();
|
||||||
|
} else if entry_content.wal_record.is_some() {
|
||||||
|
return Err("non-rel WAL redo is not implemented yet".into());
|
||||||
|
//
|
||||||
|
// If this page needs to be reconstructed by applying some WAL,
|
||||||
|
// send a request to the WAL redo thread.
|
||||||
|
//
|
||||||
|
// if !entry_content.apply_pending {
|
||||||
|
// assert!(!entry_content.apply_pending);
|
||||||
|
// entry_content.apply_pending = true;
|
||||||
|
|
||||||
|
// let s = &self.walredo_sender;
|
||||||
|
// s.send(entry_rc.clone())?;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// while entry_content.apply_pending {
|
||||||
|
// entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
|
||||||
|
//}
|
||||||
|
|
||||||
|
// We should now have a page image. If we don't, it means that WAL redo
|
||||||
|
// failed to reconstruct it. WAL redo should've logged that error already.
|
||||||
|
// page_img = match &entry_content.page_image {
|
||||||
|
// Some(p) => p.clone(),
|
||||||
|
// None => {
|
||||||
|
// error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
|
||||||
|
// return Err("could not apply WAL to reconstruct page image".into());
|
||||||
|
// }
|
||||||
|
// };
|
||||||
|
} else {
|
||||||
|
// No base image, and no WAL record. Huh?
|
||||||
|
return Err(format!("no page image or WAL record for requested page"))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!(
|
||||||
|
"Returning page for {}/{}/{}.{} blk {}",
|
||||||
|
tag.spcnode,
|
||||||
|
tag.dbnode,
|
||||||
|
tag.relnode,
|
||||||
|
tag.forknum,
|
||||||
|
tag.blknum
|
||||||
|
);
|
||||||
|
|
||||||
|
return Ok(page_img);
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// GetPage@LSN
|
// GetPage@LSN
|
||||||
//
|
//
|
||||||
// Returns an 8k page image
|
// Returns an 8k page image
|
||||||
//
|
//
|
||||||
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> {
|
pub fn get_page_at_lsn(&self, tag: BufferTag, reqlsn: u64) -> Result<Bytes, Box<dyn Error>> {
|
||||||
|
let mut lsn = reqlsn;
|
||||||
|
|
||||||
|
if tag.forknum > 40 {
|
||||||
|
info!(
|
||||||
|
"get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}",
|
||||||
|
lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
|
||||||
|
);
|
||||||
|
|
||||||
|
return self.get_nonrel_page(tag, lsn);
|
||||||
|
}
|
||||||
|
|
||||||
|
if reqlsn == 0 {
|
||||||
|
let c = self.get_controldata();
|
||||||
|
lsn = c.checkPoint;
|
||||||
|
|
||||||
|
info!("update reqlsn get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}", lsn,
|
||||||
|
tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum);
|
||||||
|
}
|
||||||
|
|
||||||
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
|
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
|
||||||
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
||||||
let minkey = CacheKey { tag: tag, lsn: 0 };
|
let minkey = CacheKey { tag: tag, lsn: 0 };
|
||||||
let maxkey = CacheKey { tag: tag, lsn: lsn };
|
let maxkey = CacheKey { tag: tag, lsn: lsn };
|
||||||
|
|
||||||
let entry_rc: Arc<CacheEntry>;
|
let entry_rc: Arc<CacheEntry>;
|
||||||
{
|
{
|
||||||
let mut shared = self.shared.lock().unwrap();
|
let mut shared = self.shared.lock().unwrap();
|
||||||
|
|
||||||
let mut waited = false;
|
let mut waited = false;
|
||||||
|
|
||||||
while lsn > shared.last_valid_lsn {
|
// When server just started and created checkpoint lsn,
|
||||||
|
// but we have not yet established connection,
|
||||||
|
// requested lsn will be larger than the one we have
|
||||||
|
while lsn > shared.last_valid_lsn + 500 {
|
||||||
// TODO: Wait for the WAL receiver to catch up
|
// TODO: Wait for the WAL receiver to catch up
|
||||||
waited = true;
|
waited = true;
|
||||||
trace!(
|
trace!(
|
||||||
@@ -276,8 +400,8 @@ impl PageCache {
|
|||||||
shared = wait_result.0;
|
shared = wait_result.0;
|
||||||
if wait_result.1.timed_out() {
|
if wait_result.1.timed_out() {
|
||||||
return Err(format!(
|
return Err(format!(
|
||||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||||
lsn >> 32, lsn & 0xffff_ffff
|
lsn
|
||||||
))?;
|
))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -286,8 +410,7 @@ impl PageCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if lsn < shared.first_valid_lsn {
|
if lsn < shared.first_valid_lsn {
|
||||||
return Err(format!("LSN {:X}/{:X} has already been removed",
|
return Err(format!("LSN {} has already been removed", lsn))?;
|
||||||
lsn >> 32, lsn & 0xffff_ffff))?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let pagecache = &shared.pagecache;
|
let pagecache = &shared.pagecache;
|
||||||
@@ -297,9 +420,9 @@ impl PageCache {
|
|||||||
let entry_opt = entries.next_back();
|
let entry_opt = entries.next_back();
|
||||||
|
|
||||||
if entry_opt.is_none() {
|
if entry_opt.is_none() {
|
||||||
static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192];
|
//static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192];
|
||||||
return Ok(Bytes::from_static(&ZERO_PAGE));
|
//return Ok(Bytes::from_static(&ZERO_PAGE));
|
||||||
/* return Err("could not find page image")?; */
|
return Err("could not find page image")?;
|
||||||
}
|
}
|
||||||
let (_key, entry) = entry_opt.unwrap();
|
let (_key, entry) = entry_opt.unwrap();
|
||||||
entry_rc = entry.clone();
|
entry_rc = entry.clone();
|
||||||
@@ -476,8 +599,10 @@ impl PageCache {
|
|||||||
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
||||||
assert!(oldentry.is_none());
|
assert!(oldentry.is_none());
|
||||||
|
|
||||||
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
|
debug!(
|
||||||
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
|
"inserted page image for {}/{}/{}_{} blk {} at {}",
|
||||||
|
tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn
|
||||||
|
);
|
||||||
|
|
||||||
self.num_page_images.fetch_add(1, Ordering::Relaxed);
|
self.num_page_images.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
@@ -550,6 +675,16 @@ impl PageCache {
|
|||||||
return shared.last_record_lsn;
|
return shared.last_record_lsn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_controldata(&self, c: controlfile::ControlFileDataZenith) {
|
||||||
|
let mut shared = self.shared.lock().unwrap();
|
||||||
|
shared.controldata = c;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_controldata(&self) -> controlfile::ControlFileDataZenith {
|
||||||
|
let shared = self.shared.lock().unwrap();
|
||||||
|
return shared.controldata.clone();
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Simple test function for the WAL redo code:
|
// Simple test function for the WAL redo code:
|
||||||
//
|
//
|
||||||
@@ -610,6 +745,7 @@ impl PageCache {
|
|||||||
*entry = to + 1;
|
*entry = to + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
trace!("relsize_inc {:?} to {}", rel, entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn relsize_get(&self, rel: &RelTag) -> u32 {
|
pub fn relsize_get(&self, rel: &RelTag) -> u32 {
|
||||||
|
|||||||
@@ -24,6 +24,8 @@ use crate::page_cache;
|
|||||||
use crate::walreceiver;
|
use crate::walreceiver;
|
||||||
use crate::PageServerConf;
|
use crate::PageServerConf;
|
||||||
|
|
||||||
|
use crate::controlfile;
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, io::Error>;
|
type Result<T> = std::result::Result<T, io::Error>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -51,7 +53,6 @@ enum BeMessage {
|
|||||||
RowDescription,
|
RowDescription,
|
||||||
DataRow,
|
DataRow,
|
||||||
CommandComplete,
|
CommandComplete,
|
||||||
ControlFile,
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// All that messages are actually CopyData from libpq point of view.
|
// All that messages are actually CopyData from libpq point of view.
|
||||||
@@ -339,18 +340,6 @@ impl Connection {
|
|||||||
self.stream.write_buf(&mut b).await?;
|
self.stream.write_buf(&mut b).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
BeMessage::ControlFile => {
|
|
||||||
// TODO pass checkpoint and xid info in this message
|
|
||||||
let mut b = Bytes::from("hello pg_control");
|
|
||||||
|
|
||||||
self.stream.write_u8(b'D').await?;
|
|
||||||
self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?;
|
|
||||||
|
|
||||||
self.stream.write_i16(1).await?;
|
|
||||||
self.stream.write_i32(b.len() as i32).await?;
|
|
||||||
self.stream.write_buf(&mut b).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
BeMessage::CommandComplete => {
|
BeMessage::CommandComplete => {
|
||||||
let mut b = Bytes::from("SELECT 1\0");
|
let mut b = Bytes::from("SELECT 1\0");
|
||||||
|
|
||||||
@@ -438,8 +427,54 @@ impl Connection {
|
|||||||
async fn process_query(&mut self, q: &FeQueryMessage) -> Result<()> {
|
async fn process_query(&mut self, q: &FeQueryMessage) -> Result<()> {
|
||||||
trace!("got query {:?}", q.body);
|
trace!("got query {:?}", q.body);
|
||||||
|
|
||||||
if q.body.starts_with(b"controlfile") {
|
if q.body.starts_with(b"file") {
|
||||||
self.handle_controlfile().await
|
let (_l, r) = q.body.split_at("file ".len());
|
||||||
|
//TODO parse it correctly
|
||||||
|
let r = r.to_vec();
|
||||||
|
let str = String::from_utf8(r).unwrap().to_string();
|
||||||
|
|
||||||
|
let mut split = str.split(',');
|
||||||
|
let mut s;
|
||||||
|
|
||||||
|
let filepath = split.next().unwrap();
|
||||||
|
let sysid = {
|
||||||
|
s = split.next().unwrap();
|
||||||
|
s.parse::<u64>().unwrap()
|
||||||
|
};
|
||||||
|
|
||||||
|
let buf_tag = page_cache::BufferTag {
|
||||||
|
spcnode: {
|
||||||
|
s = split.next().unwrap();
|
||||||
|
s.parse::<u32>().unwrap()
|
||||||
|
},
|
||||||
|
dbnode: {
|
||||||
|
s = split.next().unwrap();
|
||||||
|
s.parse::<u32>().unwrap()
|
||||||
|
},
|
||||||
|
relnode: {
|
||||||
|
s = split.next().unwrap();
|
||||||
|
s.parse::<u32>().unwrap()
|
||||||
|
},
|
||||||
|
forknum: {
|
||||||
|
s = split.next().unwrap();
|
||||||
|
s.parse::<u8>().unwrap()
|
||||||
|
},
|
||||||
|
blknum: {
|
||||||
|
s = split.next().unwrap();
|
||||||
|
s.parse::<u32>().unwrap()
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
//TODO PARSE LSN
|
||||||
|
//let lsn = { s = split.next().unwrap(); s.parse::<u64>().unwrap()};
|
||||||
|
let lsn: u64 = 0;
|
||||||
|
info!(
|
||||||
|
"process file query sysid {} -- {:?} lsn {}",
|
||||||
|
sysid, buf_tag, lsn
|
||||||
|
);
|
||||||
|
|
||||||
|
self.handle_file(filepath.to_string(), sysid, buf_tag, lsn.into())
|
||||||
|
.await
|
||||||
} else if q.body.starts_with(b"pagestream ") {
|
} else if q.body.starts_with(b"pagestream ") {
|
||||||
let (_l, r) = q.body.split_at("pagestream ".len());
|
let (_l, r) = q.body.split_at("pagestream ".len());
|
||||||
let mut r = r.to_vec();
|
let mut r = r.to_vec();
|
||||||
@@ -486,10 +521,29 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_controlfile(&mut self) -> Result<()> {
|
async fn handle_file(
|
||||||
|
&mut self,
|
||||||
|
filepath: String,
|
||||||
|
sysid: u64,
|
||||||
|
buf_tag: page_cache::BufferTag,
|
||||||
|
lsn: u64,
|
||||||
|
) -> Result<()> {
|
||||||
|
let pcache = page_cache::get_pagecache(self.conf.clone(), sysid);
|
||||||
|
|
||||||
|
match pcache.get_page_at_lsn(buf_tag, lsn) {
|
||||||
|
Ok(p) => {
|
||||||
|
info!("info succeeded get_page_at_lsn: {}", lsn);
|
||||||
|
|
||||||
|
controlfile::write_buf_to_file(filepath, p, buf_tag.blknum);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
info!("page not found and it's ok. get_page_at_lsn: {}", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
self.write_message_noflush(&BeMessage::RowDescription)
|
self.write_message_noflush(&BeMessage::RowDescription)
|
||||||
.await?;
|
.await?;
|
||||||
self.write_message_noflush(&BeMessage::ControlFile).await?;
|
self.write_message_noflush(&BeMessage::DataRow).await?;
|
||||||
self.write_message_noflush(&BeMessage::CommandComplete)
|
self.write_message_noflush(&BeMessage::CommandComplete)
|
||||||
.await?;
|
.await?;
|
||||||
self.write_message(&BeMessage::ReadyForQuery).await
|
self.write_message(&BeMessage::ReadyForQuery).await
|
||||||
@@ -558,6 +612,7 @@ impl Connection {
|
|||||||
|
|
||||||
let n_blocks = pcache.relsize_get(&tag);
|
let n_blocks = pcache.relsize_get(&tag);
|
||||||
|
|
||||||
|
trace!("ZenithNblocksRequest {:?} = {}", tag, n_blocks);
|
||||||
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
|
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
|
||||||
ok: true,
|
ok: true,
|
||||||
n_blocks: n_blocks,
|
n_blocks: n_blocks,
|
||||||
@@ -574,11 +629,26 @@ impl Connection {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
|
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
|
||||||
Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse {
|
Ok(p) => {
|
||||||
ok: true,
|
let mut b = BytesMut::with_capacity(8192);
|
||||||
n_blocks: 0,
|
|
||||||
page: p,
|
trace!("ZenithReadResponse get_page_at_lsn succeed");
|
||||||
}),
|
if p.len() < 8192 {
|
||||||
|
//add padding
|
||||||
|
trace!("ZenithReadResponse add padding");
|
||||||
|
let padding: [u8; 8192 - 512] = [0; 8192 - 512];
|
||||||
|
b.extend_from_slice(&p);
|
||||||
|
b.extend_from_slice(&padding);
|
||||||
|
} else {
|
||||||
|
b.extend_from_slice(&p);
|
||||||
|
}
|
||||||
|
|
||||||
|
BeMessage::ZenithReadResponse(ZenithReadResponse {
|
||||||
|
ok: true,
|
||||||
|
n_blocks: 0,
|
||||||
|
page: b.freeze(),
|
||||||
|
})
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
const ZERO_PAGE: [u8; 8192] = [0; 8192];
|
const ZERO_PAGE: [u8; 8192] = [0; 8192];
|
||||||
error!("get_page_at_lsn: {}", e);
|
error!("get_page_at_lsn: {}", e);
|
||||||
@@ -599,6 +669,7 @@ impl Connection {
|
|||||||
relnode: req.relnode,
|
relnode: req.relnode,
|
||||||
forknum: req.forknum,
|
forknum: req.forknum,
|
||||||
};
|
};
|
||||||
|
trace!("ZenithCreateRequest {:?}", tag);
|
||||||
|
|
||||||
pcache.relsize_inc(&tag, None);
|
pcache.relsize_inc(&tag, None);
|
||||||
|
|
||||||
@@ -616,6 +687,8 @@ impl Connection {
|
|||||||
forknum: req.forknum,
|
forknum: req.forknum,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
trace!("ZenithExtendRequest {:?} to {}", tag, req.blkno);
|
||||||
|
|
||||||
pcache.relsize_inc(&tag, Some(req.blkno));
|
pcache.relsize_inc(&tag, Some(req.blkno));
|
||||||
|
|
||||||
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
||||||
|
|||||||
11
pageserver/src/pg_constants.rs
Normal file
11
pageserver/src/pg_constants.rs
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
// From pg_tablespace_d.h
|
||||||
|
//
|
||||||
|
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||||
|
pub const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||||
|
//Special values for non-rel files' tags
|
||||||
|
//TODO maybe use enum?
|
||||||
|
pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
|
||||||
|
pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
|
||||||
|
pub const PG_XACT_FORKNUM: u32 = 44;
|
||||||
|
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
|
||||||
|
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
|
||||||
@@ -22,7 +22,7 @@ use tokio::runtime;
|
|||||||
|
|
||||||
use futures::future;
|
use futures::future;
|
||||||
|
|
||||||
use crate::{page_cache, PageServerConf};
|
use crate::{controlfile, page_cache, pg_constants, PageServerConf};
|
||||||
|
|
||||||
struct Storage {
|
struct Storage {
|
||||||
region: Region,
|
region: Region,
|
||||||
@@ -84,8 +84,24 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
|
|||||||
.list("relationdata/".to_string(), Some("".to_string()))
|
.list("relationdata/".to_string(), Some("".to_string()))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// TODO: get that from backup
|
//Before uploading other files, slurp pg_control to set systemid
|
||||||
let sys_id: u64 = 42;
|
|
||||||
|
let control_results: Vec<s3::serde_types::ListBucketResult> = bucket
|
||||||
|
.list(
|
||||||
|
"relationdata/global/pg_control".to_string(),
|
||||||
|
Some("".to_string()),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let object = &(&control_results[0]).contents[0];
|
||||||
|
let (data, _) = bucket.get_object(&object.key).await.unwrap();
|
||||||
|
let bytes = BytesMut::from(data.as_slice()).freeze();
|
||||||
|
let c = controlfile::decode_pg_control(bytes);
|
||||||
|
|
||||||
|
let pcache = page_cache::get_pagecache(conf.clone(), c.system_identifier);
|
||||||
|
pcache.set_controldata(c.clone());
|
||||||
|
trace!("uploaded controlfile {:?}", pcache.get_controldata());
|
||||||
|
|
||||||
|
let sys_id: u64 = c.system_identifier;
|
||||||
let mut oldest_lsn = 0;
|
let mut oldest_lsn = 0;
|
||||||
let mut slurp_futures: Vec<_> = Vec::new();
|
let mut slurp_futures: Vec<_> = Vec::new();
|
||||||
|
|
||||||
@@ -119,23 +135,47 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
|
|||||||
panic!("no base backup found");
|
panic!("no base backup found");
|
||||||
}
|
}
|
||||||
|
|
||||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
//Now add nonrelation files
|
||||||
|
let nonrelresults: Vec<s3::serde_types::ListBucketResult> = bucket
|
||||||
|
.list("nonreldata/".to_string(), Some("".to_string()))
|
||||||
|
.await?;
|
||||||
|
for result in nonrelresults {
|
||||||
|
for object in result.contents {
|
||||||
|
// Download needed non relation files, slurping them into memory
|
||||||
|
|
||||||
|
let key = object.key;
|
||||||
|
let relpath = key.strip_prefix("nonreldata/").unwrap();
|
||||||
|
trace!("list nonrelfiles {}", relpath);
|
||||||
|
|
||||||
|
let parsed = parse_nonrel_file_path(&relpath);
|
||||||
|
|
||||||
|
match parsed {
|
||||||
|
Ok(p) => {
|
||||||
|
let b = bucket.clone();
|
||||||
|
let f = slurp_base_file(conf, sys_id, b, key.to_string(), p);
|
||||||
|
|
||||||
|
slurp_futures.push(f);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("unrecognized file: {} ({})", relpath, e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pcache.init_valid_lsn(oldest_lsn);
|
pcache.init_valid_lsn(oldest_lsn);
|
||||||
|
|
||||||
info!("{} files to restore...", slurp_futures.len());
|
info!("{} files to restore...", slurp_futures.len());
|
||||||
|
|
||||||
future::join_all(slurp_futures).await;
|
future::join_all(slurp_futures).await;
|
||||||
info!("restored!");
|
info!(
|
||||||
|
"restored! {:?} to {:?}",
|
||||||
|
pcache.first_valid_lsn, pcache.last_valid_lsn
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// From pg_tablespace_d.h
|
|
||||||
//
|
|
||||||
// FIXME: we'll probably need these elsewhere too, move to some common location
|
|
||||||
const DEFAULTTABLESPACE_OID: u32 = 1663;
|
|
||||||
const GLOBALTABLESPACE_OID: u32 = 1664;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct FilePathError {
|
struct FilePathError {
|
||||||
msg: String,
|
msg: String,
|
||||||
@@ -185,6 +225,17 @@ struct ParsedBaseImageFileName {
|
|||||||
pub lsn: u64,
|
pub lsn: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_lsn_from_filename(fname: &str) -> Result<u64, FilePathError> {
|
||||||
|
let (_, lsn_str) = fname.split_at(fname.len() - 16);
|
||||||
|
|
||||||
|
let (lsnhi, lsnlo) = lsn_str.split_at(8);
|
||||||
|
let lsn_hi = u64::from_str_radix(lsnhi, 16)?;
|
||||||
|
let lsn_lo = u64::from_str_radix(lsnlo, 16)?;
|
||||||
|
let lsn = lsn_hi << 32 | lsn_lo;
|
||||||
|
|
||||||
|
return Ok(lsn);
|
||||||
|
}
|
||||||
|
|
||||||
// formats:
|
// formats:
|
||||||
// <oid>
|
// <oid>
|
||||||
// <oid>_<fork name>
|
// <oid>_<fork name>
|
||||||
@@ -223,6 +274,46 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
|
|||||||
return Ok((relnode, forknum, segno, lsn));
|
return Ok((relnode, forknum, segno, lsn));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_nonrel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
|
||||||
|
//TODO parse segno from xact filenames too
|
||||||
|
if let Some(fname) = path.strip_prefix("pg_xact/") {
|
||||||
|
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||||
|
|
||||||
|
return Ok(ParsedBaseImageFileName {
|
||||||
|
spcnode: 0,
|
||||||
|
dbnode: 0,
|
||||||
|
relnode: 0,
|
||||||
|
forknum: pg_constants::PG_XACT_FORKNUM,
|
||||||
|
segno: 0,
|
||||||
|
lsn,
|
||||||
|
});
|
||||||
|
} else if let Some(fname) = path.strip_prefix("pg_multixact/offsets") {
|
||||||
|
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||||
|
|
||||||
|
return Ok(ParsedBaseImageFileName {
|
||||||
|
spcnode: 0,
|
||||||
|
dbnode: 0,
|
||||||
|
relnode: 0,
|
||||||
|
forknum: pg_constants::PG_MXACT_OFFSETS_FORKNUM,
|
||||||
|
segno: 0,
|
||||||
|
lsn,
|
||||||
|
});
|
||||||
|
} else if let Some(fname) = path.strip_prefix("pg_multixact/members") {
|
||||||
|
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||||
|
|
||||||
|
return Ok(ParsedBaseImageFileName {
|
||||||
|
spcnode: 0,
|
||||||
|
dbnode: 0,
|
||||||
|
relnode: 0,
|
||||||
|
forknum: pg_constants::PG_MXACT_MEMBERS_FORKNUM,
|
||||||
|
segno: 0,
|
||||||
|
lsn,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
return Err(FilePathError::new("invalid non relation data file name"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
|
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
|
||||||
/*
|
/*
|
||||||
* Relation data files can be in one of the following directories:
|
* Relation data files can be in one of the following directories:
|
||||||
@@ -242,10 +333,36 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
|
|||||||
* <oid>.<segment number>
|
* <oid>.<segment number>
|
||||||
*/
|
*/
|
||||||
if let Some(fname) = path.strip_prefix("global/") {
|
if let Some(fname) = path.strip_prefix("global/") {
|
||||||
|
if fname.contains("pg_control") {
|
||||||
|
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||||
|
|
||||||
|
return Ok(ParsedBaseImageFileName {
|
||||||
|
spcnode: pg_constants::GLOBALTABLESPACE_OID,
|
||||||
|
dbnode: 0,
|
||||||
|
relnode: 0,
|
||||||
|
forknum: pg_constants::PG_CONTROLFILE_FORKNUM,
|
||||||
|
segno: 0,
|
||||||
|
lsn,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if fname.contains("pg_filenode") {
|
||||||
|
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||||
|
|
||||||
|
return Ok(ParsedBaseImageFileName {
|
||||||
|
spcnode: pg_constants::GLOBALTABLESPACE_OID,
|
||||||
|
dbnode: 0,
|
||||||
|
relnode: 0,
|
||||||
|
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||||
|
segno: 0,
|
||||||
|
lsn,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
||||||
|
|
||||||
return Ok(ParsedBaseImageFileName {
|
return Ok(ParsedBaseImageFileName {
|
||||||
spcnode: GLOBALTABLESPACE_OID,
|
spcnode: pg_constants::GLOBALTABLESPACE_OID,
|
||||||
dbnode: 0,
|
dbnode: 0,
|
||||||
relnode,
|
relnode,
|
||||||
forknum,
|
forknum,
|
||||||
@@ -265,10 +382,23 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
|
|||||||
return Err(FilePathError::new("invalid relation data file name"));
|
return Err(FilePathError::new("invalid relation data file name"));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if fname.contains("pg_filenode") {
|
||||||
|
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||||
|
|
||||||
|
return Ok(ParsedBaseImageFileName {
|
||||||
|
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
|
||||||
|
dbnode,
|
||||||
|
relnode: 0,
|
||||||
|
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||||
|
segno: 0,
|
||||||
|
lsn,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
||||||
|
|
||||||
return Ok(ParsedBaseImageFileName {
|
return Ok(ParsedBaseImageFileName {
|
||||||
spcnode: DEFAULTTABLESPACE_OID,
|
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
|
||||||
dbnode,
|
dbnode,
|
||||||
relnode,
|
relnode,
|
||||||
forknum,
|
forknum,
|
||||||
@@ -302,22 +432,55 @@ async fn slurp_base_file(
|
|||||||
|
|
||||||
let mut bytes = BytesMut::from(data.as_slice()).freeze();
|
let mut bytes = BytesMut::from(data.as_slice()).freeze();
|
||||||
|
|
||||||
// FIXME: use constants (BLCKSZ)
|
|
||||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
|
||||||
|
|
||||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||||
|
|
||||||
while bytes.remaining() >= 8192 {
|
// pg_filenode.map has non-standard size - 512 bytes
|
||||||
let tag = page_cache::BufferTag {
|
if parsed.forknum == pg_constants::PG_FILENODEMAP_FORKNUM {
|
||||||
|
let b = bytes.clone();
|
||||||
|
controlfile::decode_filemapping(b);
|
||||||
|
while bytes.remaining() >= 512 {
|
||||||
|
let tag = page_cache::BufferTag {
|
||||||
|
spcnode: parsed.spcnode,
|
||||||
|
dbnode: parsed.dbnode,
|
||||||
|
relnode: parsed.relnode,
|
||||||
|
forknum: parsed.forknum as u8,
|
||||||
|
blknum: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(512));
|
||||||
|
}
|
||||||
|
|
||||||
|
let tag = page_cache::RelTag {
|
||||||
spcnode: parsed.spcnode,
|
spcnode: parsed.spcnode,
|
||||||
dbnode: parsed.dbnode,
|
dbnode: parsed.dbnode,
|
||||||
relnode: parsed.relnode,
|
relnode: parsed.relnode,
|
||||||
forknum: parsed.forknum as u8,
|
forknum: parsed.forknum as u8,
|
||||||
blknum: blknum,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
|
pcache.relsize_inc(&tag, Some(0));
|
||||||
|
} else {
|
||||||
|
// FIXME: use constants (BLCKSZ)
|
||||||
|
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
||||||
|
let reltag = page_cache::RelTag {
|
||||||
|
spcnode: parsed.spcnode,
|
||||||
|
dbnode: parsed.dbnode,
|
||||||
|
relnode: parsed.relnode,
|
||||||
|
forknum: parsed.forknum as u8,
|
||||||
|
};
|
||||||
|
|
||||||
blknum += 1;
|
while bytes.remaining() >= 8192 {
|
||||||
|
let tag = page_cache::BufferTag {
|
||||||
|
spcnode: parsed.spcnode,
|
||||||
|
dbnode: parsed.dbnode,
|
||||||
|
relnode: parsed.relnode,
|
||||||
|
forknum: parsed.forknum as u8,
|
||||||
|
blknum: blknum,
|
||||||
|
};
|
||||||
|
|
||||||
|
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
|
||||||
|
pcache.relsize_inc(&reltag, Some(blknum));
|
||||||
|
|
||||||
|
blknum += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,8 @@
|
|||||||
//#![allow(dead_code)]
|
//#![allow(dead_code)]
|
||||||
//include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
|
//include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
|
||||||
|
|
||||||
|
use crate::pg_constants;
|
||||||
|
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||||
|
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
@@ -234,6 +236,8 @@ const BLCKSZ: u16 = 8192;
|
|||||||
//
|
//
|
||||||
// Constants from xlogrecord.h
|
// Constants from xlogrecord.h
|
||||||
//
|
//
|
||||||
|
const XLR_INFO_MASK: u8 = 0x0F;
|
||||||
|
|
||||||
const XLR_MAX_BLOCK_ID: u8 = 32;
|
const XLR_MAX_BLOCK_ID: u8 = 32;
|
||||||
|
|
||||||
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
|
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
|
||||||
@@ -253,6 +257,12 @@ const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
|
|||||||
const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
|
const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
|
||||||
const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
|
const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
|
||||||
|
|
||||||
|
//
|
||||||
|
// constants from clog.h
|
||||||
|
//
|
||||||
|
const CLOG_XACTS_PER_BYTE: u32 = 4;
|
||||||
|
const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
|
||||||
|
|
||||||
pub struct DecodedBkpBlock {
|
pub struct DecodedBkpBlock {
|
||||||
/* Is this block ref in use? */
|
/* Is this block ref in use? */
|
||||||
//in_use: bool,
|
//in_use: bool,
|
||||||
@@ -261,7 +271,7 @@ pub struct DecodedBkpBlock {
|
|||||||
pub rnode_spcnode: u32,
|
pub rnode_spcnode: u32,
|
||||||
pub rnode_dbnode: u32,
|
pub rnode_dbnode: u32,
|
||||||
pub rnode_relnode: u32,
|
pub rnode_relnode: u32,
|
||||||
pub forknum: u8,
|
pub forknum: u8, // Note that we have a few special forknum values for non-rel files. Handle them too
|
||||||
pub blkno: u32,
|
pub blkno: u32,
|
||||||
|
|
||||||
/* copy of the fork_flags field from the XLogRecordBlockHeader */
|
/* copy of the fork_flags field from the XLogRecordBlockHeader */
|
||||||
@@ -297,6 +307,26 @@ pub struct DecodedWALRecord {
|
|||||||
const XLOG_SWITCH: u8 = 0x40;
|
const XLOG_SWITCH: u8 = 0x40;
|
||||||
const RM_XLOG_ID: u8 = 0;
|
const RM_XLOG_ID: u8 = 0;
|
||||||
|
|
||||||
|
const RM_XACT_ID: u8 = 1;
|
||||||
|
// const RM_CLOG_ID:u8 = 3;
|
||||||
|
//const RM_MULTIXACT_ID:u8 = 6;
|
||||||
|
|
||||||
|
// from xact.h
|
||||||
|
const XLOG_XACT_COMMIT: u8 = 0x00;
|
||||||
|
// const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||||
|
// const XLOG_XACT_ABORT: u8 = 0x20;
|
||||||
|
const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||||
|
// const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||||
|
// const XLOG_XACT_ASSIGNMENT: u8 = 0x50;
|
||||||
|
// const XLOG_XACT_INVALIDATIONS: u8 = 0x60;
|
||||||
|
/* free opcode 0x70 */
|
||||||
|
|
||||||
|
/* mask for filtering opcodes out of xl_info */
|
||||||
|
const XLOG_XACT_OPMASK: u8 = 0x70;
|
||||||
|
|
||||||
|
/* does this record have a 'xinfo' field or not */
|
||||||
|
// const XLOG_XACT_HAS_INFO: u8 = 0x80;
|
||||||
|
|
||||||
// Is this record an XLOG_SWITCH record? They need some special processing,
|
// Is this record an XLOG_SWITCH record? They need some special processing,
|
||||||
// so we need to check for that before the rest of the parsing.
|
// so we need to check for that before the rest of the parsing.
|
||||||
//
|
//
|
||||||
@@ -331,13 +361,17 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
|
|||||||
|
|
||||||
// FIXME: assume little-endian here
|
// FIXME: assume little-endian here
|
||||||
let xl_tot_len = buf.get_u32_le();
|
let xl_tot_len = buf.get_u32_le();
|
||||||
let _xl_xid = buf.get_u32_le();
|
let xl_xid = buf.get_u32_le();
|
||||||
let _xl_prev = buf.get_u64_le();
|
let _xl_prev = buf.get_u64_le();
|
||||||
let _xl_info = buf.get_u8();
|
let xl_info = buf.get_u8();
|
||||||
let _xl_rmid = buf.get_u8();
|
let xl_rmid = buf.get_u8();
|
||||||
buf.advance(2); // 2 bytes of padding
|
buf.advance(2); // 2 bytes of padding
|
||||||
let _xl_crc = buf.get_u32_le();
|
let _xl_crc = buf.get_u32_le();
|
||||||
|
|
||||||
|
info!("decode_wal_record xl_rmid = {}", xl_rmid);
|
||||||
|
|
||||||
|
let rminfo: u8 = xl_info & !XLR_INFO_MASK;
|
||||||
|
|
||||||
let remaining = xl_tot_len - SizeOfXLogRecord;
|
let remaining = xl_tot_len - SizeOfXLogRecord;
|
||||||
|
|
||||||
if buf.remaining() != remaining as usize {
|
if buf.remaining() != remaining as usize {
|
||||||
@@ -349,6 +383,53 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
|
|||||||
let mut rnode_relnode: u32 = 0;
|
let mut rnode_relnode: u32 = 0;
|
||||||
let mut got_rnode = false;
|
let mut got_rnode = false;
|
||||||
|
|
||||||
|
if xl_rmid == RM_XACT_ID
|
||||||
|
&& ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT
|
||||||
|
|| (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED)
|
||||||
|
{
|
||||||
|
info!("decode_wal_record RM_XACT_ID - XLOG_XACT_COMMIT");
|
||||||
|
|
||||||
|
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
|
||||||
|
|
||||||
|
let blkno = xl_xid / CLOG_XACTS_PER_PAGE;
|
||||||
|
|
||||||
|
let mut blk = DecodedBkpBlock {
|
||||||
|
rnode_spcnode: 0,
|
||||||
|
rnode_dbnode: 0,
|
||||||
|
rnode_relnode: 0,
|
||||||
|
forknum: pg_constants::PG_XACT_FORKNUM as u8,
|
||||||
|
blkno: blkno,
|
||||||
|
|
||||||
|
flags: 0,
|
||||||
|
has_image: false,
|
||||||
|
apply_image: false,
|
||||||
|
will_init: false,
|
||||||
|
hole_offset: 0,
|
||||||
|
hole_length: 0,
|
||||||
|
bimg_len: 0,
|
||||||
|
bimg_info: 0,
|
||||||
|
|
||||||
|
has_data: true,
|
||||||
|
data_len: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let fork_flags = buf.get_u8();
|
||||||
|
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
|
||||||
|
blk.data_len = buf.get_u16_le();
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"decode_wal_record RM_XACT_ID blk has data with data_len {}",
|
||||||
|
blk.data_len
|
||||||
|
);
|
||||||
|
|
||||||
|
blocks.push(blk);
|
||||||
|
return DecodedWALRecord {
|
||||||
|
lsn: lsn,
|
||||||
|
record: rec,
|
||||||
|
blocks: blocks,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// Decode the headers
|
// Decode the headers
|
||||||
|
|
||||||
let mut max_block_id = 0;
|
let mut max_block_id = 0;
|
||||||
@@ -552,6 +633,7 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
|
|||||||
|
|
||||||
//blk->rnode = *rnode;
|
//blk->rnode = *rnode;
|
||||||
}
|
}
|
||||||
|
|
||||||
blk.rnode_spcnode = rnode_spcnode;
|
blk.rnode_spcnode = rnode_spcnode;
|
||||||
blk.rnode_dbnode = rnode_dbnode;
|
blk.rnode_dbnode = rnode_dbnode;
|
||||||
blk.rnode_relnode = rnode_relnode;
|
blk.rnode_relnode = rnode_relnode;
|
||||||
|
|||||||
@@ -160,9 +160,11 @@ impl WalRedoProcess {
|
|||||||
.expect("failed to execute initdb");
|
.expect("failed to execute initdb");
|
||||||
|
|
||||||
if !initdb.status.success() {
|
if !initdb.status.success() {
|
||||||
panic!("initdb failed: {}\nstderr:\n{}",
|
panic!(
|
||||||
std::str::from_utf8(&initdb.stdout).unwrap(),
|
"initdb failed: {}\nstderr:\n{}",
|
||||||
std::str::from_utf8(&initdb.stderr).unwrap());
|
std::str::from_utf8(&initdb.stdout).unwrap(),
|
||||||
|
std::str::from_utf8(&initdb.stderr).unwrap()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start postgres itself
|
// Start postgres itself
|
||||||
|
|||||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: b1f5a5ec14...a71b5c24eb
Reference in New Issue
Block a user