Compare commits

...

3 Commits

Author SHA1 Message Date
anastasia
a267dfa41f code cleanup for compute_node_rebase branch 2021-04-09 17:25:41 +03:00
anastasia
1b9eb9430c 1. Handle SLRU and nonrel files as pageserver pages: upload them via restore_s3, handle in protocol.
2. Parse pg_control to retrieve systemid, lsn and so on. Store it in pagecache.
3. Setup compute node without files: only request a few essential files from pageserver to bootstrap. And after that route ALL I/O requests to pageserver.
Use the initdb --compute-node flag to create such a minimal node without files, and the GUC 'computenode_mode=true' to request all pages from pageserver
2021-04-08 16:01:24 +03:00
anastasia
9a4fbf365c add test test_pageserver_recovery.
zenith_push postgres to minio and start pageserver using this base image
2021-04-08 15:14:44 +03:00
12 changed files with 1064 additions and 94 deletions

View File

@@ -21,6 +21,8 @@ use std::{
use lazy_static::lazy_static;
use postgres::{Client, NoTls};
use postgres;
lazy_static! {
// postgres would be there if it was build by 'make postgres' here in the repo
pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
@@ -57,7 +59,7 @@ pub struct StorageControlPlane {
impl StorageControlPlane {
// postgres <-> page_server
pub fn one_page_server() -> StorageControlPlane {
pub fn one_page_server(froms3: bool) -> StorageControlPlane {
let mut cplane = StorageControlPlane {
wal_acceptors: Vec::new(),
page_servers: Vec::new(),
@@ -68,7 +70,11 @@ impl StorageControlPlane {
data_dir: TEST_WORKDIR.join("pageserver"),
};
pserver.init();
pserver.start();
if froms3 {
pserver.start_froms3();
} else {
pserver.start();
}
cplane.page_servers.push(pserver);
cplane
@@ -186,6 +192,27 @@ impl PageServerNode {
}
}
/// Start the pageserver binary with S3 settings for a local minio instance.
///
/// The child runs with a cleared environment that contains only `PATH`
/// (the postgres bin directory, where the postgres-wal-redo binary is
/// expected) and the hard-coded `S3_*` variables below.
///
/// # Panics
/// Panics if the binary cannot be executed or exits with a non-zero status.
pub fn start_froms3(&self) {
    println!("Starting pageserver at '{}'", self.page_service_addr);

    let mut pageserver = Command::new(BIN_DIR.join("pageserver"));
    pageserver.args(&["-D", self.data_dir.to_str().unwrap()]);
    pageserver.args(&["-l", self.page_service_addr.to_string().as_str()]);
    pageserver.arg("-d");

    // Start from an empty environment so only the variables set here are visible.
    pageserver.env_clear();
    // path to postgres-wal-redo binary
    pageserver.env("PATH", PG_BIN_DIR.to_str().unwrap());
    pageserver.env("S3_ENDPOINT", "https://127.0.0.1:9000");
    pageserver.env("S3_REGION", "us-east-1");
    pageserver.env("S3_ACCESSKEY", "minioadmin");
    pageserver.env("S3_SECRET", "minioadmin");

    let exit = pageserver.status().expect("failed to start pageserver");
    if !exit.success() {
        panic!("pageserver start failed");
    }
}
pub fn stop(&self) {
let pidfile = self.data_dir.join("pageserver.pid");
let pid = fs::read_to_string(pidfile).unwrap();
@@ -352,7 +379,7 @@ impl ComputeControlPlane<'_> {
// Init compute node without files, only datadir structure
// use initdb --compute-node flag and GUC 'computenode_mode'
// to distinguish the node
pub fn new_minimal_node(&mut self) -> &PostgresNode {
pub fn new_minimal_node<'a>(&mut self) -> &Arc<PostgresNode> {
// allocate new node entry with generated port
let node_id = self.nodes.len() + 1;
let node = PostgresNode {
@@ -365,7 +392,7 @@ impl ComputeControlPlane<'_> {
self.nodes.push(Arc::new(node));
let node = self.nodes.last().unwrap();
// initialize data directory w/o files
// initialize data directory
fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
let initdb_path = self.pg_bin_dir.join("initdb");
println!("initdb_path: {}", initdb_path.to_str().unwrap());
@@ -383,6 +410,11 @@ impl ComputeControlPlane<'_> {
panic!("initdb failed");
}
// // allow local replication connections
// node.append_conf("pg_hba.conf", format!("\
// host replication all {}/32 sspi include_realm=1 map=regress\n\
// ", node.ip).as_str());
// listen for selected port
node.append_conf(
"postgresql.conf",
@@ -397,16 +429,37 @@ impl ComputeControlPlane<'_> {
listen_addresses = '{address}'\n\
port = {port}\n\
computenode_mode = true\n\
",
",
address = node.ip,
port = node.port
)
.as_str(),
);
node
}
/// Create a minimal compute node (see `new_minimal_node`) and point it at
/// the pageserver by appending `page_server_connstring` to its
/// `postgresql.conf`.
///
/// Returns a new handle to the node that `new_minimal_node` registered.
pub fn new_node_wo_data(&mut self) -> Arc<PostgresNode> {
    // Take the storage control plane out of `self` before `self` is borrowed
    // mutably for node creation.
    let storage = self.storage_cplane;
    let minimal = self.new_minimal_node();
    let addr = storage.page_server_addr();

    // Configure that node to take pages from pageserver
    let connstring_conf = format!(
        "page_server_connstring = 'host={} port={}'\n",
        addr.ip(),
        addr.port()
    );
    minimal.append_conf("postgresql.conf", connstring_conf.as_str());

    Arc::clone(minimal)
}
pub fn new_node(&mut self) -> Arc<PostgresNode> {
let storage_cplane = self.storage_cplane;
let node = self.new_vanilla_node();
@@ -575,27 +628,133 @@ impl PostgresNode {
self.pgdata.to_str()
}
/* Create stub controlfile and respective xlog to start computenode */
pub fn setup_controlfile(&self) {
let filepath = format!("{}/global/pg_control", self.pgdata.to_str().unwrap());
// Request from pageserver stub controlfile, respective xlog
// and a bunch of files needed to start computenode
//
// NOTE this "file" request is a crutch.
// It asks pageserver to write requested page to the provided filepath
// and thus only works locally.
// TODO receive pages via some libpq protocol.
// The problem I ran into is that non-rel files are not valid UTF-8 and cannot be
// handled by simple_query(), which expects text.
// And regular query() uses prepared queries.
{
File::create(filepath).unwrap();
// TODO pass sysid as parameter
pub fn setup_compute_node(&self, sysid: u64, storage_cplane: &StorageControlPlane) {
let mut query;
//Request pg_control from pageserver
query = format!(
"file {}/global/pg_control,{},{},{},{},{},{},{}",
self.pgdata.to_str().unwrap(),
sysid as u64, //sysid
1664, //tablespace
0, //dboid
0, //reloid
42, //forknum pg_control
0, //blkno
0 //lsn
);
storage_cplane.page_server_psql(query.as_str());
//Request pg_xact and pg_multixact from pageserver
//We need them for initial pageserver startup and authentication
//TODO figure out which block number we really need
query = format!(
"file {}/pg_xact/0000,{},{},{},{},{},{},{}",
self.pgdata.to_str().unwrap(),
sysid as u64, //sysid
0, //tablespace
0, //dboid
0, //reloid
44, //forknum
0, //blkno
0 //lsn
);
storage_cplane.page_server_psql(query.as_str());
query = format!(
"file {}/pg_multixact/offsets/0000,{},{},{},{},{},{},{}",
self.pgdata.to_str().unwrap(),
sysid as u64, //sysid
0, //tablespace
0, //dboid
0, //reloid
45, //forknum
0, //blkno
0 //lsn
);
storage_cplane.page_server_psql(query.as_str());
query = format!(
"file {}/pg_multixact/members/0000,{},{},{},{},{},{},{}",
self.pgdata.to_str().unwrap(),
sysid as u64, //sysid
0, //tablespace
0, //dboid
0, //reloid
46, //forknum
0, //blkno
0 //lsn
);
storage_cplane.page_server_psql(query.as_str());
//Request a few shared catalogs needed for authentication
//Without them we cannot setup connection with pageserver to request further pages
let reloids = [1260, 1261, 1262, 2396];
for reloid in reloids.iter() {
//FIXME request all blocks from file, not just 10
for blkno in 0..10 {
query = format!(
"file {}/global/{},{},{},{},{},{},{},{}",
self.pgdata.to_str().unwrap(),
reloid, //suse it as filename
sysid as u64, //sysid
1664, //tablespace
0, //dboid
reloid, //reloid
0, //forknum
blkno, //blkno
0 //lsn
);
storage_cplane.page_server_psql(query.as_str());
}
}
fs::create_dir(format!("{}/base/13006", self.pgdata.to_str().unwrap())).unwrap();
fs::create_dir(format!("{}/base/13007", self.pgdata.to_str().unwrap())).unwrap();
//FIXME figure out what wal file we need to successfully start
let walfilepath = format!(
"{}/pg_wal/000000010000000000000001",
self.pgdata.to_str().unwrap()
);
fs::copy(
"/home/anastasia/zenith/zenith/tmp_check/pgdata/pg_wal/000000010000000000000001",
walfilepath,
)
.unwrap();
println!("before resetwal ");
let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
// Now it does nothing, just prints existing content of pg_control.
// TODO update values with most recent lsn, xid, oid requested from pageserver
let pg_resetwal = Command::new(pg_resetwal_path)
.args(&["-D", self.pgdata.to_str().unwrap()])
.arg("-f")
// TODO probably we will have to modify pg_resetwal
// .arg("--compute-node")
.arg("-n") //dry run
//.arg("-f")
//.args(&["--next-transaction-id", "100500"])
//.args(&["--next-oid", "17000"])
//.args(&["--next-transaction-id", "100500"])
.status()
.expect("failed to execute pg_resetwal");
if !pg_resetwal.success() {
panic!("pg_resetwal failed");
}
println!("setup done");
}
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
@@ -613,6 +772,28 @@ impl PostgresNode {
}
}
/// Run the `zenith_push` binary against this node's data directory.
///
/// The child gets a cleared environment containing only the hard-coded
/// `S3_*` settings of a local minio instance. `S3_BUCKET` is intentionally
/// left unset here (the original kept it commented out with the value
/// "zenith-testbucket").
///
/// # Panics
/// Panics if `zenith_push` cannot be executed or exits with a non-zero status.
pub fn push_to_s3(&self) {
    println!("Push to s3 node at '{}'", self.pgdata.to_str().unwrap());

    let pusher_path = self.pg_bin_dir.join("zenith_push");
    println!("zenith_push_path: {}", pusher_path.to_str().unwrap());

    let mut pusher = Command::new(pusher_path);
    pusher.args(&["-D", self.pgdata.to_str().unwrap()]);
    pusher.env_clear();
    pusher.env("S3_ENDPOINT", "https://127.0.0.1:9000");
    pusher.env("S3_REGION", "us-east-1");
    pusher.env("S3_ACCESSKEY", "minioadmin");
    pusher.env("S3_SECRET", "minioadmin");

    let exit = pusher.status().expect("failed to push node to s3");
    if !exit.success() {
        panic!("zenith_push failed");
    }
}
// TODO
pub fn pg_bench() {}
pub fn pg_regress() {}

View File

@@ -1,5 +1,7 @@
#[allow(dead_code)]
mod control_plane;
use std::thread::sleep;
use std::time::Duration;
use control_plane::ComputeControlPlane;
use control_plane::StorageControlPlane;
@@ -8,17 +10,21 @@ use control_plane::StorageControlPlane;
// -- restart + seqscan won't read deleted stuff
// -- pageserver api endpoint to check all rels
// Handcrafted cases with wal records that are (were) problematic for redo.
//Handcrafted cases with wal records that are (were) problematic for redo.
#[test]
#[ignore]
fn test_redo_cases() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let storage_cplane = StorageControlPlane::one_page_server(false);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// start postgres
let node = compute_cplane.new_node();
node.start(&storage_cplane);
println!("await pageserver connection...");
sleep(Duration::from_secs(3));
// check basic work with table
node.safe_psql(
"postgres",
@@ -26,7 +32,7 @@ fn test_redo_cases() {
);
node.safe_psql(
"postgres",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
);
let count: i64 = node
.safe_psql("postgres", "SELECT sum(key) FROM t")
@@ -34,9 +40,9 @@ fn test_redo_cases() {
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 5000050000);
assert_eq!(count, 5050);
// check 'create table as'
//check 'create table as'
node.safe_psql("postgres", "CREATE TABLE t2 AS SELECT * FROM t");
let count: i64 = node
.safe_psql("postgres", "SELECT sum(key) FROM t")
@@ -44,28 +50,33 @@ fn test_redo_cases() {
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 5000050000);
assert_eq!(count, 5050);
}
// Runs pg_regress on a compute node
#[test]
#[ignore]
fn test_regress() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let storage_cplane = StorageControlPlane::one_page_server(false);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// start postgres
let node = compute_cplane.new_node();
node.start(&storage_cplane);
println!("await pageserver connection...");
sleep(Duration::from_secs(3));
control_plane::regress_check(&node);
}
// Run two postgres instances on one pageserver
#[test]
#[ignore]
fn test_pageserver_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let storage_cplane = StorageControlPlane::one_page_server(false);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
// Allocate postgres instance, but don't start
@@ -74,6 +85,11 @@ fn test_pageserver_multitenancy() {
node1.start(&storage_cplane);
node2.start(&storage_cplane);
// XXX: add some extension func to postgres to check walsender conn
// XXX: or better just drop that
println!("await pageserver connection...");
sleep(Duration::from_secs(3));
// check node1
node1.safe_psql(
"postgres",
@@ -81,7 +97,7 @@ fn test_pageserver_multitenancy() {
);
node1.safe_psql(
"postgres",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
"INSERT INTO t SELECT generate_series(1,100), 'payload'",
);
let count: i64 = node1
.safe_psql("postgres", "SELECT sum(key) FROM t")
@@ -89,7 +105,7 @@ fn test_pageserver_multitenancy() {
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 5000050000);
assert_eq!(count, 5050);
// check node2
node2.safe_psql(
@@ -98,7 +114,7 @@ fn test_pageserver_multitenancy() {
);
node2.safe_psql(
"postgres",
"INSERT INTO t SELECT generate_series(100000,200000), 'payload'",
"INSERT INTO t SELECT generate_series(100,200), 'payload'",
);
let count: i64 = node2
.safe_psql("postgres", "SELECT sum(key) FROM t")
@@ -106,5 +122,89 @@ fn test_pageserver_multitenancy() {
.unwrap()
.get(0);
println!("sum = {}", count);
assert_eq!(count, 15000150000);
assert_eq!(count, 15150);
}
#[test]
#[ignore]
// Start a pageserver from an S3 base image and check that a compute node
// without local data files can read the catalog through it.
//
// Requires a working minio with this hard-coded setup:
// S3_ENDPOINT  = https://127.0.0.1:9000
// S3_REGION    = us-east-1
// S3_ACCESSKEY = minioadmin
// S3_SECRET    = minioadmin
// S3_BUCKET    = zenith-testbucket
// TODO use env variables in test
fn test_pageserver_recovery() {
    // TODO this is a hard-coded sysid of one particular cluster; obtain it properly.
    const SYSID: u64 = 6947041219207877724;

    // The image must already be in s3 before the test runs;
    // upload it with zenith_push (see node.push_to_s3() for details).
    let storage = StorageControlPlane::one_page_server(true);
    let mut compute = ComputeControlPlane::local(&storage);

    // Give the daemon time to load pages from s3.
    let restore_wait = Duration::from_secs(15);
    sleep(restore_wait);

    let restored = compute.new_node_wo_data();
    restored.setup_compute_node(SYSID, &storage);
    restored.start(&storage);

    let relnames = restored.safe_psql("postgres", "SELECT relname from pg_class;");
    assert_eq!(relnames.len(), 395);
}
#[test]
#[ignore]
// Scenario for a future test; not fully implemented yet (uploading the pushed
// data from s3 into the pageserver is still missing).
//
// A regular node creates and fills a table, its files are pushed to s3, the
// node is stopped, and a second node without data files is expected to see
// the same table through the pageserver.
fn test_pageserver_node_switch() {
    // Create pageserver
    let storage_cplane = StorageControlPlane::one_page_server(false);
    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);

    // Create regular node
    let node = compute_cplane.new_node();
    node.start(&storage_cplane);

    node.safe_psql(
        "postgres",
        "CREATE TABLE t(key int primary key, value text)",
    );
    node.safe_psql(
        "postgres",
        "INSERT INTO t SELECT generate_series(1,100), 'payload'",
    );

    let count: i64 = node
        .safe_psql("postgres", "SELECT sum(key) FROM t")
        .first()
        .unwrap()
        .get(0);
    println!("sum = {}", count);
    assert_eq!(count, 5050);

    // Push all node files to s3
    // TODO upload them directly to pageserver
    node.push_to_s3();

    // Upload data from s3 to pageserver
    //storage_cplane.upload_from_s3() //Not implemented yet

    // Shut down the node
    node.stop();

    // Create new node without files
    let node_restored = compute_cplane.new_node_wo_data();

    // Setup minimal set of files needed to start node and setup pageserver connection
    // TODO 6947041219207877724 is a hardcoded sysid. Get it from node
    node_restored.setup_compute_node(6947041219207877724, &storage_cplane);

    // Start compute node without files
    node_restored.start(&storage_cplane);

    // Ensure that it has the table created on the initial node.
    // The table holds one row per key in 1..=100, so 100 rows are expected;
    // 5050 is the sum of the keys (checked above), not the row count.
    let rows = node_restored.safe_psql("postgres", "SELECT key from t;");
    assert_eq!(rows.len(), 100);
}

View File

@@ -159,6 +159,7 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
// Before opening up for connections, restore the latest base backup from S3.
// (We don't persist anything to local disk at the moment, so we need to do
// this at every startup)
// TODO move it to a separate function
if !conf.skip_recovery {
restore_s3::restore_main(&conf);
}

View File

@@ -0,0 +1,218 @@
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
use std::fs::File;
use std::io::prelude::*;
use std::io::SeekFrom;
use bytes::{Buf, Bytes};
use log::*;
type XLogRecPtr = u64;
#[repr(C)]
#[derive(Debug, Clone)]
/*
* Body of CheckPoint XLOG records. This is declared here because we keep
* a copy of the latest one in pg_control for possible disaster recovery.
* Changing this struct requires a PG_CONTROL_VERSION bump.
*
* Field names appear to follow the PostgreSQL C struct, which is why the
* file starts with #![allow(non_snake_case)].
*
* In this file the struct is built in two places: zero-filled in
* ControlFileDataZenith::new(), and field by field in decode_pg_control(),
* which reads every value little-endian and skips alignment padding by hand
* (7 bytes after fullPageWrites, 4 bytes before time).
*/
pub struct CheckPoint {
pub redo: XLogRecPtr, /* next RecPtr available when we began to
* create CheckPoint (i.e. REDO start point) */
pub ThisTimeLineID: u32, /* current TLI */
pub PrevTimeLineID: u32, /* previous TLI, if this record begins a new
* timeline (equals ThisTimeLineID otherwise) */
pub fullPageWrites: bool, /* current full_page_writes */
// 64-bit value here, unlike the other (32-bit) xid fields below.
pub nextXid: u64, /* next free transaction ID */
pub nextOid: u32, /* next free OID */
pub nextMulti: u32, /* next free MultiXactId */
pub nextMultiOffset: u32, /* next free MultiXact offset */
pub oldestXid: u32, /* cluster-wide minimum datfrozenxid */
pub oldestXidDB: u32, /* database with minimum datfrozenxid */
pub oldestMulti: u32, /* cluster-wide minimum datminmxid */
pub oldestMultiDB: u32, /* database with minimum datminmxid */
pub time: u64, /* time stamp of checkpoint */
pub oldestCommitTsXid: u32, /* oldest Xid with valid commit
* timestamp */
pub newestCommitTsXid: u32, /* newest Xid with valid commit
* timestamp */
/*
* Oldest XID still running. This is only needed to initialize hot standby
* mode from an online checkpoint, so we only bother calculating this for
* online checkpoints and only when wal_level is replica. Otherwise it's
* set to InvalidTransactionId.
*/
pub oldestActiveXid: u32,
}
// The part of pg_control that this crate decodes (see decode_pg_control()).
// A copy is kept in the page cache's shared state and handed out through
// PageCache::get_controldata().
//
// Only system_identifier and checkPoint are public; the remaining fields are
// private to this module and, within this file, are only written by new() /
// decode_pg_control() and shown through the derived Debug output.
#[repr(C)]
#[derive(Debug, Clone)]
pub struct ControlFileDataZenith {
// Unique identifier of the cluster (the "sysid" used elsewhere in this change).
pub system_identifier: u64,
pg_control_version: u32, /* PG_CONTROL_VERSION */
catalog_version_no: u32, /* see catversion.h */
// NOTE(review): "enum above" refers to an enum that is not declared in this
// file; presumably the comment was carried over from the C header.
state: i32, /* see enum above */
time: i64, /* time stamp of last pg_control update */
// LSN of the last checkpoint record; get_page_at_lsn() falls back to this
// value when a request arrives with LSN 0.
pub checkPoint: XLogRecPtr,
checkPointCopy: CheckPoint, /* copy of last check point record */
unloggedLSN: XLogRecPtr, /* current fake LSN value, for unlogged rels */
minRecoveryPoint: XLogRecPtr,
minRecoveryPointTLI: u32,
backupStartPoint: XLogRecPtr,
backupEndPoint: XLogRecPtr,
backupEndRequired: bool,
}
impl ControlFileDataZenith {
    /// Build a placeholder record in which every numeric field is `0` and
    /// every flag is `false`.
    ///
    /// The page cache is initialised with this value until real control data
    /// is stored with `set_controldata`.
    pub fn new() -> ControlFileDataZenith {
        // Embedded checkpoint copy, likewise all zeroes.
        let blank_checkpoint = CheckPoint {
            redo: 0,
            ThisTimeLineID: 0,
            PrevTimeLineID: 0,
            fullPageWrites: false,
            nextXid: 0,
            nextOid: 0,
            nextMulti: 0,
            nextMultiOffset: 0,
            oldestXid: 0,
            oldestXidDB: 0,
            oldestMulti: 0,
            oldestMultiDB: 0,
            time: 0,
            oldestCommitTsXid: 0,
            newestCommitTsXid: 0,
            oldestActiveXid: 0,
        };

        Self {
            system_identifier: 0,
            pg_control_version: 0,
            catalog_version_no: 0,
            state: 0,
            time: 0,
            checkPoint: 0,
            checkPointCopy: blank_checkpoint,
            unloggedLSN: 0,
            minRecoveryPoint: 0,
            minRecoveryPointTLI: 0,
            backupStartPoint: 0,
            backupEndPoint: 0,
            backupEndRequired: false,
        }
    }
}
/// Decode the leading part of a `pg_control` image into a
/// [`ControlFileDataZenith`].
///
/// Every value is read little-endian, in declaration order, and the
/// alignment padding of the C layout is skipped explicitly. Byte offsets in
/// the comments below are relative to the start of `buf`.
///
/// The embedded `CheckPoint` has 84 bytes of fields but 8-byte alignment, so
/// it occupies 88 bytes; the 4 bytes of tail padding must be skipped before
/// `unloggedLSN`. The previous version omitted that skip and therefore read
/// `unloggedLSN` and every following field 4 bytes too early.
///
/// # Panics
/// Panics (inside the `bytes` accessors) if `buf` holds fewer than the 169
/// bytes consumed here.
pub fn decode_pg_control(mut buf: Bytes) -> ControlFileDataZenith {
    info!("decode pg_control");
    // Struct-literal fields are evaluated in source order, which is what
    // makes the sequential reads below line up with the on-disk layout.
    ControlFileDataZenith {
        system_identifier: buf.get_u64_le(),  // 0..8
        pg_control_version: buf.get_u32_le(), // 8..12
        catalog_version_no: buf.get_u32_le(), // 12..16
        state: buf.get_i32_le(),              // 16..20
        time: {
            buf.advance(4); // padding: align the 8-byte timestamp to offset 24
            buf.get_i64_le()
        },
        checkPoint: buf.get_u64_le(), // 32..40
        checkPointCopy: CheckPoint {
            redo: buf.get_u64_le(),            // 40..48
            ThisTimeLineID: buf.get_u32_le(),  // 48..52
            PrevTimeLineID: buf.get_u32_le(),  // 52..56
            fullPageWrites: buf.get_u8() != 0, // 56
            nextXid: {
                buf.advance(7); // padding: align the 8-byte xid to offset 64
                buf.get_u64_le()
            },
            nextOid: buf.get_u32_le(),         // 72..76
            nextMulti: buf.get_u32_le(),       // 76..80
            nextMultiOffset: buf.get_u32_le(), // 80..84
            oldestXid: buf.get_u32_le(),       // 84..88
            oldestXidDB: buf.get_u32_le(),     // 88..92
            oldestMulti: buf.get_u32_le(),     // 92..96
            oldestMultiDB: buf.get_u32_le(),   // 96..100
            time: {
                buf.advance(4); // padding: align the 8-byte timestamp to offset 104
                buf.get_u64_le()
            },
            oldestCommitTsXid: buf.get_u32_le(), // 112..116
            newestCommitTsXid: buf.get_u32_le(), // 116..120
            oldestActiveXid: buf.get_u32_le(),   // 120..124
        },
        unloggedLSN: {
            buf.advance(4); // tail padding of CheckPoint (84 -> 88 bytes)
            buf.get_u64_le() // 128..136
        },
        minRecoveryPoint: buf.get_u64_le(),    // 136..144
        minRecoveryPointTLI: buf.get_u32_le(), // 144..148
        backupStartPoint: {
            buf.advance(4); // padding: align the 8-byte LSN to offset 152
            buf.get_u64_le()
        },
        backupEndPoint: buf.get_u64_le(),     // 160..168
        backupEndRequired: buf.get_u8() != 0, // 168
    }
}
/// Decode a `pg_control` image and log its contents.
///
/// The checkpoint location is logged as `HIGH/LOW`, both halves in
/// hexadecimal. The low half is masked to 32 bits; previously the whole
/// 64-bit value was printed after the slash, repeating the high half.
/// The decoded value is only logged, not returned or stored.
pub fn parse_controlfile(b: Bytes) {
    let controlfile = decode_pg_control(b);
    info!(
        "controlfile {:X}/{:X}",
        controlfile.checkPoint >> 32,
        controlfile.checkPoint & 0xffff_ffff
    );
    info!("controlfile {:?}", controlfile);
}
// Number of mapping slots in a relation map file. Each slot is 8 bytes (two
// u32 values, see RelMapping), so RelMapFile stores MAX_MAPPINGS * 8 = 496
// bytes of mappings; with the four 4-byte header/trailer fields that adds up
// to the 512 bytes mentioned on RelMapFile::pad.
const MAX_MAPPINGS: usize = 62;
// One entry of a relation map: a catalog OID and the filenode it maps to.
// NOTE(review): nothing in the visible part of this file constructs this
// type; RelMapFile keeps its mappings as raw bytes instead.
#[derive(Debug)]
struct RelMapping {
mapoid: u32, /* OID of a catalog */
mapfilenode: u32, /* its filenode number */
}
// Contents of a relation map file as produced by decode_filemapping().
// The mapping entries are kept undecoded, as the raw bytes read from the
// buffer.
#[derive(Debug)]
pub struct RelMapFile {
magic: i32, /* always RELMAPPER_FILEMAGIC */
num_mappings: i32, /* number of valid RelMapping entries */
// Raw bytes of all MAX_MAPPINGS slots, 8 bytes per slot.
mappings: [u8; MAX_MAPPINGS * 8],
crc: u32, /* CRC of all above */
pad: i32, /* to make the struct size be 512 exactly */
}
/// Decode a relation map file image into a [`RelMapFile`].
///
/// Reads, in order and little-endian: the magic number, the number of valid
/// mappings, `MAX_MAPPINGS * 8` raw mapping bytes, the CRC and the trailing
/// pad word. The decoded value is logged before it is returned.
pub fn decode_filemapping(mut buf: Bytes) -> RelMapFile {
    info!("decode filemap");

    let magic = buf.get_i32_le(); /* always RELMAPPER_FILEMAGIC */
    let num_mappings = buf.get_i32_le(); /* number of valid RelMapping entries */

    // The mapping slots are copied verbatim; they are not decoded here.
    let mut mappings = [0u8; MAX_MAPPINGS * 8];
    buf.copy_to_slice(&mut mappings);

    let crc = buf.get_u32_le(); /* CRC of all above */
    let pad = buf.get_i32_le();

    let decoded = RelMapFile {
        magic,
        num_mappings,
        mappings,
        crc,
        pad,
    };
    info!("decode filemap {:?}", decoded);
    decoded
}
/// Write `buf` into the file at `filepath`, at the byte offset of block
/// `blkno` (blocks are 8192 bytes).
///
/// The file is created if it does not exist, and an existing file is NOT
/// truncated. Callers write the blocks of one file with separate calls, and
/// the previous `File::create` truncated on every call, wiping the blocks
/// written before. Bytes outside the written range are left as they were.
///
/// # Panics
/// Panics if the file cannot be opened, or if seeking or writing fails.
pub fn write_buf_to_file(filepath: String, buf: Bytes, blkno: u32) {
    info!("write_buf_to_file {}", filepath);

    let mut file: File = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(false)
        .open(&filepath)
        .unwrap();

    // Widen before multiplying so the offset is computed in 64 bits.
    let offset = 8192 * u64::from(blkno);
    file.seek(SeekFrom::Start(offset)).unwrap();
    file.write_all(&buf).unwrap();

    info!("DONE write_buf_to_file {}", filepath);
}

View File

@@ -1,8 +1,11 @@
use std::net::SocketAddr;
use std::path::PathBuf;
pub mod controlfile;
pub mod page_cache;
pub mod page_service;
#[allow(dead_code)]
pub mod pg_constants;
pub mod restore_s3;
pub mod tui;
pub mod tui_event;

View File

@@ -8,20 +8,21 @@
use core::ops::Bound::Included;
use std::collections::{BTreeMap, HashMap};
use std::{convert::TryInto, ops::AddAssign};
use std::error::Error;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use std::{convert::TryInto, ops::AddAssign};
// use tokio::sync::RwLock;
use bytes::Bytes;
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use crate::{walredo, PageServerConf};
use crate::{controlfile, walredo, PageServerConf};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
@@ -107,6 +108,8 @@ struct PageCacheShared {
first_valid_lsn: u64,
last_valid_lsn: u64,
last_record_lsn: u64,
controldata: controlfile::ControlFileDataZenith,
}
lazy_static! {
@@ -146,6 +149,7 @@ fn init_page_cache() -> PageCache {
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
controldata: controlfile::ControlFileDataZenith::new(),
}),
valid_lsn_condvar: Condvar::new(),
@@ -176,7 +180,7 @@ fn init_page_cache() -> PageCache {
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
pub struct CacheKey {
pub tag: BufferTag,
pub lsn: u64,
@@ -215,7 +219,7 @@ impl CacheEntry {
}
}
#[derive(Eq, PartialEq, Hash, Clone, Copy)]
#[derive(Eq, PartialEq, Hash, Clone, Copy, Debug)]
pub struct RelTag {
pub spcnode: u32,
pub dbnode: u32,
@@ -223,7 +227,7 @@ pub struct RelTag {
pub forknum: u8,
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
pub struct BufferTag {
pub spcnode: u32,
pub dbnode: u32,
@@ -242,25 +246,145 @@ pub struct WALRecord {
// Public interface functions
impl PageCache {
/// Return the newest cached image of a non-relation page.
///
/// The requested LSN is ignored: non-rel pages are not versioned yet, and at
/// bootstrap the LSN of some files is unknown, so the entry with the largest
/// LSN for `tag` is always used.
///
/// # Errors
/// Returns an error if the cache has no entry for `tag`, if the entry holds
/// only a WAL record (redo of non-rel pages is not implemented), or if it
/// holds neither an image nor a WAL record.
pub fn get_nonrel_page(&self, tag: BufferTag, _reqlsn: u64) -> Result<Bytes, Box<dyn Error>> {
    self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);

    // Always look up to the largest possible LSN.
    let lsn = u64::MAX;
    let lower = CacheKey { tag, lsn: 0 };
    let upper = CacheKey { tag, lsn };

    let shared = self.shared.lock().unwrap();
    info!("got pagecache {}", shared.pagecache.len());

    let newest = shared
        .pagecache
        .range((Included(&lower), Included(&upper)))
        .next_back();
    let entry_rc: Arc<CacheEntry> = match newest {
        None => {
            return Err(format!(
                "not found non-rel page with LSN {} for {}/{}/{}.{} blk {}",
                lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
            )
            .into());
        }
        Some((_key, entry)) => {
            info!(
                "found non-rel page with LSN {} for {}/{}/{}.{} blk {}",
                lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
            );
            entry.clone()
        }
    };
    // We hold our own reference to the entry now, so release the map lock
    // as early as possible to maximize concurrency.
    drop(shared);

    // Lock the cache entry and dig the page image out of it.
    let content = entry_rc.content.lock().unwrap();
    let page_img: Bytes = match &content.page_image {
        Some(img) => {
            assert!(!content.apply_pending);
            img.clone()
        }
        // Only a WAL record is available. Reconstructing the page would
        // mean sending the entry to the WAL redo thread and waiting for
        // the result, which is not wired up for non-rel pages yet.
        None if content.wal_record.is_some() => {
            return Err("non-rel WAL redo is not implemented yet".into());
        }
        // No base image, and no WAL record. Huh?
        None => {
            return Err("no page image or WAL record for requested page".into());
        }
    };
    drop(content);

    trace!(
        "Returning page for {}/{}/{}.{} blk {}",
        tag.spcnode,
        tag.dbnode,
        tag.relnode,
        tag.forknum,
        tag.blknum
    );

    Ok(page_img)
}
//
// GetPage@LSN
//
// Returns an 8k page image
//
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> {
pub fn get_page_at_lsn(&self, tag: BufferTag, reqlsn: u64) -> Result<Bytes, Box<dyn Error>> {
let mut lsn = reqlsn;
if tag.forknum > 40 {
info!(
"get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}",
lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum
);
return self.get_nonrel_page(tag, lsn);
}
if reqlsn == 0 {
let c = self.get_controldata();
lsn = c.checkPoint;
info!("update reqlsn get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}", lsn,
tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum);
}
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag: tag, lsn: 0 };
let maxkey = CacheKey { tag: tag, lsn: lsn };
let entry_rc: Arc<CacheEntry>;
{
let mut shared = self.shared.lock().unwrap();
let mut waited = false;
while lsn > shared.last_valid_lsn {
// When server just started and created checkpoint lsn,
// but we have not yet established connection,
// requested lsn will be larger than the one we have
while lsn > shared.last_valid_lsn + 500 {
// TODO: Wait for the WAL receiver to catch up
waited = true;
trace!(
@@ -276,8 +400,8 @@ impl PageCache {
shared = wait_result.0;
if wait_result.1.timed_out() {
return Err(format!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32, lsn & 0xffff_ffff
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
))?;
}
}
@@ -286,8 +410,7 @@ impl PageCache {
}
if lsn < shared.first_valid_lsn {
return Err(format!("LSN {:X}/{:X} has already been removed",
lsn >> 32, lsn & 0xffff_ffff))?;
return Err(format!("LSN {} has already been removed", lsn))?;
}
let pagecache = &shared.pagecache;
@@ -297,9 +420,9 @@ impl PageCache {
let entry_opt = entries.next_back();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192];
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
//static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192];
//return Ok(Bytes::from_static(&ZERO_PAGE));
return Err("could not find page image")?;
}
let (_key, entry) = entry_opt.unwrap();
entry_rc = entry.clone();
@@ -476,8 +599,10 @@ impl PageCache {
self.num_entries.fetch_add(1, Ordering::Relaxed);
assert!(oldentry.is_none());
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
debug!(
"inserted page image for {}/{}/{}_{} blk {} at {}",
tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn
);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
@@ -550,6 +675,16 @@ impl PageCache {
return shared.last_record_lsn;
}
/// Replace the control-file data kept in the shared cache state with `c`.
/// The shared-state lock is held only for the assignment.
pub fn set_controldata(&self, c: controlfile::ControlFileDataZenith) {
    self.shared.lock().unwrap().controldata = c;
}
/// Return a copy of the control-file data kept in the shared cache state.
/// The value is cloned under the lock, so the caller gets an independent copy.
pub fn get_controldata(&self) -> controlfile::ControlFileDataZenith {
    let guard = self.shared.lock().unwrap();
    guard.controldata.clone()
}
//
// Simple test function for the WAL redo code:
//
@@ -610,6 +745,7 @@ impl PageCache {
*entry = to + 1;
}
}
trace!("relsize_inc {:?} to {}", rel, entry);
}
pub fn relsize_get(&self, rel: &RelTag) -> u32 {

View File

@@ -24,6 +24,8 @@ use crate::page_cache;
use crate::walreceiver;
use crate::PageServerConf;
use crate::controlfile;
type Result<T> = std::result::Result<T, io::Error>;
#[derive(Debug)]
@@ -51,7 +53,6 @@ enum BeMessage {
RowDescription,
DataRow,
CommandComplete,
ControlFile,
//
// All that messages are actually CopyData from libpq point of view.
@@ -339,18 +340,6 @@ impl Connection {
self.stream.write_buf(&mut b).await?;
}
BeMessage::ControlFile => {
// TODO pass checkpoint and xid info in this message
let mut b = Bytes::from("hello pg_control");
self.stream.write_u8(b'D').await?;
self.stream.write_i32(4 + 2 + 4 + b.len() as i32).await?;
self.stream.write_i16(1).await?;
self.stream.write_i32(b.len() as i32).await?;
self.stream.write_buf(&mut b).await?;
}
BeMessage::CommandComplete => {
let mut b = Bytes::from("SELECT 1\0");
@@ -438,8 +427,54 @@ impl Connection {
async fn process_query(&mut self, q: &FeQueryMessage) -> Result<()> {
trace!("got query {:?}", q.body);
if q.body.starts_with(b"controlfile") {
self.handle_controlfile().await
if q.body.starts_with(b"file") {
let (_l, r) = q.body.split_at("file ".len());
//TODO parse it correctly
let r = r.to_vec();
let str = String::from_utf8(r).unwrap().to_string();
let mut split = str.split(',');
let mut s;
let filepath = split.next().unwrap();
let sysid = {
s = split.next().unwrap();
s.parse::<u64>().unwrap()
};
let buf_tag = page_cache::BufferTag {
spcnode: {
s = split.next().unwrap();
s.parse::<u32>().unwrap()
},
dbnode: {
s = split.next().unwrap();
s.parse::<u32>().unwrap()
},
relnode: {
s = split.next().unwrap();
s.parse::<u32>().unwrap()
},
forknum: {
s = split.next().unwrap();
s.parse::<u8>().unwrap()
},
blknum: {
s = split.next().unwrap();
s.parse::<u32>().unwrap()
},
};
//TODO PARSE LSN
//let lsn = { s = split.next().unwrap(); s.parse::<u64>().unwrap()};
let lsn: u64 = 0;
info!(
"process file query sysid {} -- {:?} lsn {}",
sysid, buf_tag, lsn
);
self.handle_file(filepath.to_string(), sysid, buf_tag, lsn.into())
.await
} else if q.body.starts_with(b"pagestream ") {
let (_l, r) = q.body.split_at("pagestream ".len());
let mut r = r.to_vec();
@@ -486,10 +521,29 @@ impl Connection {
}
}
async fn handle_controlfile(&mut self) -> Result<()> {
async fn handle_file(
&mut self,
filepath: String,
sysid: u64,
buf_tag: page_cache::BufferTag,
lsn: u64,
) -> Result<()> {
let pcache = page_cache::get_pagecache(self.conf.clone(), sysid);
match pcache.get_page_at_lsn(buf_tag, lsn) {
Ok(p) => {
info!("info succeeded get_page_at_lsn: {}", lsn);
controlfile::write_buf_to_file(filepath, p, buf_tag.blknum);
}
Err(e) => {
info!("page not found and it's ok. get_page_at_lsn: {}", e);
}
};
self.write_message_noflush(&BeMessage::RowDescription)
.await?;
self.write_message_noflush(&BeMessage::ControlFile).await?;
self.write_message_noflush(&BeMessage::DataRow).await?;
self.write_message_noflush(&BeMessage::CommandComplete)
.await?;
self.write_message(&BeMessage::ReadyForQuery).await
@@ -558,6 +612,7 @@ impl Connection {
let n_blocks = pcache.relsize_get(&tag);
trace!("ZenithNblocksRequest {:?} = {}", tag, n_blocks);
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
n_blocks: n_blocks,
@@ -574,11 +629,26 @@ impl Connection {
};
let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) {
Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
page: p,
}),
Ok(p) => {
let mut b = BytesMut::with_capacity(8192);
trace!("ZenithReadResponse get_page_at_lsn succeed");
if p.len() < 8192 {
//add padding
trace!("ZenithReadResponse add padding");
let padding: [u8; 8192 - 512] = [0; 8192 - 512];
b.extend_from_slice(&p);
b.extend_from_slice(&padding);
} else {
b.extend_from_slice(&p);
}
BeMessage::ZenithReadResponse(ZenithReadResponse {
ok: true,
n_blocks: 0,
page: b.freeze(),
})
}
Err(e) => {
const ZERO_PAGE: [u8; 8192] = [0; 8192];
error!("get_page_at_lsn: {}", e);
@@ -599,6 +669,7 @@ impl Connection {
relnode: req.relnode,
forknum: req.forknum,
};
trace!("ZenithCreateRequest {:?}", tag);
pcache.relsize_inc(&tag, None);
@@ -616,6 +687,8 @@ impl Connection {
forknum: req.forknum,
};
trace!("ZenithExtendRequest {:?} to {}", tag, req.blkno);
pcache.relsize_inc(&tag, Some(req.blkno));
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {

View File

@@ -0,0 +1,11 @@
// From pg_tablespace_d.h
//
/// Tablespace OID used as `spcnode` for files that belong to an individual
/// database when base image paths are parsed.
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
/// Tablespace OID used as `spcnode` for files found under `global/`.
pub const GLOBALTABLESPACE_OID: u32 = 1664;
// Special values for non-rel files' tags.
//
// They are stored in the `forknum` field of page tags. Note that they are
// declared as u32 but narrowed with `as u8` where the tags are built, so
// every value here has to fit in a u8.
// TODO maybe use enum?
/// Tag for the `pg_control` file.
pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
/// Tag for `pg_filenode` map files.
pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
/// Tag for files under `pg_xact/`.
pub const PG_XACT_FORKNUM: u32 = 44;
/// Tag for files under `pg_multixact/offsets`.
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
/// Tag for files under `pg_multixact/members`.
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;

View File

@@ -22,7 +22,7 @@ use tokio::runtime;
use futures::future;
use crate::{page_cache, PageServerConf};
use crate::{controlfile, page_cache, pg_constants, PageServerConf};
struct Storage {
region: Region,
@@ -84,8 +84,24 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
.list("relationdata/".to_string(), Some("".to_string()))
.await?;
// TODO: get that from backup
let sys_id: u64 = 42;
//Before uploading other files, slurp pg_control to set systemid
let control_results: Vec<s3::serde_types::ListBucketResult> = bucket
.list(
"relationdata/global/pg_control".to_string(),
Some("".to_string()),
)
.await?;
let object = &(&control_results[0]).contents[0];
let (data, _) = bucket.get_object(&object.key).await.unwrap();
let bytes = BytesMut::from(data.as_slice()).freeze();
let c = controlfile::decode_pg_control(bytes);
let pcache = page_cache::get_pagecache(conf.clone(), c.system_identifier);
pcache.set_controldata(c.clone());
trace!("uploaded controlfile {:?}", pcache.get_controldata());
let sys_id: u64 = c.system_identifier;
let mut oldest_lsn = 0;
let mut slurp_futures: Vec<_> = Vec::new();
@@ -119,23 +135,47 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
panic!("no base backup found");
}
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
//Now add nonrelation files
let nonrelresults: Vec<s3::serde_types::ListBucketResult> = bucket
.list("nonreldata/".to_string(), Some("".to_string()))
.await?;
for result in nonrelresults {
for object in result.contents {
// Download needed non relation files, slurping them into memory
let key = object.key;
let relpath = key.strip_prefix("nonreldata/").unwrap();
trace!("list nonrelfiles {}", relpath);
let parsed = parse_nonrel_file_path(&relpath);
match parsed {
Ok(p) => {
let b = bucket.clone();
let f = slurp_base_file(conf, sys_id, b, key.to_string(), p);
slurp_futures.push(f);
}
Err(e) => {
warn!("unrecognized file: {} ({})", relpath, e);
}
};
}
}
pcache.init_valid_lsn(oldest_lsn);
info!("{} files to restore...", slurp_futures.len());
future::join_all(slurp_futures).await;
info!("restored!");
info!(
"restored! {:?} to {:?}",
pcache.first_valid_lsn, pcache.last_valid_lsn
);
Ok(())
}
// From pg_tablespace_d.h
//
// FIXME: we'll probably need these elsewhere too, move to some common location
const DEFAULTTABLESPACE_OID: u32 = 1663;
const GLOBALTABLESPACE_OID: u32 = 1664;
#[derive(Debug)]
struct FilePathError {
msg: String,
@@ -185,6 +225,17 @@ struct ParsedBaseImageFileName {
pub lsn: u64,
}
fn parse_lsn_from_filename(fname: &str) -> Result<u64, FilePathError> {
let (_, lsn_str) = fname.split_at(fname.len() - 16);
let (lsnhi, lsnlo) = lsn_str.split_at(8);
let lsn_hi = u64::from_str_radix(lsnhi, 16)?;
let lsn_lo = u64::from_str_radix(lsnlo, 16)?;
let lsn = lsn_hi << 32 | lsn_lo;
return Ok(lsn);
}
// formats:
// <oid>
// <oid>_<fork name>
@@ -223,6 +274,46 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
return Ok((relnode, forknum, segno, lsn));
}
/// Parses the path of a non-relation file from the base backup.
///
/// Recognized locations are `pg_xact/`, `pg_multixact/offsets` and
/// `pg_multixact/members`. Each is mapped to its special fork number from
/// `pg_constants`; the LSN is taken from the end of the remaining file name,
/// and all other tag fields are zero.
///
/// # Errors
///
/// Returns a `FilePathError` when the path is in none of the recognized
/// locations or when the LSN suffix cannot be parsed.
fn parse_nonrel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
    //TODO parse segno from xact filenames too
    let (fname, forknum) = if let Some(fname) = path.strip_prefix("pg_xact/") {
        (fname, pg_constants::PG_XACT_FORKNUM)
    } else if let Some(fname) = path.strip_prefix("pg_multixact/offsets") {
        (fname, pg_constants::PG_MXACT_OFFSETS_FORKNUM)
    } else if let Some(fname) = path.strip_prefix("pg_multixact/members") {
        (fname, pg_constants::PG_MXACT_MEMBERS_FORKNUM)
    } else {
        return Err(FilePathError::new("invalid non relation data file name"));
    };

    let lsn = parse_lsn_from_filename(fname)?;
    Ok(ParsedBaseImageFileName {
        spcnode: 0,
        dbnode: 0,
        relnode: 0,
        forknum,
        segno: 0,
        lsn,
    })
}
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
/*
* Relation data files can be in one of the following directories:
@@ -242,10 +333,36 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
* <oid>.<segment number>
*/
if let Some(fname) = path.strip_prefix("global/") {
if fname.contains("pg_control") {
let lsn = parse_lsn_from_filename(fname.clone())?;
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_CONTROLFILE_FORKNUM,
segno: 0,
lsn,
});
}
if fname.contains("pg_filenode") {
let lsn = parse_lsn_from_filename(fname.clone())?;
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
segno: 0,
lsn,
});
}
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
return Ok(ParsedBaseImageFileName {
spcnode: GLOBALTABLESPACE_OID,
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
relnode,
forknum,
@@ -265,10 +382,23 @@ fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathEr
return Err(FilePathError::new("invalid relation data file name"));
};
if fname.contains("pg_filenode") {
let lsn = parse_lsn_from_filename(fname.clone())?;
return Ok(ParsedBaseImageFileName {
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
dbnode,
relnode: 0,
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
segno: 0,
lsn,
});
}
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
return Ok(ParsedBaseImageFileName {
spcnode: DEFAULTTABLESPACE_OID,
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
dbnode,
relnode,
forknum,
@@ -302,22 +432,55 @@ async fn slurp_base_file(
let mut bytes = BytesMut::from(data.as_slice()).freeze();
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
// pg_filenode.map has non-standard size - 512 bytes
if parsed.forknum == pg_constants::PG_FILENODEMAP_FORKNUM {
let b = bytes.clone();
controlfile::decode_filemapping(b);
while bytes.remaining() >= 512 {
let tag = page_cache::BufferTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
blknum: 0,
};
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(512));
}
let tag = page_cache::RelTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
blknum: blknum,
};
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
pcache.relsize_inc(&tag, Some(0));
} else {
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
let reltag = page_cache::RelTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
};
blknum += 1;
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
blknum: blknum,
};
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
pcache.relsize_inc(&reltag, Some(blknum));
blknum += 1;
}
}
}

View File

@@ -4,6 +4,8 @@
//#![allow(dead_code)]
//include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
use crate::pg_constants;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::cmp::min;
@@ -234,6 +236,8 @@ const BLCKSZ: u16 = 8192;
//
// Constants from xlogrecord.h
//
const XLR_INFO_MASK: u8 = 0x0F;
const XLR_MAX_BLOCK_ID: u8 = 32;
const XLR_BLOCK_ID_DATA_SHORT: u8 = 255;
@@ -253,6 +257,12 @@ const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
//
// constants from clog.h
//
// Number of transaction status entries packed into one byte of a clog page
// (per the name in clog.h).
const CLOG_XACTS_PER_BYTE: u32 = 4;
// Number of transaction status entries on one 8192-byte page. Dividing an
// xid by this value gives the number of the clog page that holds its status.
const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
pub struct DecodedBkpBlock {
/* Is this block ref in use? */
//in_use: bool,
@@ -261,7 +271,7 @@ pub struct DecodedBkpBlock {
pub rnode_spcnode: u32,
pub rnode_dbnode: u32,
pub rnode_relnode: u32,
pub forknum: u8,
pub forknum: u8, // Note that we have a few special forknum values for non-rel files. Handle them too
pub blkno: u32,
/* copy of the fork_flags field from the XLogRecordBlockHeader */
@@ -297,6 +307,26 @@ pub struct DecodedWALRecord {
const XLOG_SWITCH: u8 = 0x40;
const RM_XLOG_ID: u8 = 0;
// Resource manager id of transaction records; compared against the xl_rmid
// field of the record header when decoding.
const RM_XACT_ID: u8 = 1;
// Further resource manager ids, kept for reference but not used yet:
// const RM_CLOG_ID:u8 = 3;
//const RM_MULTIXACT_ID:u8 = 6;
// from xact.h
//
// Opcodes found in xl_info of transaction records. Only the two commit
// opcodes are checked at the moment; the others are kept commented out for
// reference.
const XLOG_XACT_COMMIT: u8 = 0x00;
// const XLOG_XACT_PREPARE: u8 = 0x10;
// const XLOG_XACT_ABORT: u8 = 0x20;
const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
// const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
// const XLOG_XACT_ASSIGNMENT: u8 = 0x50;
// const XLOG_XACT_INVALIDATIONS: u8 = 0x60;
/* free opcode 0x70 */
/* mask for filtering opcodes out of xl_info */
// Applied to the rmgr-specific bits of xl_info before comparing them with
// the XLOG_XACT_* opcodes above.
const XLOG_XACT_OPMASK: u8 = 0x70;
/* does this record have a 'xinfo' field or not */
// const XLOG_XACT_HAS_INFO: u8 = 0x80;
// Is this record an XLOG_SWITCH record? They need some special processing,
// so we need to check for that before the rest of the parsing.
//
@@ -331,13 +361,17 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
// FIXME: assume little-endian here
let xl_tot_len = buf.get_u32_le();
let _xl_xid = buf.get_u32_le();
let xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let _xl_info = buf.get_u8();
let _xl_rmid = buf.get_u8();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
info!("decode_wal_record xl_rmid = {}", xl_rmid);
let rminfo: u8 = xl_info & !XLR_INFO_MASK;
let remaining = xl_tot_len - SizeOfXLogRecord;
if buf.remaining() != remaining as usize {
@@ -349,6 +383,53 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
let mut rnode_relnode: u32 = 0;
let mut got_rnode = false;
if xl_rmid == RM_XACT_ID
&& ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT
|| (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED)
{
info!("decode_wal_record RM_XACT_ID - XLOG_XACT_COMMIT");
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
let blkno = xl_xid / CLOG_XACTS_PER_PAGE;
let mut blk = DecodedBkpBlock {
rnode_spcnode: 0,
rnode_dbnode: 0,
rnode_relnode: 0,
forknum: pg_constants::PG_XACT_FORKNUM as u8,
blkno: blkno,
flags: 0,
has_image: false,
apply_image: false,
will_init: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
bimg_info: 0,
has_data: true,
data_len: 0,
};
let fork_flags = buf.get_u8();
blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0;
blk.data_len = buf.get_u16_le();
info!(
"decode_wal_record RM_XACT_ID blk has data with data_len {}",
blk.data_len
);
blocks.push(blk);
return DecodedWALRecord {
lsn: lsn,
record: rec,
blocks: blocks,
};
}
// Decode the headers
let mut max_block_id = 0;
@@ -552,6 +633,7 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord {
//blk->rnode = *rnode;
}
blk.rnode_spcnode = rnode_spcnode;
blk.rnode_dbnode = rnode_dbnode;
blk.rnode_relnode = rnode_relnode;

View File

@@ -160,9 +160,11 @@ impl WalRedoProcess {
.expect("failed to execute initdb");
if !initdb.status.success() {
panic!("initdb failed: {}\nstderr:\n{}",
std::str::from_utf8(&initdb.stdout).unwrap(),
std::str::from_utf8(&initdb.stderr).unwrap());
panic!(
"initdb failed: {}\nstderr:\n{}",
std::str::from_utf8(&initdb.stdout).unwrap(),
std::str::from_utf8(&initdb.stderr).unwrap()
);
}
// Start postgres itself