mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-18 10:52:55 +00:00
Compare commits
3 Commits
docker-mul
...
compute_no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a267dfa41f | ||
|
|
1b9eb9430c | ||
|
|
9a4fbf365c |
@@ -21,6 +21,8 @@ use std::{
|
||||
use lazy_static::lazy_static;
|
||||
use postgres::{Client, NoTls};
|
||||
|
||||
use postgres;
|
||||
|
||||
lazy_static! {
|
||||
// postgres would be there if it was build by 'make postgres' here in the repo
|
||||
pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
|
||||
@@ -57,7 +59,7 @@ pub struct StorageControlPlane {
|
||||
|
||||
impl StorageControlPlane {
|
||||
// postgres <-> page_server
|
||||
pub fn one_page_server() -> StorageControlPlane {
|
||||
pub fn one_page_server(froms3: bool) -> StorageControlPlane {
|
||||
let mut cplane = StorageControlPlane {
|
||||
wal_acceptors: Vec::new(),
|
||||
page_servers: Vec::new(),
|
||||
@@ -68,7 +70,11 @@ impl StorageControlPlane {
|
||||
data_dir: TEST_WORKDIR.join("pageserver"),
|
||||
};
|
||||
pserver.init();
|
||||
pserver.start();
|
||||
if froms3 {
|
||||
pserver.start_froms3();
|
||||
} else {
|
||||
pserver.start();
|
||||
}
|
||||
|
||||
cplane.page_servers.push(pserver);
|
||||
cplane
|
||||
@@ -186,6 +192,27 @@ impl PageServerNode {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_froms3(&self) {
|
||||
println!("Starting pageserver at '{}'", self.page_service_addr);
|
||||
|
||||
let status = Command::new(BIN_DIR.join("pageserver"))
|
||||
.args(&["-D", self.data_dir.to_str().unwrap()])
|
||||
.args(&["-l", self.page_service_addr.to_string().as_str()])
|
||||
.arg("-d")
|
||||
.env_clear()
|
||||
.env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
|
||||
.env("S3_ENDPOINT", "https://127.0.0.1:9000")
|
||||
.env("S3_REGION", "us-east-1")
|
||||
.env("S3_ACCESSKEY", "minioadmin")
|
||||
.env("S3_SECRET", "minioadmin")
|
||||
.status()
|
||||
.expect("failed to start pageserver");
|
||||
|
||||
if !status.success() {
|
||||
panic!("pageserver start failed");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
let pidfile = self.data_dir.join("pageserver.pid");
|
||||
let pid = fs::read_to_string(pidfile).unwrap();
|
||||
@@ -352,7 +379,7 @@ impl ComputeControlPlane<'_> {
|
||||
// Init compute node without files, only datadir structure
|
||||
// use initdb --compute-node flag and GUC 'computenode_mode'
|
||||
// to distinguish the node
|
||||
pub fn new_minimal_node(&mut self) -> &PostgresNode {
|
||||
pub fn new_minimal_node<'a>(&mut self) -> &Arc<PostgresNode> {
|
||||
// allocate new node entry with generated port
|
||||
let node_id = self.nodes.len() + 1;
|
||||
let node = PostgresNode {
|
||||
@@ -365,7 +392,7 @@ impl ComputeControlPlane<'_> {
|
||||
self.nodes.push(Arc::new(node));
|
||||
let node = self.nodes.last().unwrap();
|
||||
|
||||
// initialize data directory w/o files
|
||||
// initialize data directory
|
||||
fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
|
||||
let initdb_path = self.pg_bin_dir.join("initdb");
|
||||
println!("initdb_path: {}", initdb_path.to_str().unwrap());
|
||||
@@ -383,6 +410,11 @@ impl ComputeControlPlane<'_> {
|
||||
panic!("initdb failed");
|
||||
}
|
||||
|
||||
// // allow local replication connections
|
||||
// node.append_conf("pg_hba.conf", format!("\
|
||||
// host replication all {}/32 sspi include_realm=1 map=regress\n\
|
||||
// ", node.ip).as_str());
|
||||
|
||||
// listen for selected port
|
||||
node.append_conf(
|
||||
"postgresql.conf",
|
||||
@@ -397,16 +429,37 @@ impl ComputeControlPlane<'_> {
|
||||
listen_addresses = '{address}'\n\
|
||||
port = {port}\n\
|
||||
computenode_mode = true\n\
|
||||
",
|
||||
",
|
||||
address = node.ip,
|
||||
port = node.port
|
||||
)
|
||||
.as_str(),
|
||||
);
|
||||
|
||||
node
|
||||
}
|
||||
|
||||
pub fn new_node_wo_data(&mut self) -> Arc<PostgresNode> {
|
||||
let storage_cplane = self.storage_cplane;
|
||||
let node = self.new_minimal_node();
|
||||
|
||||
let pserver = storage_cplane.page_server_addr();
|
||||
|
||||
// Configure that node to take pages from pageserver
|
||||
node.append_conf(
|
||||
"postgresql.conf",
|
||||
format!(
|
||||
"\
|
||||
page_server_connstring = 'host={} port={}'\n\
|
||||
",
|
||||
pserver.ip(),
|
||||
pserver.port()
|
||||
)
|
||||
.as_str(),
|
||||
);
|
||||
|
||||
node.clone()
|
||||
}
|
||||
|
||||
pub fn new_node(&mut self) -> Arc<PostgresNode> {
|
||||
let storage_cplane = self.storage_cplane;
|
||||
let node = self.new_vanilla_node();
|
||||
@@ -575,27 +628,133 @@ impl PostgresNode {
|
||||
self.pgdata.to_str()
|
||||
}
|
||||
|
||||
/* Create stub controlfile and respective xlog to start computenode */
|
||||
pub fn setup_controlfile(&self) {
|
||||
let filepath = format!("{}/global/pg_control", self.pgdata.to_str().unwrap());
|
||||
// Request from pageserver stub controlfile, respective xlog
|
||||
// and a bunch of files needed to start computenode
|
||||
//
|
||||
// NOTE this "file" request is a crutch.
|
||||
// It asks pageserver to write requested page to the provided filepath
|
||||
// and thus only works locally.
|
||||
// TODO receive pages via some libpq protocol.
|
||||
// The problem I've met is that nonrelfiles are not valid utf8 and cannot be
|
||||
// handled by simple_query(). that expects test.
|
||||
// And reqular query() uses prepared queries.
|
||||
|
||||
{
|
||||
File::create(filepath).unwrap();
|
||||
// TODO pass sysid as parameter
|
||||
pub fn setup_compute_node(&self, sysid: u64, storage_cplane: &StorageControlPlane) {
|
||||
let mut query;
|
||||
//Request pg_control from pageserver
|
||||
query = format!(
|
||||
"file {}/global/pg_control,{},{},{},{},{},{},{}",
|
||||
self.pgdata.to_str().unwrap(),
|
||||
sysid as u64, //sysid
|
||||
1664, //tablespace
|
||||
0, //dboid
|
||||
0, //reloid
|
||||
42, //forknum pg_control
|
||||
0, //blkno
|
||||
0 //lsn
|
||||
);
|
||||
storage_cplane.page_server_psql(query.as_str());
|
||||
|
||||
//Request pg_xact and pg_multixact from pageserver
|
||||
//We need them for initial pageserver startup and authentication
|
||||
//TODO figure out which block number we really need
|
||||
query = format!(
|
||||
"file {}/pg_xact/0000,{},{},{},{},{},{},{}",
|
||||
self.pgdata.to_str().unwrap(),
|
||||
sysid as u64, //sysid
|
||||
0, //tablespace
|
||||
0, //dboid
|
||||
0, //reloid
|
||||
44, //forknum
|
||||
0, //blkno
|
||||
0 //lsn
|
||||
);
|
||||
storage_cplane.page_server_psql(query.as_str());
|
||||
|
||||
query = format!(
|
||||
"file {}/pg_multixact/offsets/0000,{},{},{},{},{},{},{}",
|
||||
self.pgdata.to_str().unwrap(),
|
||||
sysid as u64, //sysid
|
||||
0, //tablespace
|
||||
0, //dboid
|
||||
0, //reloid
|
||||
45, //forknum
|
||||
0, //blkno
|
||||
0 //lsn
|
||||
);
|
||||
storage_cplane.page_server_psql(query.as_str());
|
||||
|
||||
query = format!(
|
||||
"file {}/pg_multixact/members/0000,{},{},{},{},{},{},{}",
|
||||
self.pgdata.to_str().unwrap(),
|
||||
sysid as u64, //sysid
|
||||
0, //tablespace
|
||||
0, //dboid
|
||||
0, //reloid
|
||||
46, //forknum
|
||||
0, //blkno
|
||||
0 //lsn
|
||||
);
|
||||
storage_cplane.page_server_psql(query.as_str());
|
||||
|
||||
//Request a few shared catalogs needed for authentication
|
||||
//Without them we cannot setup connection with pageserver to request further pages
|
||||
let reloids = [1260, 1261, 1262, 2396];
|
||||
for reloid in reloids.iter() {
|
||||
//FIXME request all blocks from file, not just 10
|
||||
for blkno in 0..10 {
|
||||
query = format!(
|
||||
"file {}/global/{},{},{},{},{},{},{},{}",
|
||||
self.pgdata.to_str().unwrap(),
|
||||
reloid, //suse it as filename
|
||||
sysid as u64, //sysid
|
||||
1664, //tablespace
|
||||
0, //dboid
|
||||
reloid, //reloid
|
||||
0, //forknum
|
||||
blkno, //blkno
|
||||
0 //lsn
|
||||
);
|
||||
storage_cplane.page_server_psql(query.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
fs::create_dir(format!("{}/base/13006", self.pgdata.to_str().unwrap())).unwrap();
|
||||
fs::create_dir(format!("{}/base/13007", self.pgdata.to_str().unwrap())).unwrap();
|
||||
|
||||
//FIXME figure out what wal file we need to successfully start
|
||||
let walfilepath = format!(
|
||||
"{}/pg_wal/000000010000000000000001",
|
||||
self.pgdata.to_str().unwrap()
|
||||
);
|
||||
fs::copy(
|
||||
"/home/anastasia/zenith/zenith/tmp_check/pgdata/pg_wal/000000010000000000000001",
|
||||
walfilepath,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
println!("before resetwal ");
|
||||
|
||||
let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
|
||||
|
||||
// Now it does nothing, just prints existing content of pg_control.
|
||||
// TODO update values with most recent lsn, xid, oid requested from pageserver
|
||||
let pg_resetwal = Command::new(pg_resetwal_path)
|
||||
.args(&["-D", self.pgdata.to_str().unwrap()])
|
||||
.arg("-f")
|
||||
// TODO probably we will have to modify pg_resetwal
|
||||
// .arg("--compute-node")
|
||||
.arg("-n") //dry run
|
||||
//.arg("-f")
|
||||
//.args(&["--next-transaction-id", "100500"])
|
||||
//.args(&["--next-oid", "17000"])
|
||||
//.args(&["--next-transaction-id", "100500"])
|
||||
.status()
|
||||
.expect("failed to execute pg_resetwal");
|
||||
|
||||
if !pg_resetwal.success() {
|
||||
panic!("pg_resetwal failed");
|
||||
}
|
||||
|
||||
println!("setup done");
|
||||
}
|
||||
|
||||
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
|
||||
@@ -613,6 +772,28 @@ impl PostgresNode {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_to_s3(&self) {
|
||||
println!("Push to s3 node at '{}'", self.pgdata.to_str().unwrap());
|
||||
|
||||
let zenith_push_path = self.pg_bin_dir.join("zenith_push");
|
||||
println!("zenith_push_path: {}", zenith_push_path.to_str().unwrap());
|
||||
|
||||
let status = Command::new(zenith_push_path)
|
||||
.args(&["-D", self.pgdata.to_str().unwrap()])
|
||||
.env_clear()
|
||||
.env("S3_ENDPOINT", "https://127.0.0.1:9000")
|
||||
.env("S3_REGION", "us-east-1")
|
||||
.env("S3_ACCESSKEY", "minioadmin")
|
||||
.env("S3_SECRET", "minioadmin")
|
||||
// .env("S3_BUCKET", "zenith-testbucket")
|
||||
.status()
|
||||
.expect("failed to push node to s3");
|
||||
|
||||
if !status.success() {
|
||||
panic!("zenith_push failed");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO
|
||||
pub fn pg_bench() {}
|
||||
pub fn pg_regress() {}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
#[allow(dead_code)]
|
||||
mod control_plane;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
use control_plane::ComputeControlPlane;
|
||||
use control_plane::StorageControlPlane;
|
||||
@@ -8,17 +10,21 @@ use control_plane::StorageControlPlane;
|
||||
// -- restart + seqscan won't read deleted stuff
|
||||
// -- pageserver api endpoint to check all rels
|
||||
|
||||
// Handcrafted cases with wal records that are (were) problematic for redo.
|
||||
//Handcrafted cases with wal records that are (were) problematic for redo.
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_redo_cases() {
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = StorageControlPlane::one_page_server();
|
||||
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||
|
||||
// start postgres
|
||||
let node = compute_cplane.new_node();
|
||||
node.start(&storage_cplane);
|
||||
|
||||
println!("await pageserver connection...");
|
||||
sleep(Duration::from_secs(3));
|
||||
|
||||
// check basic work with table
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
@@ -26,7 +32,7 @@ fn test_redo_cases() {
|
||||
);
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
|
||||
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
|
||||
);
|
||||
let count: i64 = node
|
||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||
@@ -34,9 +40,9 @@ fn test_redo_cases() {
|
||||
.unwrap()
|
||||
.get(0);
|
||||
println!("sum = {}", count);
|
||||
assert_eq!(count, 5000050000);
|
||||
assert_eq!(count, 5050);
|
||||
|
||||
// check 'create table as'
|
||||
//check 'create table as'
|
||||
node.safe_psql("postgres", "CREATE TABLE t2 AS SELECT * FROM t");
|
||||
let count: i64 = node
|
||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||
@@ -44,28 +50,33 @@ fn test_redo_cases() {
|
||||
.unwrap()
|
||||
.get(0);
|
||||
println!("sum = {}", count);
|
||||
assert_eq!(count, 5000050000);
|
||||
assert_eq!(count, 5050);
|
||||
}
|
||||
|
||||
// Runs pg_regress on a compute node
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_regress() {
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = StorageControlPlane::one_page_server();
|
||||
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||
|
||||
// start postgres
|
||||
let node = compute_cplane.new_node();
|
||||
node.start(&storage_cplane);
|
||||
|
||||
println!("await pageserver connection...");
|
||||
sleep(Duration::from_secs(3));
|
||||
|
||||
control_plane::regress_check(&node);
|
||||
}
|
||||
|
||||
// Run two postgres instances on one pageserver
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_pageserver_multitenancy() {
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = StorageControlPlane::one_page_server();
|
||||
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||
|
||||
// Allocate postgres instance, but don't start
|
||||
@@ -74,6 +85,11 @@ fn test_pageserver_multitenancy() {
|
||||
node1.start(&storage_cplane);
|
||||
node2.start(&storage_cplane);
|
||||
|
||||
// XXX: add some extension func to postgres to check walsender conn
|
||||
// XXX: or better just drop that
|
||||
println!("await pageserver connection...");
|
||||
sleep(Duration::from_secs(3));
|
||||
|
||||
// check node1
|
||||
node1.safe_psql(
|
||||
"postgres",
|
||||
@@ -81,7 +97,7 @@ fn test_pageserver_multitenancy() {
|
||||
);
|
||||
node1.safe_psql(
|
||||
"postgres",
|
||||
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
|
||||
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
|
||||
);
|
||||
let count: i64 = node1
|
||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||
@@ -89,7 +105,7 @@ fn test_pageserver_multitenancy() {
|
||||
.unwrap()
|
||||
.get(0);
|
||||
println!("sum = {}", count);
|
||||
assert_eq!(count, 5000050000);
|
||||
assert_eq!(count, 5050);
|
||||
|
||||
// check node2
|
||||
node2.safe_psql(
|
||||
@@ -98,7 +114,7 @@ fn test_pageserver_multitenancy() {
|
||||
);
|
||||
node2.safe_psql(
|
||||
"postgres",
|
||||
"INSERT INTO t SELECT generate_series(100000,200000), 'payload'",
|
||||
"INSERT INTO t SELECT generate_series(100,200), 'payload'",
|
||||
);
|
||||
let count: i64 = node2
|
||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||
@@ -106,5 +122,89 @@ fn test_pageserver_multitenancy() {
|
||||
.unwrap()
|
||||
.get(0);
|
||||
println!("sum = {}", count);
|
||||
assert_eq!(count, 15000150000);
|
||||
assert_eq!(count, 15150);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
// Start pageserver using s3 base image
|
||||
//
|
||||
// Requires working minio with hardcoded setup:
|
||||
// .env("S3_ENDPOINT", "https://127.0.0.1:9000")
|
||||
// .env("S3_REGION", "us-east-1")
|
||||
// .env("S3_ACCESSKEY", "minioadmin")
|
||||
// .env("S3_SECRET", "minioadmin")
|
||||
// .env("S3_BUCKET", "zenith-testbucket")
|
||||
// TODO use env variables in test
|
||||
fn test_pageserver_recovery() {
|
||||
//This test expects that image is already uploaded to s3
|
||||
//To upload it use zenith_push before test (see node.push_to_s3() for details)
|
||||
let storage_cplane = StorageControlPlane::one_page_server(true);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||
|
||||
//Wait while daemon uploads pages from s3
|
||||
sleep(Duration::from_secs(15));
|
||||
|
||||
let node_restored = compute_cplane.new_node_wo_data();
|
||||
|
||||
//TODO 6947041219207877724 is a hardcoded sysid for my cluster. Get it somewhere
|
||||
node_restored.setup_compute_node(6947041219207877724, &storage_cplane);
|
||||
|
||||
node_restored.start(&storage_cplane);
|
||||
|
||||
let rows = node_restored.safe_psql("postgres", "SELECT relname from pg_class;");
|
||||
|
||||
assert_eq!(rows.len(), 395);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
//Scenario for future test. Not implemented yet
|
||||
fn test_pageserver_node_switch() {
|
||||
//Create pageserver
|
||||
let storage_cplane = StorageControlPlane::one_page_server(false);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
|
||||
|
||||
//Create reqular node
|
||||
let node = compute_cplane.new_node();
|
||||
node.start(&storage_cplane);
|
||||
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"CREATE TABLE t(key int primary key, value text)",
|
||||
);
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
|
||||
);
|
||||
let count: i64 = node
|
||||
.safe_psql("postgres", "SELECT sum(key) FROM t")
|
||||
.first()
|
||||
.unwrap()
|
||||
.get(0);
|
||||
println!("sum = {}", count);
|
||||
assert_eq!(count, 5050);
|
||||
|
||||
//Push all node files to s3
|
||||
//TODO upload them directly to pageserver
|
||||
node.push_to_s3();
|
||||
//Upload data from s3 to pageserver
|
||||
//storage_cplane.upload_from_s3() //Not implemented yet
|
||||
|
||||
//Shut down the node
|
||||
node.stop();
|
||||
|
||||
//Create new node without files
|
||||
let node_restored = compute_cplane.new_node_wo_data();
|
||||
|
||||
// Setup minimal set of files needed to start node and setup pageserver connection
|
||||
// TODO 6947041219207877724 is a hardcoded sysid. Get it from node
|
||||
node_restored.setup_compute_node(6947041219207877724, &storage_cplane);
|
||||
|
||||
//Start compute node without files
|
||||
node_restored.start(&storage_cplane);
|
||||
|
||||
//Ensure that is has table created on initial node
|
||||
let rows = node_restored.safe_psql("postgres", "SELECT key from t;");
|
||||
assert_eq!(rows.len(), 5050);
|
||||
}
|
||||
|
||||
@@ -159,6 +159,7 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
|
||||
// Before opening up for connections, restore the latest base backup from S3.
|
||||
// (We don't persist anything to local disk at the moment, so we need to do
|
||||
// this at every startup)
|
||||
// TODO move it to a separate function
|
||||
if !conf.skip_recovery {
|
||||
restore_s3::restore_main(&conf);
|
||||
}
|
||||
|
||||
218
pageserver/src/controlfile.rs
Normal file
218
pageserver/src/controlfile.rs
Normal file
@@ -0,0 +1,218 @@
|
||||
#![allow(non_camel_case_types)]
|
||||
#![allow(non_snake_case)]
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::prelude::*;
|
||||
use std::io::SeekFrom;
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
|
||||
use log::*;
|
||||
|
||||
type XLogRecPtr = u64;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone)]
|
||||
/*
|
||||
* Body of CheckPoint XLOG records. This is declared here because we keep
|
||||
* a copy of the latest one in pg_control for possible disaster recovery.
|
||||
* Changing this struct requires a PG_CONTROL_VERSION bump.
|
||||
*/
|
||||
pub struct CheckPoint {
|
||||
pub redo: XLogRecPtr, /* next RecPtr available when we began to
|
||||
* create CheckPoint (i.e. REDO start point) */
|
||||
pub ThisTimeLineID: u32, /* current TLI */
|
||||
pub PrevTimeLineID: u32, /* previous TLI, if this record begins a new
|
||||
* timeline (equals ThisTimeLineID otherwise) */
|
||||
pub fullPageWrites: bool, /* current full_page_writes */
|
||||
pub nextXid: u64, /* next free transaction ID */
|
||||
pub nextOid: u32, /* next free OID */
|
||||
pub nextMulti: u32, /* next free MultiXactId */
|
||||
pub nextMultiOffset: u32, /* next free MultiXact offset */
|
||||
pub oldestXid: u32, /* cluster-wide minimum datfrozenxid */
|
||||
pub oldestXidDB: u32, /* database with minimum datfrozenxid */
|
||||
pub oldestMulti: u32, /* cluster-wide minimum datminmxid */
|
||||
pub oldestMultiDB: u32, /* database with minimum datminmxid */
|
||||
pub time: u64, /* time stamp of checkpoint */
|
||||
pub oldestCommitTsXid: u32, /* oldest Xid with valid commit
|
||||
* timestamp */
|
||||
pub newestCommitTsXid: u32, /* newest Xid with valid commit
|
||||
* timestamp */
|
||||
|
||||
/*
|
||||
* Oldest XID still running. This is only needed to initialize hot standby
|
||||
* mode from an online checkpoint, so we only bother calculating this for
|
||||
* online checkpoints and only when wal_level is replica. Otherwise it's
|
||||
* set to InvalidTransactionId.
|
||||
*/
|
||||
pub oldestActiveXid: u32,
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ControlFileDataZenith {
|
||||
pub system_identifier: u64,
|
||||
pg_control_version: u32, /* PG_CONTROL_VERSION */
|
||||
catalog_version_no: u32, /* see catversion.h */
|
||||
|
||||
state: i32, /* see enum above */
|
||||
time: i64, /* time stamp of last pg_control update */
|
||||
pub checkPoint: XLogRecPtr,
|
||||
checkPointCopy: CheckPoint, /* copy of last check point record */
|
||||
unloggedLSN: XLogRecPtr, /* current fake LSN value, for unlogged rels */
|
||||
minRecoveryPoint: XLogRecPtr,
|
||||
minRecoveryPointTLI: u32,
|
||||
backupStartPoint: XLogRecPtr,
|
||||
backupEndPoint: XLogRecPtr,
|
||||
backupEndRequired: bool,
|
||||
}
|
||||
|
||||
impl ControlFileDataZenith {
|
||||
pub fn new() -> ControlFileDataZenith {
|
||||
ControlFileDataZenith {
|
||||
system_identifier: 0,
|
||||
pg_control_version: 0,
|
||||
catalog_version_no: 0,
|
||||
state: 0,
|
||||
time: 0,
|
||||
checkPoint: 0,
|
||||
checkPointCopy: {
|
||||
CheckPoint {
|
||||
redo: 0,
|
||||
ThisTimeLineID: 0,
|
||||
PrevTimeLineID: 0,
|
||||
fullPageWrites: false,
|
||||
nextXid: 0,
|
||||
nextOid: 0,
|
||||
nextMulti: 0,
|
||||
nextMultiOffset: 0,
|
||||
oldestXid: 0,
|
||||
oldestXidDB: 0,
|
||||
oldestMulti: 0,
|
||||
oldestMultiDB: 0,
|
||||
time: 0,
|
||||
oldestCommitTsXid: 0,
|
||||
newestCommitTsXid: 0,
|
||||
oldestActiveXid: 0,
|
||||
}
|
||||
},
|
||||
unloggedLSN: 0,
|
||||
minRecoveryPoint: 0,
|
||||
minRecoveryPointTLI: 0,
|
||||
backupStartPoint: 0,
|
||||
backupEndPoint: 0,
|
||||
backupEndRequired: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode_pg_control(mut buf: Bytes) -> ControlFileDataZenith {
|
||||
info!("decode pg_control");
|
||||
|
||||
let controlfile: ControlFileDataZenith = ControlFileDataZenith {
|
||||
system_identifier: buf.get_u64_le(),
|
||||
pg_control_version: buf.get_u32_le(),
|
||||
catalog_version_no: buf.get_u32_le(),
|
||||
state: buf.get_i32_le(),
|
||||
time: {
|
||||
buf.advance(4);
|
||||
buf.get_i64_le()
|
||||
},
|
||||
checkPoint: buf.get_u64_le(),
|
||||
checkPointCopy: {
|
||||
CheckPoint {
|
||||
redo: buf.get_u64_le(),
|
||||
ThisTimeLineID: buf.get_u32_le(),
|
||||
PrevTimeLineID: buf.get_u32_le(),
|
||||
fullPageWrites: buf.get_u8() != 0,
|
||||
nextXid: {
|
||||
buf.advance(7);
|
||||
buf.get_u64_le()
|
||||
},
|
||||
nextOid: buf.get_u32_le(),
|
||||
nextMulti: buf.get_u32_le(),
|
||||
nextMultiOffset: buf.get_u32_le(),
|
||||
oldestXid: buf.get_u32_le(),
|
||||
oldestXidDB: buf.get_u32_le(),
|
||||
oldestMulti: buf.get_u32_le(),
|
||||
oldestMultiDB: buf.get_u32_le(),
|
||||
time: {
|
||||
buf.advance(4);
|
||||
buf.get_u64_le()
|
||||
},
|
||||
oldestCommitTsXid: buf.get_u32_le(),
|
||||
newestCommitTsXid: buf.get_u32_le(),
|
||||
oldestActiveXid: buf.get_u32_le(),
|
||||
}
|
||||
},
|
||||
unloggedLSN: buf.get_u64_le(),
|
||||
minRecoveryPoint: buf.get_u64_le(),
|
||||
minRecoveryPointTLI: buf.get_u32_le(),
|
||||
backupStartPoint: {
|
||||
buf.advance(4);
|
||||
buf.get_u64_le()
|
||||
},
|
||||
backupEndPoint: buf.get_u64_le(),
|
||||
backupEndRequired: buf.get_u8() != 0,
|
||||
};
|
||||
|
||||
return controlfile;
|
||||
}
|
||||
|
||||
pub fn parse_controlfile(b: Bytes) {
|
||||
let controlfile = decode_pg_control(b);
|
||||
|
||||
info!(
|
||||
"controlfile {:X}/{:X}",
|
||||
controlfile.checkPoint >> 32,
|
||||
controlfile.checkPoint
|
||||
);
|
||||
info!("controlfile {:?}", controlfile);
|
||||
}
|
||||
|
||||
const MAX_MAPPINGS: usize = 62;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RelMapping {
|
||||
mapoid: u32, /* OID of a catalog */
|
||||
mapfilenode: u32, /* its filenode number */
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RelMapFile {
|
||||
magic: i32, /* always RELMAPPER_FILEMAGIC */
|
||||
num_mappings: i32, /* number of valid RelMapping entries */
|
||||
mappings: [u8; MAX_MAPPINGS * 8],
|
||||
crc: u32, /* CRC of all above */
|
||||
pad: i32, /* to make the struct size be 512 exactly */
|
||||
}
|
||||
|
||||
pub fn decode_filemapping(mut buf: Bytes) -> RelMapFile {
|
||||
info!("decode filemap");
|
||||
|
||||
let file: RelMapFile = RelMapFile {
|
||||
magic: buf.get_i32_le(), /* always RELMAPPER_FILEMAGIC */
|
||||
num_mappings: buf.get_i32_le(), /* number of valid RelMapping entries */
|
||||
mappings: {
|
||||
let mut arr = [0 as u8; MAX_MAPPINGS * 8];
|
||||
buf.copy_to_slice(&mut arr);
|
||||
arr
|
||||
},
|
||||
crc: buf.get_u32_le(), /* CRC of all above */
|
||||
pad: buf.get_i32_le(),
|
||||
};
|
||||
|
||||
info!("decode filemap {:?}", file);
|
||||
file
|
||||
}
|
||||
|
||||
pub fn write_buf_to_file(filepath: String, buf: Bytes, blkno: u32) {
|
||||
info!("write_buf_to_file {}", filepath.clone());
|
||||
|
||||
let mut buffer = File::create(filepath.clone()).unwrap();
|
||||
buffer.seek(SeekFrom::Start(8192 * blkno as u64)).unwrap();
|
||||
|
||||
buffer.write_all(&buf).unwrap();
|
||||
|
||||
info!("DONE write_buf_to_file {}", filepath);
|
||||
}
|
||||
@@ -1,8 +1,11 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
|
||||
pub mod controlfile;
|
||||
pub mod page_cache;
|
||||
pub mod page_service;
|
||||
#[allow(dead_code)]
|
||||
pub mod pg_constants;
|
||||
pub mod restore_s3;
|
||||
pub mod tui;
|
||||
pub mod tui_event;
|
||||
|
||||
@@ -8,20 +8,21 @@
|
||||
|
||||
use core::ops::Bound::Included;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::{convert::TryInto, ops::AddAssign};
|
||||
|
||||
use std::error::Error;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::{convert::TryInto, ops::AddAssign};
|
||||
// use tokio::sync::RwLock;
|
||||
use bytes::Bytes;
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use rand::Rng;
|
||||
|
||||
use crate::{walredo, PageServerConf};
|
||||
use crate::{controlfile, walredo, PageServerConf};
|
||||
|
||||
use crossbeam_channel::unbounded;
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
@@ -107,6 +108,8 @@ struct PageCacheShared {
|
||||
first_valid_lsn: u64,
|
||||
last_valid_lsn: u64,
|
||||
last_record_lsn: u64,
|
||||
|
||||
controldata: controlfile::ControlFileDataZenith,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
@@ -146,6 +149,7 @@ fn init_page_cache() -> PageCache {
|
||||
first_valid_lsn: 0,
|
||||
last_valid_lsn: 0,
|
||||
last_record_lsn: 0,
|
||||
controldata: controlfile::ControlFileDataZenith::new(),
|
||||
}),
|
||||
valid_lsn_condvar: Condvar::new(),
|
||||
|
||||
@@ -176,7 +180,7 @@ fn init_page_cache() -> PageCache {
|
||||
// stored directly in the cache entry in that you still need to run the WAL redo
|
||||
// routine to generate the page image.
|
||||
//
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
|
||||
pub struct CacheKey {
|
||||
pub tag: BufferTag,
|
||||
pub lsn: u64,
|
||||
@@ -215,7 +219,7 @@ impl CacheEntry {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
|
||||
#[derive(Eq, PartialEq, Hash, Clone, Copy, Debug)]
|
||||
pub struct RelTag {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
@@ -223,7 +227,7 @@ pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
|
||||
pub struct BufferTag {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
@@ -242,25 +246,145 @@ pub struct WALRecord {
|
||||
// Public interface functions
|
||||
|
||||
impl PageCache {
|
||||
pub fn get_nonrel_page(&self, tag: BufferTag, _reqlsn: u64) -> Result<Bytes, Box<dyn Error>> {
|
||||
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Now we don't have versioning for non-rel pages.
|
||||
// Also at bootstrap we don't know lsn for some files.
|
||||
// So always request the very latest version
|
||||
// let lsn = reqlsn;
|
||||
|
||||
let lsn = u64::MAX;
|
||||
|
||||
let minkey = CacheKey { tag: tag, lsn: 0 };
|
||||
// Look up to the largest lsn
|
||||
let maxkey = CacheKey { tag: tag, lsn: lsn };
|
||||
|
||||
let entry_rc: Arc<CacheEntry>;
|
||||
{
|
||||
let shared = self.shared.lock().unwrap();
|
||||
|
||||
let pagecache = &shared.pagecache;
|
||||
info!("got pagecache {}", pagecache.len());
|
||||
|
||||
let mut entries = pagecache.range((Included(&minkey), Included(&maxkey)));
|
||||
|
||||
let entry_opt = entries.next_back();
|
||||
|
||||
if entry_opt.is_none() {
|
||||
return Err(format!(
|
||||
"not found non-rel page with LSN {} for {}/{}/{}.{} blk {}",
|
||||
lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
|
||||
))?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"found non-rel page with LSN {} for {}/{}/{}.{} blk {}",
|
||||
lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
|
||||
);
|
||||
|
||||
let (_key, entry) = entry_opt.unwrap();
|
||||
entry_rc = entry.clone();
|
||||
|
||||
// Now that we have a reference to the cache entry, drop the lock on the map.
|
||||
// It's important to do this before waiting on the condition variable below,
|
||||
// and better to do it as soon as possible to maximize concurrency.
|
||||
}
|
||||
|
||||
// Lock the cache entry and dig the page image out of it.
|
||||
let page_img: Bytes;
|
||||
{
|
||||
let entry_content = entry_rc.content.lock().unwrap();
|
||||
|
||||
if let Some(img) = &entry_content.page_image {
|
||||
assert!(!entry_content.apply_pending);
|
||||
page_img = img.clone();
|
||||
} else if entry_content.wal_record.is_some() {
|
||||
return Err("non-rel WAL redo is not implemented yet".into());
|
||||
//
|
||||
// If this page needs to be reconstructed by applying some WAL,
|
||||
// send a request to the WAL redo thread.
|
||||
//
|
||||
// if !entry_content.apply_pending {
|
||||
// assert!(!entry_content.apply_pending);
|
||||
// entry_content.apply_pending = true;
|
||||
|
||||
// let s = &self.walredo_sender;
|
||||
// s.send(entry_rc.clone())?;
|
||||
// }
|
||||
|
||||
// while entry_content.apply_pending {
|
||||
// entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
|
||||
//}
|
||||
|
||||
// We should now have a page image. If we don't, it means that WAL redo
|
||||
// failed to reconstruct it. WAL redo should've logged that error already.
|
||||
// page_img = match &entry_content.page_image {
|
||||
// Some(p) => p.clone(),
|
||||
// None => {
|
||||
// error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
|
||||
// return Err("could not apply WAL to reconstruct page image".into());
|
||||
// }
|
||||
// };
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
return Err(format!("no page image or WAL record for requested page"))?;
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
"Returning page for {}/{}/{}.{} blk {}",
|
||||
tag.spcnode,
|
||||
tag.dbnode,
|
||||
tag.relnode,
|
||||
tag.forknum,
|
||||
tag.blknum
|
||||
);
|
||||
|
||||
return Ok(page_img);
|
||||
}
|
||||
|
||||
//
|
||||
// GetPage@LSN
|
||||
//
|
||||
// Returns an 8k page image
|
||||
//
|
||||
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> {
|
||||
pub fn get_page_at_lsn(&self, tag: BufferTag, reqlsn: u64) -> Result<Bytes, Box<dyn Error>> {
|
||||
let mut lsn = reqlsn;
|
||||
|
||||
if tag.forknum > 40 {
|
||||
info!(
|
||||
"get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}",
|
||||
lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
|
||||
);
|
||||
|
||||
return self.get_nonrel_page(tag, lsn);
|
||||
}
|
||||
|
||||
if reqlsn == 0 {
|
||||
let c = self.get_controldata();
|
||||
lsn = c.checkPoint;
|
||||
|
||||
info!("update reqlsn get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}", lsn,
|
||||
tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum);
|
||||
}
|
||||
|
||||
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
|
||||
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
||||
let minkey = CacheKey { tag: tag, lsn: 0 };
|
||||
let maxkey = CacheKey { tag: tag, lsn: lsn };
|
||||
|
||||
let entry_rc: Arc<CacheEntry>;
|
||||
{
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
|
||||
let mut waited = false;
|
||||
|
||||
while lsn > shared.last_valid_lsn {
|
||||
// When server just started and created checkpoint lsn,
|
||||
// but we have not yet established connection,
|
||||
// requested lsn will be larger than the one we have
|
||||
while lsn > shared.last_valid_lsn + 500 {
|
||||
// TODO: Wait for the WAL receiver to catch up
|
||||
waited = true;
|
||||
trace!(
|
||||
@@ -276,8 +400,8 @@ impl PageCache {
|
||||
shared = wait_result.0;
|
||||
if wait_result.1.timed_out() {
|
||||
return Err(format!(
|
||||
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
|
||||
lsn >> 32, lsn & 0xffff_ffff
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
))?;
|
||||
}
|
||||
}
|
||||
@@ -286,8 +410,7 @@ impl PageCache {
|
||||
}
|
||||
|
||||
if lsn < shared.first_valid_lsn {
|
||||
return Err(format!("LSN {:X}/{:X} has already been removed",
|
||||
lsn >> 32, lsn & 0xffff_ffff))?;
|
||||
return Err(format!("LSN {} has already been removed", lsn))?;
|
||||
}
|
||||
|
||||
let pagecache = &shared.pagecache;
|
||||
@@ -297,9 +420,9 @@ impl PageCache {
|
||||
let entry_opt = entries.next_back();
|
||||
|
||||
if entry_opt.is_none() {
|
||||
static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192];
|
||||
return Ok(Bytes::from_static(&ZERO_PAGE));
|
||||
/* return Err("could not find page image")?; */
|
||||
//static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192];
|
||||
//return Ok(Bytes::from_static(&ZERO_PAGE));
|
||||
return Err("could not find page image")?;
|
||||
}
|
||||
let (_key, entry) = entry_opt.unwrap();
|
||||
entry_rc = entry.clone();
|
||||
@@ -476,8 +599,10 @@ impl PageCache {
|
||||
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
||||
assert!(oldentry.is_none());
|
||||
|
||||
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
|
||||
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
|
||||
debug!(
|
||||
"inserted page image for {}/{}/{}_{} blk {} at {}",
|
||||
tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn
|
||||
);
|
||||
|
||||
self.num_page_images.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
@@ -550,6 +675,16 @@ impl PageCache {
|
||||
return shared.last_record_lsn;
|
||||
}
|
||||
|
||||
pub fn set_controldata(&self, c: controlfile::ControlFileDataZenith) {
|
||||
let mut shared = self.shared.lock().unwrap();
|
||||
shared.controldata = c;
|
||||
}
|
||||
|
||||
pub fn get_controldata(&self) -> controlfile::ControlFileDataZenith {
|
||||
let shared = self.shared.lock().unwrap();
|
||||
return shared.controldata.clone();
|
||||
}
|
||||
|
||||
//
|
||||
// Simple test function for the WAL redo code:
|
||||
//
|
||||
@@ -610,6 +745,7 @@ impl PageCache {
|
||||
*entry = to + 1;
|
||||
}
|
||||
}
|
||||
trace!("relsize_inc {:?} to {}", rel, entry);
|
||||
}
|
||||
|
||||
pub fn relsize_get(&self, rel: &RelTag) -> u32 {
|
||||
|
||||
@@ -24,6 +24,8 @@ use crate::page_cache;
|
||||
use crate::walreceiver;
|
||||
use crate::PageServerConf;
|
||||
|
||||
use crate::controlfile;
|
||||
|
||||
type Result<T> = std::result::Result<T, io::Error>;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -51,7 +53,6 @@ enum BeMessage {
|
||||
RowDescription,
|
||||
DataRow,
|
||||
CommandComplete,
|
||||
ControlFile,
|
||||
|
||||
//
|
||||
// All that messages are actually CopyData from libpq point of view.
|
||||
@@ -339,18 +340,6 @@ impl Connection {
|
||||
self.stream.write_buf(&mut b).await?;
|
||||
}
|
||||
|
||||
BeMessage::ControlFile => {
|
||||
// TODO pass checkpoint and xid info in this message
|
||||
let mut b = Bytes::from("hello pg_control");
|
||||
|
||||
self.stream.write_u8(b'D').await?;
|
||||
self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?;
|
||||
|
||||
self.stream.write_i16(1).await?;
|
||||
self.stream.write_i32(b.len() as i32).await?;
|
||||
self.stream.write_buf(&mut b).await?;
|
||||
}
|
||||
|
||||
BeMessage::CommandComplete => {
|
||||
let mut b = Bytes::from("SELECT 1\0");
|
||||
|
||||
@@ -438,8 +427,54 @@ impl Connection {
|
||||
async fn process_query(&mut self, q: &FeQueryMessage) -> Result<()> {
|
||||
trace!("got query {:?}", q.body);
|
||||
|
||||
if q.body.starts_with(b"controlfile") {
|
||||
self.handle_controlfile().await
|
||||
if q.body.starts_with(b"file") {
|
||||
let (_l, r) = q.body.split_at("file ".len());
|
||||
//TODO parse it correctly
|
||||
let r = r.to_vec();
|
||||
let str = String::from_utf8(r).unwrap().to_string();
|
||||
|
||||
let mut split = str.split(',');
|
||||
let mut s;
|
||||
|
||||
let filepath = split.next().unwrap();
|
||||
let sysid = {
|
||||
s = split.next().unwrap();
|
||||
s.parse::<u64>().unwrap()
|
||||
};
|
||||
|
||||
let buf_tag = page_cache::BufferTag {
|
||||
spcnode: {
|
||||
s = split.next().unwrap();
|
||||
s.parse::<u32>().unwrap()
|
||||
},
|
||||
dbnode: {
|
||||
s = split.next().unwrap();
|
||||
s.parse::<u32>().unwrap()
|
||||
},
|
||||
relnode: {
|
||||
s = split.next().unwrap();
|
||||
s.parse::<u32>().unwrap()
|
||||
},
|
||||
forknum: {
|
||||
s = split.next().unwrap();
|
||||
s.parse::<u8>().unwrap()
|
||||
},
|
||||
blknum: {
|
||||
s = split.next().unwrap();
|
||||
s.parse::<u32>().unwrap()
|
||||
},
|
||||
};
|
||||
|
||||
//TODO PARSE LSN
|
||||
//let lsn = { s = split.next().unwrap(); s.parse::<u64>().unwrap()};
|
||||
let lsn: u64 = 0;
|
||||
info!(
|
||||
"process file query sysid {} -- {:?} lsn {}",
|
||||
sysid, buf_tag, lsn
|
||||
);
|
||||
|
||||
self.handle_file(filepath.to_string(), sysid, buf_tag, lsn.into())
|
||||
.await
|
||||
} else if q.body.starts_with(b"pagestream ") {
|
||||
let (_l, r) = q.body.split_at("pagestream ".len());
|
||||
let mut r = r.to_vec();
|
||||
@@ -486,10 +521,29 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_controlfile(&mut self) -> Result<()> {
|
||||
async fn handle_file(
|
||||
&mut self,
|
||||
filepath: String,
|
||||
sysid: u64,
|
||||
buf_tag: page_cache::BufferTag,
|
||||
lsn: u64,
|
||||
) -> Result<()> {
|
||||
let pcache = page_cache::get_pagecache(self.conf.clone(), sysid);
|
||||
|
||||
match pcache.get_page_at_lsn(buf_tag, lsn) {
|
||||
Ok(p) => {
|
||||
info!("info succeeded get_page_at_lsn: {}", lsn);
|
||||
|
||||
controlfile::write_buf_to_file(filepath, p, buf_tag.blknum);
|
||||
}
|
||||
Err(e) => {
|
||||
info!("page not found and it's ok. get_page_at_lsn: {}", e);
|
||||
}
|
||||
};
|
||||
|
||||
self.write_message_noflush(&BeMessage::RowDescription)
|
||||
.await?;
|
||||
self.write_message_noflush(&BeMessage::ControlFile).await?;
|
||||
self.write_message_noflush(&BeMessage::DataRow).await?;
|
||||
self.write_message_noflush(&BeMessage::CommandComplete)
|
||||
.await?;
|
||||
self.write_message(&BeMessage::ReadyForQuery).await
|
||||
@@ -558,6 +612,7 @@ impl Connection {
|
||||
|
||||
let n_blocks = pcache.relsize_get(&tag);
|
||||
|
||||
trace!("ZenithNblocksRequest {:?} = {}", tag, n_blocks);
|
||||
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
|
||||
ok: true,
|
||||
n_blocks: n_blocks,
|
||||
@@ -574,11 +629,26 @@ impl Connection {
|
||||
};
|
||||
|
||||
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
|
||||
Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
page: p,
|
||||
}),
|
||||
Ok(p) => {
|
||||
let mut b = BytesMut::with_capacity(8192);
|
||||
|
||||
trace!("ZenithReadResponse get_page_at_lsn succeed");
|
||||
if p.len() < 8192 {
|
||||
//add padding
|
||||
trace!("ZenithReadResponse add padding");
|
||||
let padding: [u8; 8192 - 512] = [0; 8192 - 512];
|
||||
b.extend_from_slice(&p);
|
||||
b.extend_from_slice(&padding);
|
||||
} else {
|
||||
b.extend_from_slice(&p);
|
||||
}
|
||||
|
||||
BeMessage::ZenithReadResponse(ZenithReadResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
page: b.freeze(),
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
const ZERO_PAGE: [u8; 8192] = [0; 8192];
|
||||
error!("get_page_at_lsn: {}", e);
|
||||
@@ -599,6 +669,7 @@ impl Connection {
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
};
|
||||
trace!("ZenithCreateRequest {:?}", tag);
|
||||
|
||||
pcache.relsize_inc(&tag, None);
|
||||
|
||||
@@ -616,6 +687,8 @@ impl Connection {
|
||||
forknum: req.forknum,
|
||||
};
|
||||
|
||||
trace!("ZenithExtendRequest {:?} to {}", tag, req.blkno);
|
||||
|
||||
pcache.relsize_inc(&tag, Some(req.blkno));
|
||||
|
||||
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
|
||||
|
||||
11
pageserver/src/pg_constants.rs
Normal file
11
pageserver/src/pg_constants.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
// From pg_tablespace_d.h
|
||||
//
|
||||
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||
pub const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||
//Special values for non-rel files' tags
|
||||
//TODO maybe use enum?
|
||||
pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
|
||||
pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
|
||||
pub const PG_XACT_FORKNUM: u32 = 44;
|
||||
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
|
||||
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
|
||||
@@ -22,7 +22,7 @@ use tokio::runtime;
|
||||
|
||||
use futures::future;
|
||||
|
||||
use crate::{page_cache, PageServerConf};
|
||||
use crate::{controlfile, page_cache, pg_constants, PageServerConf};
|
||||
|
||||
struct Storage {
|
||||
region: Region,
|
||||
@@ -84,8 +84,24 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
|
||||
.list("relationdata/".to_string(), Some("".to_string()))
|
||||
.await?;
|
||||
|
||||
// TODO: get that from backup
|
||||
let sys_id: u64 = 42;
|
||||
//Before uploading other files, slurp pg_control to set systemid
|
||||
|
||||
let control_results: Vec<s3::serde_types::ListBucketResult> = bucket
|
||||
.list(
|
||||
"relationdata/global/pg_control".to_string(),
|
||||
Some("".to_string()),
|
||||
)
|
||||
.await?;
|
||||
let object = &(&control_results[0]).contents[0];
|
||||
let (data, _) = bucket.get_object(&object.key).await.unwrap();
|
||||
let bytes = BytesMut::from(data.as_slice()).freeze();
|
||||
let c = controlfile::decode_pg_control(bytes);
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), c.system_identifier);
|
||||
pcache.set_controldata(c.clone());
|
||||
trace!("uploaded controlfile {:?}", pcache.get_controldata());
|
||||
|
||||
let sys_id: u64 = c.system_identifier;
|
||||
let mut oldest_lsn = 0;
|
||||
let mut slurp_futures: Vec<_> = Vec::new();
|
||||
|
||||
@@ -119,23 +135,47 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
|
||||
panic!("no base backup found");
|
||||
}
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||
//Now add nonrelation files
|
||||
let nonrelresults: Vec<s3::serde_types::ListBucketResult> = bucket
|
||||
.list("nonreldata/".to_string(), Some("".to_string()))
|
||||
.await?;
|
||||
for result in nonrelresults {
|
||||
for object in result.contents {
|
||||
// Download needed non relation files, slurping them into memory
|
||||
|
||||
let key = object.key;
|
||||
let relpath = key.strip_prefix("nonreldata/").unwrap();
|
||||
trace!("list nonrelfiles {}", relpath);
|
||||
|
||||
let parsed = parse_nonrel_file_path(&relpath);
|
||||
|
||||
match parsed {
|
||||
Ok(p) => {
|
||||
let b = bucket.clone();
|
||||
let f = slurp_base_file(conf, sys_id, b, key.to_string(), p);
|
||||
|
||||
slurp_futures.push(f);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("unrecognized file: {} ({})", relpath, e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pcache.init_valid_lsn(oldest_lsn);
|
||||
|
||||
info!("{} files to restore...", slurp_futures.len());
|
||||
|
||||
future::join_all(slurp_futures).await;
|
||||
info!("restored!");
|
||||
info!(
|
||||
"restored! {:?} to {:?}",
|
||||
pcache.first_valid_lsn, pcache.last_valid_lsn
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// From pg_tablespace_d.h
|
||||
//
|
||||
// FIXME: we'll probably need these elsewhere too, move to some common location
|
||||
const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||
const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FilePathError {
|
||||
msg: String,
|
||||
@@ -185,6 +225,17 @@ struct ParsedBaseImageFileName {
|
||||
pub lsn: u64,
|
||||
}
|
||||
|
||||
fn parse_lsn_from_filename(fname: &str) -> Result<u64, FilePathError> {
|
||||
let (_, lsn_str) = fname.split_at(fname.len() - 16);
|
||||
|
||||
let (lsnhi, lsnlo) = lsn_str.split_at(8);
|
||||
let lsn_hi = u64::from_str_radix(lsnhi, 16)?;
|
||||
let lsn_lo = u64::from_str_radix(lsnlo, 16)?;
|
||||
let lsn = lsn_hi << 32 | lsn_lo;
|
||||
|
||||
return Ok(lsn);
|
||||
}
|
||||
|
||||
// formats:
|
||||
// <oid>
|
||||
// <oid>_<fork name>
|
||||
@@ -223,6 +274,46 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
|
||||
return Ok((relnode, forknum, segno, lsn));
|
||||
}
|
||||
|
||||
fn parse_nonrel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
|
||||
//TODO parse segno from xact filenames too
|
||||
if let Some(fname) = path.strip_prefix("pg_xact/") {
|
||||
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum: pg_constants::PG_XACT_FORKNUM,
|
||||
segno: 0,
|
||||
lsn,
|
||||
});
|
||||
} else if let Some(fname) = path.strip_prefix("pg_multixact/offsets") {
|
||||
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum: pg_constants::PG_MXACT_OFFSETS_FORKNUM,
|
||||
segno: 0,
|
||||
lsn,
|
||||
});
|
||||
} else if let Some(fname) = path.strip_prefix("pg_multixact/members") {
|
||||
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum: pg_constants::PG_MXACT_MEMBERS_FORKNUM,
|
||||
segno: 0,
|
||||
lsn,
|
||||
});
|
||||
} else {
|
||||
return Err(FilePathError::new("invalid non relation data file name"));
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
|
||||
/*
|
||||
* Relation data files can be in one of the following directories:
|
||||
@@ -242,10 +333,36 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
|
||||
* <oid>.<segment number>
|
||||
*/
|
||||
if let Some(fname) = path.strip_prefix("global/") {
|
||||
if fname.contains("pg_control") {
|
||||
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: pg_constants::GLOBALTABLESPACE_OID,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum: pg_constants::PG_CONTROLFILE_FORKNUM,
|
||||
segno: 0,
|
||||
lsn,
|
||||
});
|
||||
}
|
||||
|
||||
if fname.contains("pg_filenode") {
|
||||
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: pg_constants::GLOBALTABLESPACE_OID,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
segno: 0,
|
||||
lsn,
|
||||
});
|
||||
}
|
||||
|
||||
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: GLOBALTABLESPACE_OID,
|
||||
spcnode: pg_constants::GLOBALTABLESPACE_OID,
|
||||
dbnode: 0,
|
||||
relnode,
|
||||
forknum,
|
||||
@@ -265,10 +382,23 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
|
||||
return Err(FilePathError::new("invalid relation data file name"));
|
||||
};
|
||||
|
||||
if fname.contains("pg_filenode") {
|
||||
let lsn = parse_lsn_from_filename(fname.clone())?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
|
||||
dbnode,
|
||||
relnode: 0,
|
||||
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
segno: 0,
|
||||
lsn,
|
||||
});
|
||||
}
|
||||
|
||||
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
|
||||
|
||||
return Ok(ParsedBaseImageFileName {
|
||||
spcnode: DEFAULTTABLESPACE_OID,
|
||||
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
|
||||
dbnode,
|
||||
relnode,
|
||||
forknum,
|
||||
@@ -302,22 +432,55 @@ async fn slurp_base_file(
|
||||
|
||||
let mut bytes = BytesMut::from(data.as_slice()).freeze();
|
||||
|
||||
// FIXME: use constants (BLCKSZ)
|
||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
|
||||
|
||||
while bytes.remaining() >= 8192 {
|
||||
let tag = page_cache::BufferTag {
|
||||
// pg_filenode.map has non-standard size - 512 bytes
|
||||
if parsed.forknum == pg_constants::PG_FILENODEMAP_FORKNUM {
|
||||
let b = bytes.clone();
|
||||
controlfile::decode_filemapping(b);
|
||||
while bytes.remaining() >= 512 {
|
||||
let tag = page_cache::BufferTag {
|
||||
spcnode: parsed.spcnode,
|
||||
dbnode: parsed.dbnode,
|
||||
relnode: parsed.relnode,
|
||||
forknum: parsed.forknum as u8,
|
||||
blknum: 0,
|
||||
};
|
||||
|
||||
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(512));
|
||||
}
|
||||
|
||||
let tag = page_cache::RelTag {
|
||||
spcnode: parsed.spcnode,
|
||||
dbnode: parsed.dbnode,
|
||||
relnode: parsed.relnode,
|
||||
forknum: parsed.forknum as u8,
|
||||
blknum: blknum,
|
||||
};
|
||||
|
||||
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
|
||||
pcache.relsize_inc(&tag, Some(0));
|
||||
} else {
|
||||
// FIXME: use constants (BLCKSZ)
|
||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
||||
let reltag = page_cache::RelTag {
|
||||
spcnode: parsed.spcnode,
|
||||
dbnode: parsed.dbnode,
|
||||
relnode: parsed.relnode,
|
||||
forknum: parsed.forknum as u8,
|
||||
};
|
||||
|
||||
blknum += 1;
|
||||
while bytes.remaining() >= 8192 {
|
||||
let tag = page_cache::BufferTag {
|
||||
spcnode: parsed.spcnode,
|
||||
dbnode: parsed.dbnode,
|
||||
relnode: parsed.relnode,
|
||||
forknum: parsed.forknum as u8,
|
||||
blknum: blknum,
|
||||
};
|
||||
|
||||
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
|
||||
pcache.relsize_inc(&reltag, Some(blknum));
|
||||
|
||||
blknum += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
//#![allow(dead_code)]
|
||||
//include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
|
||||
|
||||
use crate::pg_constants;
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
use std::cmp::min;
|
||||
@@ -234,6 +236,8 @@ const BLCKSZ: u16 = 8192;
|
||||
//
|
||||
// Constants from xlogrecord.h
|
||||
//
|
||||
const XLR_INFO_MASK: u8 = 0x0F;
|
||||
|
||||
const XLR_MAX_BLOCK_ID: u8 = 32;
|
||||
|
||||
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
|
||||
@@ -253,6 +257,12 @@ const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
|
||||
const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
|
||||
const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
|
||||
|
||||
//
|
||||
// constants from clog.h
|
||||
//
|
||||
const CLOG_XACTS_PER_BYTE: u32 = 4;
|
||||
const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
|
||||
|
||||
pub struct DecodedBkpBlock {
|
||||
/* Is this block ref in use? */
|
||||
//in_use: bool,
|
||||
@@ -261,7 +271,7 @@ pub struct DecodedBkpBlock {
|
||||
pub rnode_spcnode: u32,
|
||||
pub rnode_dbnode: u32,
|
||||
pub rnode_relnode: u32,
|
||||
pub forknum: u8,
|
||||
pub forknum: u8, // Note that we have a few special forknum values for non-rel files. Handle them too
|
||||
pub blkno: u32,
|
||||
|
||||
/* copy of the fork_flags field from the XLogRecordBlockHeader */
|
||||
@@ -297,6 +307,26 @@ pub struct DecodedWALRecord {
|
||||
const XLOG_SWITCH: u8 = 0x40;
|
||||
const RM_XLOG_ID: u8 = 0;
|
||||
|
||||
const RM_XACT_ID: u8 = 1;
|
||||
// const RM_CLOG_ID:u8 = 3;
|
||||
//const RM_MULTIXACT_ID:u8 = 6;
|
||||
|
||||
// from xact.h
|
||||
const XLOG_XACT_COMMIT: u8 = 0x00;
|
||||
// const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||
// const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||
// const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||
// const XLOG_XACT_ASSIGNMENT: u8 = 0x50;
|
||||
// const XLOG_XACT_INVALIDATIONS: u8 = 0x60;
|
||||
/* free opcode 0x70 */
|
||||
|
||||
/* mask for filtering opcodes out of xl_info */
|
||||
const XLOG_XACT_OPMASK: u8 = 0x70;
|
||||
|
||||
/* does this record have a 'xinfo' field or not */
|
||||
// const XLOG_XACT_HAS_INFO: u8 = 0x80;
|
||||
|
||||
// Is this record an XLOG_SWITCH record? They need some special processing,
|
||||
// so we need to check for that before the rest of the parsing.
|
||||
//
|
||||
@@ -331,13 +361,17 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
|
||||
|
||||
// FIXME: assume little-endian here
|
||||
let xl_tot_len = buf.get_u32_le();
|
||||
let _xl_xid = buf.get_u32_le();
|
||||
let xl_xid = buf.get_u32_le();
|
||||
let _xl_prev = buf.get_u64_le();
|
||||
let _xl_info = buf.get_u8();
|
||||
let _xl_rmid = buf.get_u8();
|
||||
let xl_info = buf.get_u8();
|
||||
let xl_rmid = buf.get_u8();
|
||||
buf.advance(2); // 2 bytes of padding
|
||||
let _xl_crc = buf.get_u32_le();
|
||||
|
||||
info!("decode_wal_record xl_rmid = {}", xl_rmid);
|
||||
|
||||
let rminfo: u8 = xl_info & !XLR_INFO_MASK;
|
||||
|
||||
let remaining = xl_tot_len - SizeOfXLogRecord;
|
||||
|
||||
if buf.remaining() != remaining as usize {
|
||||
@@ -349,6 +383,53 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
|
||||
let mut rnode_relnode: u32 = 0;
|
||||
let mut got_rnode = false;
|
||||
|
||||
if xl_rmid == RM_XACT_ID
|
||||
&& ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT
|
||||
|| (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED)
|
||||
{
|
||||
info!("decode_wal_record RM_XACT_ID - XLOG_XACT_COMMIT");
|
||||
|
||||
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
|
||||
|
||||
let blkno = xl_xid / CLOG_XACTS_PER_PAGE;
|
||||
|
||||
let mut blk = DecodedBkpBlock {
|
||||
rnode_spcnode: 0,
|
||||
rnode_dbnode: 0,
|
||||
rnode_relnode: 0,
|
||||
forknum: pg_constants::PG_XACT_FORKNUM as u8,
|
||||
blkno: blkno,
|
||||
|
||||
flags: 0,
|
||||
has_image: false,
|
||||
apply_image: false,
|
||||
will_init: false,
|
||||
hole_offset: 0,
|
||||
hole_length: 0,
|
||||
bimg_len: 0,
|
||||
bimg_info: 0,
|
||||
|
||||
has_data: true,
|
||||
data_len: 0,
|
||||
};
|
||||
|
||||
let fork_flags = buf.get_u8();
|
||||
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
|
||||
blk.data_len = buf.get_u16_le();
|
||||
|
||||
info!(
|
||||
"decode_wal_record RM_XACT_ID blk has data with data_len {}",
|
||||
blk.data_len
|
||||
);
|
||||
|
||||
blocks.push(blk);
|
||||
return DecodedWALRecord {
|
||||
lsn: lsn,
|
||||
record: rec,
|
||||
blocks: blocks,
|
||||
};
|
||||
}
|
||||
|
||||
// Decode the headers
|
||||
|
||||
let mut max_block_id = 0;
|
||||
@@ -552,6 +633,7 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
|
||||
|
||||
//blk->rnode = *rnode;
|
||||
}
|
||||
|
||||
blk.rnode_spcnode = rnode_spcnode;
|
||||
blk.rnode_dbnode = rnode_dbnode;
|
||||
blk.rnode_relnode = rnode_relnode;
|
||||
|
||||
@@ -160,9 +160,11 @@ impl WalRedoProcess {
|
||||
.expect("failed to execute initdb");
|
||||
|
||||
if !initdb.status.success() {
|
||||
panic!("initdb failed: {}\nstderr:\n{}",
|
||||
std::str::from_utf8(&initdb.stdout).unwrap(),
|
||||
std::str::from_utf8(&initdb.stderr).unwrap());
|
||||
panic!(
|
||||
"initdb failed: {}\nstderr:\n{}",
|
||||
std::str::from_utf8(&initdb.stdout).unwrap(),
|
||||
std::str::from_utf8(&initdb.stderr).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
// Start postgres itself
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: b1f5a5ec14...a71b5c24eb
Reference in New Issue
Block a user