diff --git a/integration_tests/tests/control_plane/mod.rs b/integration_tests/tests/control_plane/mod.rs index 13d7d0e663..bdd7ed8c01 100644 --- a/integration_tests/tests/control_plane/mod.rs +++ b/integration_tests/tests/control_plane/mod.rs @@ -70,12 +70,9 @@ impl StorageControlPlane { data_dir: TEST_WORKDIR.join("pageserver"), }; pserver.init(); - if froms3 - { + if froms3 { pserver.start_froms3(); - } - else - { + } else { pserver.start(); } @@ -379,10 +376,10 @@ impl ComputeControlPlane<'_> { node } - // Init compute node without files, only datadir structure - // use initdb --compute-node flag and GUC 'computenode_mode' - // to distinguish the node - pub fn new_minimal_node<'a>(&mut self) -> &Arc { + // Init compute node without files, only datadir structure + // use initdb --compute-node flag and GUC 'computenode_mode' + // to distinguish the node + pub fn new_minimal_node<'a>(&mut self) -> &Arc { // allocate new node entry with generated port let node_id = self.nodes.len() + 1; let node = PostgresNode { @@ -448,9 +445,17 @@ impl ComputeControlPlane<'_> { let pserver = storage_cplane.page_server_addr(); // Configure that node to take pages from pageserver - node.append_conf("postgresql.conf", format!("\ + node.append_conf( + "postgresql.conf", + format!( + "\ page_server_connstring = 'host={} port={}'\n\ - ", pserver.ip(), pserver.port()).as_str()); + ", + pserver.ip(), + pserver.port() + ) + .as_str(), + ); node.clone() } @@ -525,7 +530,6 @@ pub struct PostgresNode { pg_bin_dir: PathBuf, } - impl PostgresNode { pub fn append_conf(&self, config: &str, opts: &str) { OpenOptions::new() @@ -636,80 +640,81 @@ impl PostgresNode { // And reqular query() uses prepared queries. // TODO pass sysid as parameter - pub fn setup_compute_node(&self, sysid: u64, storage_cplane: &StorageControlPlane) - { + pub fn setup_compute_node(&self, sysid: u64, storage_cplane: &StorageControlPlane) { let mut query; //Request pg_control from pageserver - query = format!("file {}/global/pg_control,{},{},{},{},{},{},{}", + query = format!( + "file {}/global/pg_control,{},{},{},{},{},{},{}", self.pgdata.to_str().unwrap(), sysid as u64, //sysid - 1664, //tablespace - 0, //dboid - 0, //reloid - 42, //forknum pg_control - 0, //blkno - 0 //lsn + 1664, //tablespace + 0, //dboid + 0, //reloid + 42, //forknum pg_control + 0, //blkno + 0 //lsn ); storage_cplane.page_server_psql(query.as_str()); //Request pg_xact and pg_multixact from pageserver //We need them for initial pageserver startup and authentication //TODO figure out which block number we really need - query = format!("file {}/pg_xact/0000,{},{},{},{},{},{},{}", + query = format!( + "file {}/pg_xact/0000,{},{},{},{},{},{},{}", self.pgdata.to_str().unwrap(), sysid as u64, //sysid - 0, //tablespace - 0, //dboid - 0, //reloid - 44, //forknum - 0, //blkno - 0 //lsn + 0, //tablespace + 0, //dboid + 0, //reloid + 44, //forknum + 0, //blkno + 0 //lsn ); storage_cplane.page_server_psql(query.as_str()); - query = format!("file {}/pg_multixact/offsets/0000,{},{},{},{},{},{},{}", + query = format!( + "file {}/pg_multixact/offsets/0000,{},{},{},{},{},{},{}", self.pgdata.to_str().unwrap(), sysid as u64, //sysid - 0, //tablespace - 0, //dboid - 0, //reloid - 45, //forknum - 0, //blkno - 0 //lsn + 0, //tablespace + 0, //dboid + 0, //reloid + 45, //forknum + 0, //blkno + 0 //lsn ); storage_cplane.page_server_psql(query.as_str()); - query = format!("file {}/pg_multixact/members/0000,{},{},{},{},{},{},{}", + query = format!( + "file {}/pg_multixact/members/0000,{},{},{},{},{},{},{}", self.pgdata.to_str().unwrap(), sysid as u64, //sysid - 0, //tablespace - 0, //dboid - 0, //reloid - 46, //forknum - 0, //blkno - 0 //lsn + 0, //tablespace + 0, //dboid + 0, //reloid + 46, //forknum + 0, //blkno + 0 //lsn ); storage_cplane.page_server_psql(query.as_str()); - //Request a few shared catalogs needed for authentication //Without them we cannot setup connection with pageserver to request further pages let reloids = [1260, 1261, 1262, 2396]; - for reloid in reloids.iter() - { + for reloid in reloids.iter() { //FIXME request all blocks from file, not just 10 - for blkno in 0..10 - { - query = format!("file {}/global/{},{},{},{},{},{},{},{}", + for blkno in 0..10 { + query = format!( + "file {}/global/{},{},{},{},{},{},{},{}", self.pgdata.to_str().unwrap(), - reloid, //suse it as filename + reloid, //suse it as filename sysid as u64, //sysid - 1664, //tablespace - 0, //dboid - reloid, //reloid - 0, //forknum - blkno, //blkno - 0 //lsn + 1664, //tablespace + 0, //dboid + reloid, //reloid + 0, //forknum + blkno, //blkno + 0 //lsn ); storage_cplane.page_server_psql(query.as_str()); } @@ -719,8 +724,15 @@ impl PostgresNode { fs::create_dir(format!("{}/base/13007", self.pgdata.to_str().unwrap())).unwrap(); //FIXME figure out what wal file we need to successfully start - let walfilepath = format!("{}/pg_wal/000000010000000000000001", self.pgdata.to_str().unwrap()); - fs::copy("/home/anastasia/zenith/zenith/tmp_check/pgdata/pg_wal/000000010000000000000001", walfilepath).unwrap(); + let walfilepath = format!( + "{}/pg_wal/000000010000000000000001", + self.pgdata.to_str().unwrap() + ); + fs::copy( + "/home/anastasia/zenith/zenith/tmp_check/pgdata/pg_wal/000000010000000000000001", + walfilepath, + ) + .unwrap(); println!("before resetwal "); @@ -743,7 +755,6 @@ impl PostgresNode { } println!("setup done"); - } pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode { @@ -761,8 +772,7 @@ impl PostgresNode { } } - pub fn push_to_s3(&self) - { + pub fn push_to_s3(&self) { println!("Push to s3 node at '{}'", self.pgdata.to_str().unwrap()); let zenith_push_path = self.pg_bin_dir.join("zenith_push"); diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index a6d07c61ba..732304ab54 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -6,7 +6,6 @@ use std::time::Duration; use control_plane::ComputeControlPlane; use control_plane::StorageControlPlane; - // XXX: force all redo at the end // -- restart + seqscan won't read deleted stuff // -- pageserver api endpoint to check all rels @@ -27,8 +26,14 @@ fn test_redo_cases() { sleep(Duration::from_secs(3)); // check basic work with table - node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); - node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100), 'payload'"); + node.safe_psql( + "postgres", + "CREATE TABLE t(key int primary key, value text)", + ); + node.safe_psql( + "postgres", + "INSERT INTO t SELECT generate_series(1,100), 'payload'", + ); let count: i64 = node .safe_psql("postgres", "SELECT sum(key) FROM t") .first() @@ -86,8 +91,14 @@ fn test_pageserver_multitenancy() { sleep(Duration::from_secs(3)); // check node1 - node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); - node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100), 'payload'"); + node1.safe_psql( + "postgres", + "CREATE TABLE t(key int primary key, value text)", + ); + node1.safe_psql( + "postgres", + "INSERT INTO t SELECT generate_series(1,100), 'payload'", + ); let count: i64 = node1 .safe_psql("postgres", "SELECT sum(key) FROM t") .first() @@ -97,8 +108,14 @@ fn test_pageserver_multitenancy() { assert_eq!(count, 5050); // check node2 - node2.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); - node2.safe_psql("postgres", "INSERT INTO t SELECT generate_series(100,200), 'payload'"); + node2.safe_psql( + "postgres", + "CREATE TABLE t(key int primary key, value text)", + ); + node2.safe_psql( + "postgres", + "INSERT INTO t SELECT generate_series(100,200), 'payload'", + ); let count: i64 = node2 .safe_psql("postgres", "SELECT sum(key) FROM t") .first() @@ -120,7 +137,8 @@ fn test_pageserver_multitenancy() { // .env("S3_BUCKET", "zenith-testbucket") // TODO use env variables in test fn test_pageserver_recovery() { - + //This test expects that image is already uploaded to s3 + //To upload it use zenith_push before test (see node.push_to_s3() for details) let storage_cplane = StorageControlPlane::one_page_server(true); let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); @@ -143,7 +161,6 @@ fn test_pageserver_recovery() { #[ignore] //Scenario for future test. Not implemented yet fn test_pageserver_node_switch() { - //Create pageserver let storage_cplane = StorageControlPlane::one_page_server(false); let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); @@ -152,8 +169,14 @@ fn test_pageserver_node_switch() { let node = compute_cplane.new_node(); node.start(&storage_cplane); - node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); - node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100), 'payload'"); + node.safe_psql( + "postgres", + "CREATE TABLE t(key int primary key, value text)", + ); + node.safe_psql( + "postgres", + "INSERT INTO t SELECT generate_series(1,100), 'payload'", + ); let count: i64 = node .safe_psql("postgres", "SELECT sum(key) FROM t") .first() diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 31a40ecb34..9f14c5651f 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -230,7 +230,7 @@ fn init_logging(conf: &PageServerConf) -> slog_scope::GlobalLoggerGuard { if record.level().is_at_least(slog::Level::Info) { return true; } - return true; + return false; }); let drain = std::sync::Mutex::new(drain).fuse(); let logger = slog::Logger::root(drain, slog::o!()); diff --git a/pageserver/src/controlfile.rs b/pageserver/src/controlfile.rs index 24a26396b8..91ee90dbe5 100644 --- a/pageserver/src/controlfile.rs +++ b/pageserver/src/controlfile.rs @@ -1,8 +1,8 @@ #![allow(non_camel_case_types)] #![allow(non_snake_case)] -use std::io::prelude::*; use std::fs::File; +use std::io::prelude::*; use std::io::SeekFrom; use bytes::{Buf, Bytes}; @@ -11,86 +11,79 @@ use log::*; type XLogRecPtr = u64; - #[repr(C)] -#[derive(Debug)] -#[derive(Clone)] +#[derive(Debug, Clone)] /* * Body of CheckPoint XLOG records. This is declared here because we keep * a copy of the latest one in pg_control for possible disaster recovery. * Changing this struct requires a PG_CONTROL_VERSION bump. */ pub struct CheckPoint { - pub redo: XLogRecPtr, /* next RecPtr available when we began to - * create CheckPoint (i.e. REDO start point) */ - pub ThisTimeLineID: u32, /* current TLI */ - pub PrevTimeLineID: u32, /* previous TLI, if this record begins a new - * timeline (equals ThisTimeLineID otherwise) */ - pub fullPageWrites: bool, /* current full_page_writes */ - pub nextXid: u64, /* next free transaction ID */ - pub nextOid: u32, /* next free OID */ - pub nextMulti: u32, /* next free MultiXactId */ - pub nextMultiOffset: u32, /* next free MultiXact offset */ - pub oldestXid: u32, /* cluster-wide minimum datfrozenxid */ - pub oldestXidDB: u32, /* database with minimum datfrozenxid */ - pub oldestMulti: u32, /* cluster-wide minimum datminmxid */ - pub oldestMultiDB: u32, /* database with minimum datminmxid */ - pub time: u64, /* time stamp of checkpoint */ - pub oldestCommitTsXid: u32, /* oldest Xid with valid commit - * timestamp */ - pub newestCommitTsXid: u32, /* newest Xid with valid commit - * timestamp */ + pub redo: XLogRecPtr, /* next RecPtr available when we began to + * create CheckPoint (i.e. REDO start point) */ + pub ThisTimeLineID: u32, /* current TLI */ + pub PrevTimeLineID: u32, /* previous TLI, if this record begins a new + * timeline (equals ThisTimeLineID otherwise) */ + pub fullPageWrites: bool, /* current full_page_writes */ + pub nextXid: u64, /* next free transaction ID */ + pub nextOid: u32, /* next free OID */ + pub nextMulti: u32, /* next free MultiXactId */ + pub nextMultiOffset: u32, /* next free MultiXact offset */ + pub oldestXid: u32, /* cluster-wide minimum datfrozenxid */ + pub oldestXidDB: u32, /* database with minimum datfrozenxid */ + pub oldestMulti: u32, /* cluster-wide minimum datminmxid */ + pub oldestMultiDB: u32, /* database with minimum datminmxid */ + pub time: u64, /* time stamp of checkpoint */ + pub oldestCommitTsXid: u32, /* oldest Xid with valid commit + * timestamp */ + pub newestCommitTsXid: u32, /* newest Xid with valid commit + * timestamp */ - /* - * Oldest XID still running. This is only needed to initialize hot standby - * mode from an online checkpoint, so we only bother calculating this for - * online checkpoints and only when wal_level is replica. Otherwise it's - * set to InvalidTransactionId. - */ - pub oldestActiveXid: u32, + /* + * Oldest XID still running. This is only needed to initialize hot standby + * mode from an online checkpoint, so we only bother calculating this for + * online checkpoints and only when wal_level is replica. Otherwise it's + * set to InvalidTransactionId. + */ + pub oldestActiveXid: u32, } - #[repr(C)] -#[derive(Debug)] -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct ControlFileDataZenith { pub system_identifier: u64, - pg_control_version: u32, /* PG_CONTROL_VERSION */ - catalog_version_no: u32, /* see catversion.h */ + pg_control_version: u32, /* PG_CONTROL_VERSION */ + catalog_version_no: u32, /* see catversion.h */ - state: i32, /* see enum above */ - time: i64, /* time stamp of last pg_control update */ - pub checkPoint: XLogRecPtr, + state: i32, /* see enum above */ + time: i64, /* time stamp of last pg_control update */ + pub checkPoint: XLogRecPtr, checkPointCopy: CheckPoint, /* copy of last check point record */ - unloggedLSN: XLogRecPtr, /* current fake LSN value, for unlogged rels */ - minRecoveryPoint: XLogRecPtr, - minRecoveryPointTLI: u32, - backupStartPoint: XLogRecPtr, - backupEndPoint: XLogRecPtr, - backupEndRequired: bool + unloggedLSN: XLogRecPtr, /* current fake LSN value, for unlogged rels */ + minRecoveryPoint: XLogRecPtr, + minRecoveryPointTLI: u32, + backupStartPoint: XLogRecPtr, + backupEndPoint: XLogRecPtr, + backupEndRequired: bool, } impl ControlFileDataZenith { - pub fn new() -> ControlFileDataZenith - { + pub fn new() -> ControlFileDataZenith { ControlFileDataZenith { - system_identifier: 0, + system_identifier: 0, pg_control_version: 0, - catalog_version_no: 0, + catalog_version_no: 0, state: 0, time: 0, checkPoint: 0, - checkPointCopy: - { - CheckPoint - { + checkPointCopy: { + CheckPoint { redo: 0, ThisTimeLineID: 0, PrevTimeLineID: 0, fullPageWrites: false, nextXid: 0, - nextOid:0, + nextOid: 0, nextMulti: 0, nextMultiOffset: 0, oldestXid: 0, @@ -100,109 +93,113 @@ impl ControlFileDataZenith { time: 0, oldestCommitTsXid: 0, newestCommitTsXid: 0, - oldestActiveXid:0 + oldestActiveXid: 0, } }, - unloggedLSN: 0, - minRecoveryPoint: 0, - minRecoveryPointTLI: 0, - backupStartPoint: 0, - backupEndPoint: 0, - backupEndRequired: false, + unloggedLSN: 0, + minRecoveryPoint: 0, + minRecoveryPointTLI: 0, + backupStartPoint: 0, + backupEndPoint: 0, + backupEndRequired: false, } - } + } } pub fn decode_pg_control(mut buf: Bytes) -> ControlFileDataZenith { - info!("decode pg_control"); - let controlfile : ControlFileDataZenith = ControlFileDataZenith { - system_identifier: buf.get_u64_le(), - pg_control_version: buf.get_u32_le(), - catalog_version_no: buf.get_u32_le(), - state: buf.get_i32_le(), - time: { buf.advance(4); buf.get_i64_le() }, - checkPoint: buf.get_u64_le(), - checkPointCopy: - { - CheckPoint - { - redo: buf.get_u64_le(), - ThisTimeLineID: buf.get_u32_le(), - PrevTimeLineID: buf.get_u32_le(), - fullPageWrites: buf.get_u8() != 0, - nextXid: { buf.advance(7); buf.get_u64_le()}, - nextOid: buf.get_u32_le(), - nextMulti: buf.get_u32_le(), - nextMultiOffset: buf.get_u32_le(), - oldestXid:buf.get_u32_le(), - oldestXidDB: buf.get_u32_le(), - oldestMulti: buf.get_u32_le(), - oldestMultiDB: buf.get_u32_le(), - time: { buf.advance(4); buf.get_u64_le()}, - oldestCommitTsXid: buf.get_u32_le(), - newestCommitTsXid: buf.get_u32_le(), - oldestActiveXid:buf.get_u32_le() - } - }, - unloggedLSN: buf.get_u64_le(), - minRecoveryPoint: buf.get_u64_le(), - minRecoveryPointTLI: buf.get_u32_le(), - backupStartPoint:{ buf.advance(4); buf.get_u64_le()}, - backupEndPoint: buf.get_u64_le(), - backupEndRequired: buf.get_u8() != 0, - }; + let controlfile: ControlFileDataZenith = ControlFileDataZenith { + system_identifier: buf.get_u64_le(), + pg_control_version: buf.get_u32_le(), + catalog_version_no: buf.get_u32_le(), + state: buf.get_i32_le(), + time: { + buf.advance(4); + buf.get_i64_le() + }, + checkPoint: buf.get_u64_le(), + checkPointCopy: { + CheckPoint { + redo: buf.get_u64_le(), + ThisTimeLineID: buf.get_u32_le(), + PrevTimeLineID: buf.get_u32_le(), + fullPageWrites: buf.get_u8() != 0, + nextXid: { + buf.advance(7); + buf.get_u64_le() + }, + nextOid: buf.get_u32_le(), + nextMulti: buf.get_u32_le(), + nextMultiOffset: buf.get_u32_le(), + oldestXid: buf.get_u32_le(), + oldestXidDB: buf.get_u32_le(), + oldestMulti: buf.get_u32_le(), + oldestMultiDB: buf.get_u32_le(), + time: { + buf.advance(4); + buf.get_u64_le() + }, + oldestCommitTsXid: buf.get_u32_le(), + newestCommitTsXid: buf.get_u32_le(), + oldestActiveXid: buf.get_u32_le(), + } + }, + unloggedLSN: buf.get_u64_le(), + minRecoveryPoint: buf.get_u64_le(), + minRecoveryPointTLI: buf.get_u32_le(), + backupStartPoint: { + buf.advance(4); + buf.get_u64_le() + }, + backupEndPoint: buf.get_u64_le(), + backupEndRequired: buf.get_u8() != 0, + }; - return controlfile; + return controlfile; } -pub fn parse_controlfile(b: Bytes) -{ +pub fn parse_controlfile(b: Bytes) { let controlfile = decode_pg_control(b); - info!("controlfile {:X}/{:X}", - controlfile.checkPoint >> 32, controlfile.checkPoint); + info!( + "controlfile {:X}/{:X}", + controlfile.checkPoint >> 32, + controlfile.checkPoint + ); info!("controlfile {:?}", controlfile); } - - - const MAX_MAPPINGS: usize = 62; #[derive(Debug)] -struct RelMapping -{ - mapoid: u32, /* OID of a catalog */ - mapfilenode: u32 /* its filenode number */ +struct RelMapping { + mapoid: u32, /* OID of a catalog */ + mapfilenode: u32, /* its filenode number */ } #[derive(Debug)] -pub struct RelMapFile -{ - magic: i32, /* always RELMAPPER_FILEMAGIC */ - num_mappings: i32, /* number of valid RelMapping entries */ - mappings: [u8; MAX_MAPPINGS*8], - crc: u32, /* CRC of all above */ - pad: i32 /* to make the struct size be 512 exactly */ +pub struct RelMapFile { + magic: i32, /* always RELMAPPER_FILEMAGIC */ + num_mappings: i32, /* number of valid RelMapping entries */ + mappings: [u8; MAX_MAPPINGS * 8], + crc: u32, /* CRC of all above */ + pad: i32, /* to make the struct size be 512 exactly */ } pub fn decode_filemapping(mut buf: Bytes) -> RelMapFile { - info!("decode filemap"); - let file : RelMapFile = RelMapFile { - magic: buf.get_i32_le(), /* always RELMAPPER_FILEMAGIC */ - num_mappings: buf.get_i32_le(), /* number of valid RelMapping entries */ - mappings: { - let mut arr = [0 as u8; MAX_MAPPINGS*8]; + let file: RelMapFile = RelMapFile { + magic: buf.get_i32_le(), /* always RELMAPPER_FILEMAGIC */ + num_mappings: buf.get_i32_le(), /* number of valid RelMapping entries */ + mappings: { + let mut arr = [0 as u8; MAX_MAPPINGS * 8]; buf.copy_to_slice(&mut arr); arr - } - , - crc: buf.get_u32_le(), /* CRC of all above */ - pad: buf.get_i32_le() + }, + crc: buf.get_u32_le(), /* CRC of all above */ + pad: buf.get_i32_le(), }; info!("decode filemap {:?}", file); @@ -210,13 +207,12 @@ pub fn decode_filemapping(mut buf: Bytes) -> RelMapFile { } pub fn write_buf_to_file(filepath: String, buf: Bytes, blkno: u32) { - info!("write_buf_to_file {}", filepath.clone()); let mut buffer = File::create(filepath.clone()).unwrap(); - buffer.seek(SeekFrom::Start(8192*blkno as u64)).unwrap(); + buffer.seek(SeekFrom::Start(8192 * blkno as u64)).unwrap(); buffer.write_all(&buf).unwrap(); info!("DONE write_buf_to_file {}", filepath); -} \ No newline at end of file +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index ddbc765044..c0d374a8ad 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,12 +1,11 @@ use std::net::SocketAddr; use std::path::PathBuf; -#[allow(dead_code)] - -pub mod pg_constants; pub mod controlfile; pub mod page_cache; pub mod page_service; +#[allow(dead_code)] +pub mod pg_constants; pub mod restore_s3; pub mod tui; pub mod tui_event; diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index b28a4887b3..5b9b8bd864 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -7,25 +7,25 @@ // use core::ops::Bound::Included; -use std::{convert::TryInto, ops::AddAssign}; use std::collections::{BTreeMap, HashMap}; +use std::{convert::TryInto, ops::AddAssign}; use std::error::Error; -use std::sync::{Arc,Condvar, Mutex}; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::time::Duration; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; +use std::time::Duration; // use tokio::sync::RwLock; use bytes::Bytes; use lazy_static::lazy_static; -use rand::Rng; use log::*; +use rand::Rng; -use crate::{PageServerConf, walredo, controlfile}; +use crate::{controlfile, walredo, PageServerConf}; use crossbeam_channel::unbounded; -use crossbeam_channel::{Sender, Receiver}; +use crossbeam_channel::{Receiver, Sender}; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -64,7 +64,6 @@ pub struct PageCacheStats { } impl AddAssign for PageCacheStats { - fn add_assign(&mut self, other: Self) { *self = Self { num_entries: self.num_entries + other.num_entries, @@ -82,7 +81,6 @@ impl AddAssign for PageCacheStats { // Shared data structure, holding page cache and related auxiliary information // struct PageCacheShared { - // The actual page cache pagecache: BTreeMap>, @@ -115,10 +113,10 @@ struct PageCacheShared { } lazy_static! { - pub static ref PAGECACHES : Mutex>> = Mutex::new(HashMap::new()); + pub static ref PAGECACHES: Mutex>> = Mutex::new(HashMap::new()); } -pub fn get_pagecache(conf: PageServerConf, sys_id : u64) -> Arc { +pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc { let mut pcaches = PAGECACHES.lock().unwrap(); if !pcaches.contains_key(&sys_id) { @@ -140,21 +138,19 @@ pub fn get_pagecache(conf: PageServerConf, sys_id : u64) -> Arc { pcaches.get(&sys_id).unwrap().clone() } -fn init_page_cache() -> PageCache -{ +fn init_page_cache() -> PageCache { // Initialize the channel between the page cache and the WAL applicator let (s, r) = unbounded(); PageCache { - shared: Mutex::new( - PageCacheShared { - pagecache: BTreeMap::new(), - relsize_cache: HashMap::new(), - first_valid_lsn: 0, - last_valid_lsn: 0, - last_record_lsn: 0, - controldata: controlfile::ControlFileDataZenith::new() - }), + shared: Mutex::new(PageCacheShared { + pagecache: BTreeMap::new(), + relsize_cache: HashMap::new(), + first_valid_lsn: 0, + last_valid_lsn: 0, + last_record_lsn: 0, + controldata: controlfile::ControlFileDataZenith::new(), + }), valid_lsn_condvar: Condvar::new(), walredo_sender: s, @@ -169,10 +165,8 @@ fn init_page_cache() -> PageCache last_valid_lsn: AtomicU64::new(0), last_record_lsn: AtomicU64::new(0), } - } - // // We store two kinds of entries in the page cache: // @@ -189,7 +183,7 @@ fn init_page_cache() -> PageCache #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug)] pub struct CacheKey { pub tag: BufferTag, - pub lsn: u64 + pub lsn: u64, } pub struct CacheEntry { @@ -202,7 +196,7 @@ pub struct CacheEntry { // // FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar // or something else. - pub walredo_condvar: Condvar + pub walredo_condvar: Condvar, } pub struct CacheEntryContent { @@ -225,7 +219,6 @@ impl CacheEntry { } } - #[derive(Eq, PartialEq, Hash, Clone, Copy, Debug)] pub struct RelTag { pub spcnode: u32, @@ -245,543 +238,542 @@ pub struct BufferTag { #[derive(Clone)] pub struct WALRecord { - pub lsn: u64, // LSN at the *end* of the record + pub lsn: u64, // LSN at the *end* of the record pub will_init: bool, - pub rec: Bytes + pub rec: Bytes, } - // Public interface functions impl PageCache { + pub fn get_nonrel_page(&self, tag: BufferTag, _reqlsn: u64) -> Result> { + self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); -pub fn get_nonrel_page(&self, tag: BufferTag, _reqlsn: u64) -> Result> -{ - self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + // Now we don't have versioning for non-rel pages. + // Also at bootstrap we don't know lsn for some files. + // So always request the very latest version + // let lsn = reqlsn; - // Now we don't have versioning for non-rel pages. - // Also at bootstrap we don't know lsn for some files. - // So always request the very latest version - // let lsn = reqlsn; + let lsn = u64::MAX; - let lsn = u64::MAX; + let minkey = CacheKey { tag: tag, lsn: 0 }; + // Look up to the largest lsn + let maxkey = CacheKey { tag: tag, lsn: lsn }; - let minkey = CacheKey { tag: tag, lsn: 0 }; - // Look up to the largest lsn - let maxkey = CacheKey { tag: tag, lsn: lsn }; + let entry_rc: Arc; + { + let shared = self.shared.lock().unwrap(); - let entry_rc: Arc; - { + let pagecache = &shared.pagecache; + info!("got pagecache {}", pagecache.len()); + + let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); + + let entry_opt = entries.next_back(); + + if entry_opt.is_none() { + return Err(format!( + "not found non-rel page with LSN {} for {}/{}/{}.{} blk {}", + lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum + ))?; + } + + info!( + "found non-rel page with LSN {} for {}/{}/{}.{} blk {}", + lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum + ); + + let (_key, entry) = entry_opt.unwrap(); + entry_rc = entry.clone(); + + // Now that we have a reference to the cache entry, drop the lock on the map. + // It's important to do this before waiting on the condition variable below, + // and better to do it as soon as possible to maximize concurrency. + } + + // Lock the cache entry and dig the page image out of it. + let page_img: Bytes; + { + let entry_content = entry_rc.content.lock().unwrap(); + + if let Some(img) = &entry_content.page_image { + assert!(!entry_content.apply_pending); + page_img = img.clone(); + } else if entry_content.wal_record.is_some() { + return Err("non-rel WAL redo is not implemented yet".into()); + // + // If this page needs to be reconstructed by applying some WAL, + // send a request to the WAL redo thread. + // + // if !entry_content.apply_pending { + // assert!(!entry_content.apply_pending); + // entry_content.apply_pending = true; + + // let s = &self.walredo_sender; + // s.send(entry_rc.clone())?; + // } + + // while entry_content.apply_pending { + // entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); + //} + + // We should now have a page image. If we don't, it means that WAL redo + // failed to reconstruct it. WAL redo should've logged that error already. + // page_img = match &entry_content.page_image { + // Some(p) => p.clone(), + // None => { + // error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); + // return Err("could not apply WAL to reconstruct page image".into()); + // } + // }; + } else { + // No base image, and no WAL record. Huh? + return Err(format!("no page image or WAL record for requested page"))?; + } + } + + trace!( + "Returning page for {}/{}/{}.{} blk {}", + tag.spcnode, + tag.dbnode, + tag.relnode, + tag.forknum, + tag.blknum + ); + + return Ok(page_img); + } + + // + // GetPage@LSN + // + // Returns an 8k page image + // + pub fn get_page_at_lsn(&self, tag: BufferTag, reqlsn: u64) -> Result> { + let mut lsn = reqlsn; + + if tag.forknum > 40 { + info!( + "get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}", + lsn, tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum + ); + + return self.get_nonrel_page(tag, lsn); + } + + if reqlsn == 0 { + let c = self.get_controldata(); + lsn = c.checkPoint; + + info!("update reqlsn get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}", lsn, + tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum); + } + + self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + + // Look up cache entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. + let minkey = CacheKey { tag: tag, lsn: 0 }; + let maxkey = CacheKey { tag: tag, lsn: lsn }; + let entry_rc: Arc; + { + let mut shared = self.shared.lock().unwrap(); + + let mut waited = false; + + // When server just started and created checkpoint lsn, + // but we have not yet established connection, + // requested lsn will be larger than the one we have + while lsn > shared.last_valid_lsn + 500 { + // TODO: Wait for the WAL receiver to catch up + waited = true; + trace!( + "not caught up yet: {}, requested {}", + shared.last_valid_lsn, + lsn + ); + let wait_result = self + .valid_lsn_condvar + .wait_timeout(shared, TIMEOUT) + .unwrap(); + + shared = wait_result.0; + if wait_result.1.timed_out() { + return Err(format!( + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn + ))?; + } + } + if waited { + trace!("caught up now, continuing"); + } + + if lsn < shared.first_valid_lsn { + return Err(format!("LSN {} has already been removed", lsn))?; + } + + let pagecache = &shared.pagecache; + + let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); + + let entry_opt = entries.next_back(); + + if entry_opt.is_none() { + //static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192]; + //return Ok(Bytes::from_static(&ZERO_PAGE)); + return Err("could not find page image")?; + } + let (_key, entry) = entry_opt.unwrap(); + entry_rc = entry.clone(); + + // Now that we have a reference to the cache entry, drop the lock on the map. + // It's important to do this before waiting on the condition variable below, + // and better to do it as soon as possible to maximize concurrency. + } + + // Lock the cache entry and dig the page image out of it. + let page_img: Bytes; + { + let mut entry_content = entry_rc.content.lock().unwrap(); + + if let Some(img) = &entry_content.page_image { + assert!(!entry_content.apply_pending); + page_img = img.clone(); + } else if entry_content.wal_record.is_some() { + // + // If this page needs to be reconstructed by applying some WAL, + // send a request to the WAL redo thread. + // + if !entry_content.apply_pending { + assert!(!entry_content.apply_pending); + entry_content.apply_pending = true; + + let s = &self.walredo_sender; + s.send(entry_rc.clone())?; + } + + while entry_content.apply_pending { + entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); + } + + // We should now have a page image. If we don't, it means that WAL redo + // failed to reconstruct it. WAL redo should've logged that error already. + page_img = match &entry_content.page_image { + Some(p) => p.clone(), + None => { + error!( + "could not apply WAL to reconstruct page image for GetPage@LSN request" + ); + return Err("could not apply WAL to reconstruct page image".into()); + } + }; + } else { + // No base image, and no WAL record. Huh? + return Err(format!("no page image or WAL record for requested page"))?; + } + } + + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + trace!( + "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", + page_lsn_hi, + page_lsn_lo, + tag.spcnode, + tag.dbnode, + tag.relnode, + tag.forknum, + tag.blknum + ); + + return Ok(page_img); + } + + // + // Collect all the WAL records that are needed to reconstruct a page + // image for the given cache entry. + // + // Returns an old page image (if any), and a vector of WAL records to apply + // over it. + // + pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option, Vec) { + // Scan the BTreeMap backwards, starting from the given entry. let shared = self.shared.lock().unwrap(); - let pagecache = &shared.pagecache; - info!("got pagecache {}", pagecache.len()); - let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); + let minkey = CacheKey { + tag: entry.key.tag, + lsn: 0, + }; + let maxkey = CacheKey { + tag: entry.key.tag, + lsn: entry.key.lsn, + }; + let entries = pagecache.range((Included(&minkey), Included(&maxkey))); - let entry_opt = entries.next_back(); + // the last entry in the range should be the CacheEntry we were given + //let _last_entry = entries.next_back(); + //assert!(last_entry == entry); - if entry_opt.is_none() { - return Err(format!("not found non-rel page with LSN {} for {}/{}/{}.{} blk {}", lsn, - tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum))?; + let mut base_img: Option = None; + let mut records: Vec = Vec::new(); + + // Scan backwards, collecting the WAL records, until we hit an + // old page image. + for (_key, e) in entries.rev() { + let e = e.content.lock().unwrap(); + + if let Some(img) = &e.page_image { + // We have a base image. No need to dig deeper into the list of + // records + base_img = Some(img.clone()); + break; + } else if let Some(rec) = &e.wal_record { + records.push(rec.clone()); + + // If this WAL record initializes the page, no need to dig deeper. + if rec.will_init { + break; + } + } else { + panic!("no base image and no WAL record on cache entry"); + } } - info!("found non-rel page with LSN {} for {}/{}/{}.{} blk {}", lsn, - tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum); - - let (_key, entry) = entry_opt.unwrap(); - entry_rc = entry.clone(); - - // Now that we have a reference to the cache entry, drop the lock on the map. - // It's important to do this before waiting on the condition variable below, - // and better to do it as soon as possible to maximize concurrency. + records.reverse(); + return (base_img, records); } - // Lock the cache entry and dig the page image out of it. - let page_img: Bytes; - { - let entry_content = entry_rc.content.lock().unwrap(); + // + // Adds a WAL record to the page cache + // + pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { + let key = CacheKey { + tag: tag, + lsn: rec.lsn, + }; - if let Some(img) = &entry_content.page_image { - assert!(!entry_content.apply_pending); - page_img = img.clone(); - } else if entry_content.wal_record.is_some() { + let entry = CacheEntry::new(key.clone()); + entry.content.lock().unwrap().wal_record = Some(rec); - return Err("non-rel WAL redo is not implemented yet".into()); - // - // If this page needs to be reconstructed by applying some WAL, - // send a request to the WAL redo thread. - // - // if !entry_content.apply_pending { - // assert!(!entry_content.apply_pending); - // entry_content.apply_pending = true; - - // let s = &self.walredo_sender; - // s.send(entry_rc.clone())?; - // } - - // while entry_content.apply_pending { - // entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); - //} - - // We should now have a page image. If we don't, it means that WAL redo - // failed to reconstruct it. WAL redo should've logged that error already. - // page_img = match &entry_content.page_image { - // Some(p) => p.clone(), - // None => { - // error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); - // return Err("could not apply WAL to reconstruct page image".into()); - // } - // }; - - } else { - // No base image, and no WAL record. Huh? - return Err(format!("no page image or WAL record for requested page"))?; - } - } - - trace!("Returning page for {}/{}/{}.{} blk {}", - tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum); - - return Ok(page_img); -} - -// -// GetPage@LSN -// -// Returns an 8k page image -// -pub fn get_page_at_lsn(&self, tag: BufferTag, reqlsn: u64) -> Result> -{ - let mut lsn = reqlsn; - - if tag.forknum > 40 - { - info!("get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}", lsn, - tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum); - - return self.get_nonrel_page(tag, lsn); - } - - if reqlsn == 0 - { - let c = self.get_controldata(); - lsn = c.checkPoint; - - info!("update reqlsn get_page_at_lsn got request for page with LSN {} for {}/{}/{}.{} blk {}", lsn, - tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum); - - } - - self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); - - // Look up cache entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let minkey = CacheKey { tag: tag, lsn: 0 }; - let maxkey = CacheKey { tag: tag, lsn: lsn }; - let entry_rc: Arc; - { let mut shared = self.shared.lock().unwrap(); - let mut waited = false; - - // When server just started and created checkpoint lsn, - // but we have not yet established connection, - // requested lsn will be larger than the one we have - while lsn > shared.last_valid_lsn + 500 { - // TODO: Wait for the WAL receiver to catch up - waited = true; - trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn); - let wait_result = self.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap(); - - shared = wait_result.0; - if wait_result.1.timed_out() { - return Err(format!("Timed out while waiting for WAL record at LSN {} to arrive", lsn))?; - } - } - if waited { - trace!("caught up now, continuing"); - } - - if lsn < shared.first_valid_lsn { - return Err(format!("LSN {} has already been removed", lsn))?; + let rel_tag = RelTag { + spcnode: tag.spcnode, + dbnode: tag.dbnode, + relnode: tag.relnode, + forknum: tag.forknum, + }; + let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0); + if tag.blknum >= *rel_entry { + *rel_entry = tag.blknum + 1; } - let pagecache = &shared.pagecache; + trace!("put_wal_record lsn: {}", key.lsn); - let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); + let oldentry = shared.pagecache.insert(key, Arc::new(entry)); + self.num_entries.fetch_add(1, Ordering::Relaxed); - let entry_opt = entries.next_back(); - - if entry_opt.is_none() { - - //static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192]; - //return Ok(Bytes::from_static(&ZERO_PAGE)); - return Err("could not find page image")?; + if !oldentry.is_none() { + error!("overwriting WAL record in page cache"); } - let (_key, entry) = entry_opt.unwrap(); - entry_rc = entry.clone(); - // Now that we have a reference to the cache entry, drop the lock on the map. - // It's important to do this before waiting on the condition variable below, - // and better to do it as soon as possible to maximize concurrency. + self.num_wal_records.fetch_add(1, Ordering::Relaxed); } - // Lock the cache entry and dig the page image out of it. - let page_img: Bytes; - { - let mut entry_content = entry_rc.content.lock().unwrap(); - - if let Some(img) = &entry_content.page_image { - assert!(!entry_content.apply_pending); - page_img = img.clone(); - } else if entry_content.wal_record.is_some() { - - // - // If this page needs to be reconstructed by applying some WAL, - // send a request to the WAL redo thread. - // - if !entry_content.apply_pending { - assert!(!entry_content.apply_pending); - entry_content.apply_pending = true; - - let s = &self.walredo_sender; - s.send(entry_rc.clone())?; - } - - while entry_content.apply_pending { - entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); - } - - // We should now have a page image. If we don't, it means that WAL redo - // failed to reconstruct it. WAL redo should've logged that error already. - page_img = match &entry_content.page_image { - Some(p) => p.clone(), - None => { - error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); - return Err("could not apply WAL to reconstruct page image".into()); - } - }; - - } else { - // No base image, and no WAL record. Huh? - return Err(format!("no page image or WAL record for requested page"))?; - } - } - - // FIXME: assumes little-endian. Only used for the debugging log though - let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - trace!("Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", page_lsn_hi, page_lsn_lo, - tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum); - - return Ok(page_img); -} - -// -// Collect all the WAL records that are needed to reconstruct a page -// image for the given cache entry. -// -// Returns an old page image (if any), and a vector of WAL records to apply -// over it. -// -pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option, Vec) -{ - // Scan the BTreeMap backwards, starting from the given entry. - let shared = self.shared.lock().unwrap(); - let pagecache = &shared.pagecache; - - let minkey = CacheKey { - tag: entry.key.tag, - lsn: 0 - }; - let maxkey = CacheKey { - tag: entry.key.tag, - lsn: entry.key.lsn - }; - let entries = pagecache.range((Included(&minkey), Included(&maxkey))); - - // the last entry in the range should be the CacheEntry we were given - //let _last_entry = entries.next_back(); - //assert!(last_entry == entry); - - let mut base_img: Option = None; - let mut records: Vec = Vec::new(); - - // Scan backwards, collecting the WAL records, until we hit an - // old page image. - for (_key, e) in entries.rev() { - let e = e.content.lock().unwrap(); - - if let Some(img) = &e.page_image { - // We have a base image. No need to dig deeper into the list of - // records - base_img = Some(img.clone()); - break; - } else if let Some(rec) = &e.wal_record { - - records.push(rec.clone()); - - // If this WAL record initializes the page, no need to dig deeper. - if rec.will_init { - break; - } - } else { - panic!("no base image and no WAL record on cache entry"); - } - } - - records.reverse(); - return (base_img, records); -} - - -// -// Adds a WAL record to the page cache -// -pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -{ - let key = CacheKey { - tag: tag, - lsn: rec.lsn - }; - - let entry = CacheEntry::new(key.clone()); - entry.content.lock().unwrap().wal_record = Some(rec); - - let mut shared = self.shared.lock().unwrap(); - - let rel_tag = RelTag { - spcnode: tag.spcnode, - dbnode: tag.dbnode, - relnode: tag.relnode, - forknum: tag.forknum, - }; - let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0); - if tag.blknum >= *rel_entry { - *rel_entry = tag.blknum + 1; - } - - trace!("put_wal_record lsn: {}", key.lsn); - - let oldentry = shared.pagecache.insert(key, Arc::new(entry)); - self.num_entries.fetch_add(1, Ordering::Relaxed); - - if !oldentry.is_none() { - error!("overwriting WAL record in page cache"); - } - - self.num_wal_records.fetch_add(1, Ordering::Relaxed); -} - -// -// Memorize a full image of a page version -// -pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) -{ - let key = CacheKey { - tag: tag, - lsn: lsn - }; - - let entry = CacheEntry::new(key.clone()); - entry.content.lock().unwrap().page_image = Some(img); - - let mut shared = self.shared.lock().unwrap(); - let pagecache = &mut shared.pagecache; - - let oldentry = pagecache.insert(key, Arc::new(entry)); - self.num_entries.fetch_add(1, Ordering::Relaxed); - assert!(oldentry.is_none()); - - debug!("inserted page image for {}/{}/{}_{} blk {} at {}", - tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); - - self.num_page_images.fetch_add(1, Ordering::Relaxed); -} - -// -pub fn advance_last_valid_lsn(&self, lsn: u64) -{ - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - assert!(lsn >= shared.last_valid_lsn); - - shared.last_valid_lsn = lsn; - self.valid_lsn_condvar.notify_all(); - - self.last_valid_lsn.store(lsn, Ordering::Relaxed); -} - -// -// NOTE: this updates last_valid_lsn as well. -// -pub fn advance_last_record_lsn(&self, lsn: u64) -{ - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - assert!(lsn >= shared.last_valid_lsn); - assert!(lsn >= shared.last_record_lsn); - - shared.last_valid_lsn = lsn; - shared.last_record_lsn = lsn; - self.valid_lsn_condvar.notify_all(); - - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); -} - -// -pub fn _advance_first_valid_lsn(&self, lsn: u64) -{ - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - assert!(lsn >= shared.first_valid_lsn); - - // Can't overtake last_valid_lsn (except when we're - // initializing the system and last_valid_lsn hasn't been set yet. - assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); - - shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); -} - -pub fn init_valid_lsn(&self, lsn: u64) -{ - let mut shared = self.shared.lock().unwrap(); - - assert!(shared.first_valid_lsn == 0); - assert!(shared.last_valid_lsn == 0); - assert!(shared.last_record_lsn == 0); - - shared.first_valid_lsn = lsn; - shared.last_valid_lsn = lsn; - shared.last_record_lsn = lsn; - - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); -} - -pub fn get_last_valid_lsn(&self) -> u64 -{ - let shared = self.shared.lock().unwrap(); - - return shared.last_record_lsn; -} - -pub fn set_controldata(&self, c: controlfile::ControlFileDataZenith) -{ - let mut shared = self.shared.lock().unwrap(); - shared.controldata = c; -} - -pub fn get_controldata(&self, ) -> controlfile::ControlFileDataZenith -{ - let shared = self.shared.lock().unwrap(); - return shared.controldata.clone(); -} - -// -// Simple test function for the WAL redo code: -// -// 1. Pick a page from the page cache at random. -// 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) -// -// -pub fn _test_get_page_at_lsn(&self) -{ - // for quick testing of the get_page_at_lsn() funcion. // - // Get a random page from the page cache. Apply all its WAL, by requesting - // that page at the highest lsn. + // Memorize a full image of a page version + // + pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { + let key = CacheKey { tag: tag, lsn: lsn }; - let mut tag: Option = None; + let entry = CacheEntry::new(key.clone()); + entry.content.lock().unwrap().page_image = Some(img); - { + let mut shared = self.shared.lock().unwrap(); + let pagecache = &mut shared.pagecache; + + let oldentry = pagecache.insert(key, Arc::new(entry)); + self.num_entries.fetch_add(1, Ordering::Relaxed); + assert!(oldentry.is_none()); + + debug!( + "inserted page image for {}/{}/{}_{} blk {} at {}", + tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn + ); + + self.num_page_images.fetch_add(1, Ordering::Relaxed); + } + + // + pub fn advance_last_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.last_valid_lsn); + + shared.last_valid_lsn = lsn; + self.valid_lsn_condvar.notify_all(); + + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + } + + // + // NOTE: this updates last_valid_lsn as well. + // + pub fn advance_last_record_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.last_valid_lsn); + assert!(lsn >= shared.last_record_lsn); + + shared.last_valid_lsn = lsn; + shared.last_record_lsn = lsn; + self.valid_lsn_condvar.notify_all(); + + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + } + + // + pub fn _advance_first_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.first_valid_lsn); + + // Can't overtake last_valid_lsn (except when we're + // initializing the system and last_valid_lsn hasn't been set yet. + assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); + + shared.first_valid_lsn = lsn; + self.first_valid_lsn.store(lsn, Ordering::Relaxed); + } + + pub fn init_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + assert!(shared.first_valid_lsn == 0); + assert!(shared.last_valid_lsn == 0); + assert!(shared.last_record_lsn == 0); + + shared.first_valid_lsn = lsn; + shared.last_valid_lsn = lsn; + shared.last_record_lsn = lsn; + + self.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); + } + + pub fn get_last_valid_lsn(&self) -> u64 { let shared = self.shared.lock().unwrap(); - let pagecache = &shared.pagecache; - if pagecache.is_empty() { - info!("page cache is empty"); - return; - } + return shared.last_record_lsn; + } - // Find nth entry in the map, where n is picked at random - let n = rand::thread_rng().gen_range(0..pagecache.len()); - let mut i = 0; - for (key, _e) in pagecache.iter() { - if i == n { - tag = Some(key.tag); - break; + pub fn set_controldata(&self, c: controlfile::ControlFileDataZenith) { + let mut shared = self.shared.lock().unwrap(); + shared.controldata = c; + } + + pub fn get_controldata(&self) -> controlfile::ControlFileDataZenith { + let shared = self.shared.lock().unwrap(); + return shared.controldata.clone(); + } + + // + // Simple test function for the WAL redo code: + // + // 1. Pick a page from the page cache at random. + // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) + // + // + pub fn _test_get_page_at_lsn(&self) { + // for quick testing of the get_page_at_lsn() funcion. + // + // Get a random page from the page cache. Apply all its WAL, by requesting + // that page at the highest lsn. + + let mut tag: Option = None; + + { + let shared = self.shared.lock().unwrap(); + let pagecache = &shared.pagecache; + + if pagecache.is_empty() { + info!("page cache is empty"); + return; + } + + // Find nth entry in the map, where n is picked at random + let n = rand::thread_rng().gen_range(0..pagecache.len()); + let mut i = 0; + for (key, _e) in pagecache.iter() { + if i == n { + tag = Some(key.tag); + break; + } + i += 1; + } + } + + info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); + match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { + Ok(_img) => { + // This prints out the whole page image. + //println!("{:X?}", img); + } + Err(error) => { + error!("GetPage@LSN failed: {}", error); } - i += 1; } } - info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); - match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { - Ok(_img) => { - // This prints out the whole page image. - //println!("{:X?}", img); - }, - Err(error) => { - error!("GetPage@LSN failed: {}", error); + // FIXME: Shouldn't relation size also be tracked with an LSN? + // If a replica is lagging behind, it needs to get the size as it was on + // the replica's current replay LSN. + pub fn relsize_inc(&self, rel: &RelTag, to: Option) { + let mut shared = self.shared.lock().unwrap(); + let entry = shared.relsize_cache.entry(*rel).or_insert(0); + + if let Some(to) = to { + if to >= *entry { + *entry = to + 1; + } + } + trace!("relsize_inc {:?} to {}", rel, entry); + } + + pub fn relsize_get(&self, rel: &RelTag) -> u32 { + let mut shared = self.shared.lock().unwrap(); + let entry = shared.relsize_cache.entry(*rel).or_insert(0); + *entry + } + + pub fn relsize_exist(&self, rel: &RelTag) -> bool { + let shared = self.shared.lock().unwrap(); + let relsize_cache = &shared.relsize_cache; + relsize_cache.contains_key(rel) + } + + pub fn get_stats(&self) -> PageCacheStats { + PageCacheStats { + num_entries: self.num_entries.load(Ordering::Relaxed), + num_page_images: self.num_page_images.load(Ordering::Relaxed), + num_wal_records: self.num_wal_records.load(Ordering::Relaxed), + num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), + first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), + last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), + last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), } } } - -// FIXME: Shouldn't relation size also be tracked with an LSN? -// If a replica is lagging behind, it needs to get the size as it was on -// the replica's current replay LSN. -pub fn relsize_inc(&self, rel: &RelTag, to: Option) -{ - let mut shared = self.shared.lock().unwrap(); - let entry = shared.relsize_cache.entry(*rel).or_insert(0); - - if let Some(to) = to { - if to >= *entry { - *entry = to + 1; - } - } - info!("relsize_inc {:?} to {}", rel, entry); -} - -pub fn relsize_get(&self, rel: &RelTag) -> u32 -{ - let mut shared = self.shared.lock().unwrap(); - let entry = shared.relsize_cache.entry(*rel).or_insert(0); - *entry -} - -pub fn relsize_exist(&self, rel: &RelTag) -> bool -{ - let shared = self.shared.lock().unwrap(); - let relsize_cache = &shared.relsize_cache; - relsize_cache.contains_key(rel) -} - -pub fn get_stats(&self) -> PageCacheStats -{ - PageCacheStats { - num_entries: self.num_entries.load(Ordering::Relaxed), - num_page_images: self.num_page_images.load(Ordering::Relaxed), - num_wal_records: self.num_wal_records.load(Ordering::Relaxed), - num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), - } -} - -} - -pub fn get_stats() -> PageCacheStats -{ +pub fn get_stats() -> PageCacheStats { let pcaches = PAGECACHES.lock().unwrap(); let mut stats = PageCacheStats { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f255181613..cb8221223d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -374,7 +374,6 @@ impl Connection { self.stream.write_u32(resp.n_blocks).await?; self.stream.write_buf(&mut resp.page.clone()).await?; } - } Ok(()) @@ -429,7 +428,6 @@ impl Connection { trace!("got query {:?}", q.body); if q.body.starts_with(b"file") { - let (_l, r) = q.body.split_at("file ".len()); //TODO parse it correctly let r = r.to_vec(); @@ -439,23 +437,44 @@ impl Connection { let mut s; let filepath = split.next().unwrap(); - let sysid = { s = split.next().unwrap(); s.parse::().unwrap()}; + let sysid = { + s = split.next().unwrap(); + s.parse::().unwrap() + }; let buf_tag = page_cache::BufferTag { - spcnode: { s = split.next().unwrap(); s.parse::().unwrap() }, - dbnode: { s = split.next().unwrap(); s.parse::().unwrap() }, - relnode: { s = split.next().unwrap(); s.parse::().unwrap() }, - forknum: { s = split.next().unwrap(); s.parse::().unwrap() }, - blknum: { s = split.next().unwrap(); s.parse::().unwrap() } + spcnode: { + s = split.next().unwrap(); + s.parse::().unwrap() + }, + dbnode: { + s = split.next().unwrap(); + s.parse::().unwrap() + }, + relnode: { + s = split.next().unwrap(); + s.parse::().unwrap() + }, + forknum: { + s = split.next().unwrap(); + s.parse::().unwrap() + }, + blknum: { + s = split.next().unwrap(); + s.parse::().unwrap() + }, }; //TODO PARSE LSN //let lsn = { s = split.next().unwrap(); s.parse::().unwrap()}; let lsn: u64 = 0; - info!("process file query sysid {} -- {:?} lsn {}",sysid, buf_tag, lsn); - - self.handle_file(filepath.to_string(), sysid, buf_tag, lsn.into()).await + info!( + "process file query sysid {} -- {:?} lsn {}", + sysid, buf_tag, lsn + ); + self.handle_file(filepath.to_string(), sysid, buf_tag, lsn.into()) + .await } else if q.body.starts_with(b"pagestream ") { let (_l, r) = q.body.split_at("pagestream ".len()); let mut r = r.to_vec(); @@ -502,9 +521,13 @@ impl Connection { } } - async fn handle_file(&mut self, filepath: String, sysid:u64, - buf_tag: page_cache::BufferTag, lsn:u64) -> Result<()> { - + async fn handle_file( + &mut self, + filepath: String, + sysid: u64, + buf_tag: page_cache::BufferTag, + lsn: u64, + ) -> Result<()> { let pcache = page_cache::get_pagecache(self.conf.clone(), sysid); match pcache.get_page_at_lsn(buf_tag, lsn) { @@ -512,16 +535,17 @@ impl Connection { info!("info succeeded get_page_at_lsn: {}", lsn); controlfile::write_buf_to_file(filepath, p, buf_tag.blknum); - - }, + } Err(e) => { info!("page not found and it's ok. get_page_at_lsn: {}", e); } }; - self.write_message_noflush(&BeMessage::RowDescription).await?; + self.write_message_noflush(&BeMessage::RowDescription) + .await?; self.write_message_noflush(&BeMessage::DataRow).await?; - self.write_message_noflush(&BeMessage::CommandComplete).await?; + self.write_message_noflush(&BeMessage::CommandComplete) + .await?; self.write_message(&BeMessage::ReadyForQuery).await } @@ -588,7 +612,7 @@ impl Connection { let n_blocks = pcache.relsize_get(&tag); - info!("ZenithNblocksRequest {:?} = {}", tag, n_blocks); + trace!("ZenithNblocksRequest {:?} = {}", tag, n_blocks); self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { ok: true, n_blocks: n_blocks, @@ -608,26 +632,23 @@ impl Connection { Ok(p) => { let mut b = BytesMut::with_capacity(8192); - info!("ZenithReadResponse get_page_at_lsn succeed"); - if p.len() < 8192 - { + trace!("ZenithReadResponse get_page_at_lsn succeed"); + if p.len() < 8192 { //add padding - info!("ZenithReadResponse add padding"); + trace!("ZenithReadResponse add padding"); let padding: [u8; 8192 - 512] = [0; 8192 - 512]; b.extend_from_slice(&p); b.extend_from_slice(&padding); - } - else - { + } else { b.extend_from_slice(&p); } BeMessage::ZenithReadResponse(ZenithReadResponse { ok: true, n_blocks: 0, - page: b.freeze() + page: b.freeze(), }) - }, + } Err(e) => { const ZERO_PAGE: [u8; 8192] = [0; 8192]; error!("get_page_at_lsn: {}", e); @@ -648,7 +669,7 @@ impl Connection { relnode: req.relnode, forknum: req.forknum, }; - info!("ZenithCreateRequest {:?}", tag); + trace!("ZenithCreateRequest {:?}", tag); pcache.relsize_inc(&tag, None); @@ -666,7 +687,7 @@ impl Connection { forknum: req.forknum, }; - info!("ZenithExtendRequest {:?} to {}", tag, req.blkno); + trace!("ZenithExtendRequest {:?} to {}", tag, req.blkno); pcache.relsize_inc(&tag, Some(req.blkno)); diff --git a/pageserver/src/pg_constants.rs b/pageserver/src/pg_constants.rs index 1e641f2448..b59ddb5396 100644 --- a/pageserver/src/pg_constants.rs +++ b/pageserver/src/pg_constants.rs @@ -1,13 +1,11 @@ - // From pg_tablespace_d.h // -// FIXME: we'll probably need these elsewhere too, move to some common location -pub const DEFAULTTABLESPACE_OID:u32 = 1663; -pub const GLOBALTABLESPACE_OID:u32 = 1664; +pub const DEFAULTTABLESPACE_OID: u32 = 1663; +pub const GLOBALTABLESPACE_OID: u32 = 1664; //Special values for non-rel files' tags //TODO maybe use enum? -pub const PG_CONTROLFILE_FORKNUM:u32 = 42; -pub const PG_FILENODEMAP_FORKNUM:u32 = 43; -pub const PG_XACT_FORKNUM:u32 = 44; -pub const PG_MXACT_OFFSETS_FORKNUM:u32 = 45; -pub const PG_MXACT_MEMBERS_FORKNUM:u32 = 46; \ No newline at end of file +pub const PG_CONTROLFILE_FORKNUM: u32 = 42; +pub const PG_FILENODEMAP_FORKNUM: u32 = 43; +pub const PG_XACT_FORKNUM: u32 = 44; +pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45; +pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46; diff --git a/pageserver/src/restore_s3.rs b/pageserver/src/restore_s3.rs index 773602de09..61091eb94b 100644 --- a/pageserver/src/restore_s3.rs +++ b/pageserver/src/restore_s3.rs @@ -22,7 +22,7 @@ use tokio::runtime; use futures::future; -use crate::{PageServerConf, page_cache, pg_constants, controlfile}; +use crate::{controlfile, page_cache, pg_constants, PageServerConf}; struct Storage { region: Region, @@ -86,7 +86,12 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { //Before uploading other files, slurp pg_control to set systemid - let control_results: Vec = bucket.list("relationdata/global/pg_control".to_string(), Some("".to_string())).await?; + let control_results: Vec = bucket + .list( + "relationdata/global/pg_control".to_string(), + Some("".to_string()), + ) + .await?; let object = &(&control_results[0]).contents[0]; let (data, _) = bucket.get_object(&object.key).await.unwrap(); let bytes = BytesMut::from(data.as_slice()).freeze(); @@ -131,10 +136,11 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { } //Now add nonrelation files - let nonrelresults: Vec = bucket.list("nonreldata/".to_string(), Some("".to_string())).await?; + let nonrelresults: Vec = bucket + .list("nonreldata/".to_string(), Some("".to_string())) + .await?; for result in nonrelresults { for object in result.contents { - // Download needed non relation files, slurping them into memory let key = object.key; @@ -150,7 +156,9 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { slurp_futures.push(f); } - Err(e) => { warn!("unrecognized file: {} ({})", relpath, e); } + Err(e) => { + warn!("unrecognized file: {} ({})", relpath, e); + } }; } } @@ -160,12 +168,14 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { info!("{} files to restore...", slurp_futures.len()); future::join_all(slurp_futures).await; - info!("restored! {:?} to {:?}", pcache.first_valid_lsn, pcache.last_valid_lsn); + info!( + "restored! {:?} to {:?}", + pcache.first_valid_lsn, pcache.last_valid_lsn + ); Ok(()) } - #[derive(Debug)] struct FilePathError { msg: String, @@ -215,10 +225,8 @@ struct ParsedBaseImageFileName { pub lsn: u64, } -fn parse_lsn_from_filename(fname: &str) -> Result -{ - - let (_, lsn_str) = fname.split_at(fname.len()-16); +fn parse_lsn_from_filename(fname: &str) -> Result { + let (_, lsn_str) = fname.split_at(fname.len() - 16); let (lsnhi, lsnlo) = lsn_str.split_at(8); let lsn_hi = u64::from_str_radix(lsnhi, 16)?; @@ -267,10 +275,8 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> { } fn parse_nonrel_file_path(path: &str) -> Result { - //TODO parse segno from xact filenames too if let Some(fname) = path.strip_prefix("pg_xact/") { - let lsn = parse_lsn_from_filename(fname.clone())?; return Ok(ParsedBaseImageFileName { @@ -279,11 +285,9 @@ fn parse_nonrel_file_path(path: &str) -> Result Result Result Result { @@ -334,9 +333,7 @@ fn parse_rel_file_path(path: &str) -> Result. */ if let Some(fname) = path.strip_prefix("global/") { - - if fname.contains("pg_control") - { + if fname.contains("pg_control") { let lsn = parse_lsn_from_filename(fname.clone())?; return Ok(ParsedBaseImageFileName { @@ -345,12 +342,11 @@ fn parse_rel_file_path(path: &str) -> Result Result Result Result= 512 { - let tag = page_cache::BufferTag { spcnode: parsed.spcnode, dbnode: parsed.dbnode, relnode: parsed.relnode, forknum: parsed.forknum as u8, - blknum: 0 + blknum: 0, }; pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(512)); @@ -466,11 +458,9 @@ async fn slurp_base_file( }; pcache.relsize_inc(&tag, Some(0)); - } - else - { + } else { // FIXME: use constants (BLCKSZ) - let mut blknum: u32 = parsed.segno * (1024*1024*1024 / 8192); + let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192); let reltag = page_cache::RelTag { spcnode: parsed.spcnode, dbnode: parsed.dbnode, @@ -479,13 +469,12 @@ async fn slurp_base_file( }; while bytes.remaining() >= 8192 { - let tag = page_cache::BufferTag { spcnode: parsed.spcnode, dbnode: parsed.dbnode, relnode: parsed.relnode, forknum: parsed.forknum as u8, - blknum: blknum + blknum: blknum, }; pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192)); diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 6afad5bed6..8733d0975f 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -238,7 +238,7 @@ const BLCKSZ: u16 = 8192; // const XLR_INFO_MASK: u8 = 0x0F; -const XLR_MAX_BLOCK_ID:u8 = 32; +const XLR_MAX_BLOCK_ID: u8 = 32; const XLR_BLOCK_ID_DATA_SHORT: u8 = 255; const XLR_BLOCK_ID_DATA_LONG: u8 = 254; @@ -260,8 +260,8 @@ const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay // // constants from clog.h // -const CLOG_XACTS_PER_BYTE:u32 = 4; -const CLOG_XACTS_PER_PAGE:u32 = 8192 * CLOG_XACTS_PER_BYTE; +const CLOG_XACTS_PER_BYTE: u32 = 4; +const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE; pub struct DecodedBkpBlock { /* Is this block ref in use? */ @@ -307,7 +307,7 @@ pub struct DecodedWALRecord { const XLOG_SWITCH: u8 = 0x40; const RM_XLOG_ID: u8 = 0; -const RM_XACT_ID:u8 = 1; +const RM_XACT_ID: u8 = 1; // const RM_CLOG_ID:u8 = 3; //const RM_MULTIXACT_ID:u8 = 6; @@ -327,7 +327,6 @@ const XLOG_XACT_OPMASK: u8 = 0x70; /* does this record have a 'xinfo' field or not */ // const XLOG_XACT_HAS_INFO: u8 = 0x80; - // Is this record an XLOG_SWITCH record? They need some special processing, // so we need to check for that before the rest of the parsing. // @@ -369,9 +368,9 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord { buf.advance(2); // 2 bytes of padding let _xl_crc = buf.get_u32_le(); - info!("decode_wal_record xl_rmid = {}" , xl_rmid); + info!("decode_wal_record xl_rmid = {}", xl_rmid); - let rminfo: u8 = xl_info & !XLR_INFO_MASK; + let rminfo: u8 = xl_info & !XLR_INFO_MASK; let remaining = xl_tot_len - SizeOfXLogRecord; @@ -384,15 +383,15 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord { let mut rnode_relnode: u32 = 0; let mut got_rnode = false; - if xl_rmid == RM_XACT_ID && - ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT || - (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED) + if xl_rmid == RM_XACT_ID + && ((rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT + || (rminfo & XLOG_XACT_OPMASK) == XLOG_XACT_COMMIT_PREPARED) { info!("decode_wal_record RM_XACT_ID - XLOG_XACT_COMMIT"); let mut blocks: Vec = Vec::new(); - let blkno = xl_xid/CLOG_XACTS_PER_PAGE; + let blkno = xl_xid / CLOG_XACTS_PER_PAGE; let mut blk = DecodedBkpBlock { rnode_spcnode: 0, @@ -411,21 +410,24 @@ pub fn decode_wal_record(lsn: u64, rec: Bytes) -> DecodedWALRecord { bimg_info: 0, has_data: true, - data_len: 0 + data_len: 0, }; let fork_flags = buf.get_u8(); blk.has_data = (fork_flags & BKPBLOCK_HAS_DATA) != 0; blk.data_len = buf.get_u16_le(); - info!("decode_wal_record RM_XACT_ID blk has data with data_len {}", blk.data_len); + info!( + "decode_wal_record RM_XACT_ID blk has data with data_len {}", + blk.data_len + ); blocks.push(blk); return DecodedWALRecord { lsn: lsn, record: rec, - blocks: blocks - } + blocks: blocks, + }; } // Decode the headers diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 2eb1ea3a57..685e771f4a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -160,9 +160,11 @@ impl WalRedoProcess { .expect("failed to execute initdb"); if !initdb.status.success() { - panic!("initdb failed: {}\nstderr:\n{}", - std::str::from_utf8(&initdb.stdout).unwrap(), - std::str::from_utf8(&initdb.stderr).unwrap()); + panic!( + "initdb failed: {}\nstderr:\n{}", + std::str::from_utf8(&initdb.stdout).unwrap(), + std::str::from_utf8(&initdb.stderr).unwrap() + ); } // Start postgres itself diff --git a/vendor/postgres b/vendor/postgres index 0a9e53f73d..a71b5c24eb 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 0a9e53f73df02518f43edd48cc79ea9abaa1a5bb +Subproject commit a71b5c24eb1d5112f6406a9349bd26c379decd42