Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-06 13:02:55 +00:00
Do not update relation metadata when materializing page in get_page_at_lsn
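
The heart of the change is that `get_page_at_lsn_nowait` now takes an explicit `materialize` flag, and when it caches a page image it reconstructed through WAL redo, it calls `put_page_image` with `false` for the update-metadata argument, leaving relation metadata alone. A minimal sketch of that flow, using stand-in types (the `Tag`/`Lsn` aliases and in-memory maps below are illustrative, not the pageserver's real definitions):

use std::collections::HashMap;

type Lsn = u64;
type Tag = (u32, u32); // hypothetical stand-in for ObjectTag: (relation, block)

struct Timeline {
    pages: HashMap<(Tag, Lsn), Vec<u8>>, // materialized page versions
    rel_sizes: HashMap<u32, u32>,        // relation metadata stand-in
}

impl Timeline {
    fn get_page_at_lsn_nowait(&mut self, tag: Tag, lsn: Lsn, materialize: bool) -> Vec<u8> {
        if let Some(img) = self.pages.get(&(tag, lsn)) {
            return img.clone(); // already materialized
        }
        let img = vec![0u8; 8192]; // stand-in for WAL redo reconstruction
        if materialize {
            // update_meta = false: remember the image for GC's sake, but do
            // not touch relation metadata -- the point of this commit.
            self.put_page_image(tag, lsn, img.clone(), false);
        }
        img
    }

    fn put_page_image(&mut self, tag: Tag, lsn: Lsn, img: Vec<u8>, update_meta: bool) {
        self.pages.insert((tag, lsn), img);
        if update_meta {
            let size = self.rel_sizes.entry(tag.0).or_insert(0);
            *size = (*size).max(tag.1 + 1);
        }
    }
}

fn main() {
    let mut tl = Timeline { pages: HashMap::new(), rel_sizes: HashMap::new() };
    tl.get_page_at_lsn_nowait((1, 0), 100, true);
    assert_eq!(tl.pages.len(), 1);    // page image was materialized
    assert!(tl.rel_sizes.is_empty()); // relation metadata left untouched
}
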
@@ -132,7 +132,9 @@ impl<'a> Basebackup<'a> {
         tag: &ObjectTag,
         page: u32,
     ) -> anyhow::Result<()> {
-        let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
+        let img = self
+            .timeline
+            .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
         // Zero length image indicates truncated segment: just skip it
         if !img.is_empty() {
             assert!(img.len() == pg_constants::BLCKSZ as usize);
@@ -172,7 +174,9 @@ impl<'a> Basebackup<'a> {
     // Extract pg_filenode.map files from repository
     //
     fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
-        let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
+        let img = self
+            .timeline
+            .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
         info!("add_relmap_file {:?}", db);
         let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
             String::from("global/pg_filenode.map") // filenode map for global tablespace
@@ -198,7 +202,9 @@ impl<'a> Basebackup<'a> {
             if self.timeline.get_tx_status(xid, self.lsn)?
                 == pg_constants::TRANSACTION_STATUS_IN_PROGRESS
             {
-                let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
+                let img = self
+                    .timeline
+                    .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
                 let mut buf = BytesMut::new();
                 buf.extend_from_slice(&img[..]);
                 let crc = crc32c::crc32c(&img[..]);
@@ -214,12 +220,12 @@ impl<'a> Basebackup<'a> {
     // Add generated pg_control file
     //
     fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
-        let checkpoint_bytes = self
-            .timeline
-            .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn, false)?;
-        let pg_control_bytes = self
-            .timeline
-            .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn, false)?;
+        let checkpoint_bytes =
+            self.timeline
+                .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn, false)?;
+        let pg_control_bytes =
+            self.timeline
+                .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn, false)?;
         let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
         let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
@@ -28,7 +28,7 @@ const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
 const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
 const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);

-const DEFAULT_WAL_REDOERS: usize = 2;
+const DEFAULT_WAL_REDOERS: usize = 1;

 /// String arguments that can be declared via CLI or config file
 #[derive(Serialize, Deserialize)]
@@ -36,7 +36,7 @@ struct CfgFileParams {
     listen_addr: Option<String>,
     gc_horizon: Option<String>,
     gc_period: Option<String>,
-    wal_redoers: Option<String>,
+    wal_redoers: Option<String>,
     pg_distrib_dir: Option<String>,
 }
@@ -106,7 +106,7 @@ impl CfgFileParams {
             listen_addr,
             gc_horizon,
             gc_period,
-            wal_redoers,
+            wal_redoers,
             workdir: PathBuf::from("."),

             pg_distrib_dir,
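
Since `wal_redoers` travels through the config file as an `Option<String>` like every other parameter here, resolving it against `DEFAULT_WAL_REDOERS` plausibly looks like the following sketch (hypothetical helper, not code from this commit):

const DEFAULT_WAL_REDOERS: usize = 1;

struct CfgFileParams {
    wal_redoers: Option<String>,
}

// Parse the optional string parameter, falling back to the compiled-in default.
fn resolve_wal_redoers(params: &CfgFileParams) -> usize {
    params
        .wal_redoers
        .as_deref()
        .map(|s| s.parse::<usize>().expect("wal_redoers must be an integer"))
        .unwrap_or(DEFAULT_WAL_REDOERS)
}

fn main() {
    let from_file = CfgFileParams { wal_redoers: Some("2".to_string()) };
    let absent = CfgFileParams { wal_redoers: None };
    assert_eq!(resolve_wal_redoers(&from_file), 2);
    assert_eq!(resolve_wal_redoers(&absent), DEFAULT_WAL_REDOERS);
}
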
@@ -101,6 +101,7 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
     // rapid init+start case now, but the general race condition remains if you restart the
     // server quickly.
     let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?;
+    //let storage = crate::inmem_storage::InmemObjectStore::create(conf)?;

     let repo = crate::object_repository::ObjectRepository::new(
         conf,
@@ -7,6 +7,7 @@ use std::time::Duration;

 pub mod basebackup;
 pub mod branches;
+pub mod inmem_storage;
 pub mod object_key;
 pub mod object_repository;
 pub mod object_store;
@@ -105,7 +106,7 @@ impl PageServerConf {
 /// is separate from PostgreSQL timelines, and doesn't have those
 /// limitations. A zenith timeline is identified by a 128-bit ID, which
 /// is usually printed out as a hex string.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
 pub struct ZTimelineId([u8; 16]);

 impl FromStr for ZTimelineId {
@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
 /// repository. It is shared between object_repository.rs and object_store.rs.
 /// It is mostly opaque to ObjectStore, it just stores and retrieves objects
 /// using the key given by the caller.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
 pub struct ObjectKey {
     pub timeline: ZTimelineId,
     pub tag: ObjectTag,
@@ -71,7 +71,7 @@ impl Repository for ObjectRepository {
             Some(timeline) => Ok(timeline.clone()),
             None => {
                 let timeline = ObjectTimeline::open(
-                    self.conf,
+                    self.conf,
                     Arc::clone(&self.obj_store),
                     timelineid,
                     self.walredo_mgr.clone(),
@@ -125,7 +125,7 @@ impl Repository for ObjectRepository {
         info!("Created empty timeline {}", timelineid);

         let timeline = ObjectTimeline::open(
-            self.conf,
+            self.conf,
             Arc::clone(&self.obj_store),
             timelineid,
             self.walredo_mgr.clone(),
@@ -234,7 +234,7 @@ impl ObjectTimeline {
     ///
     /// Loads the metadata for the timeline into memory.
     fn open(
-        conf: &'static PageServerConf,
+        conf: &'static PageServerConf,
        obj_store: Arc<dyn ObjectStore>,
        timelineid: ZTimelineId,
        walredo_mgr: Arc<dyn WalRedoManager>,
@@ -248,7 +248,7 @@ impl ObjectTimeline {
         let timeline = ObjectTimeline {
             timelineid,
             obj_store,
-            conf,
+            conf,
             walredo_mgr,
             last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
             last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
@@ -273,7 +273,12 @@ impl Timeline for ObjectTimeline {
         self.get_page_at_lsn_nowait(tag, lsn, self.conf.materialize)
     }

-    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn, materialize: bool) -> Result<Bytes> {
+    fn get_page_at_lsn_nowait(
+        &self,
+        tag: ObjectTag,
+        req_lsn: Lsn,
+        materialize: bool,
+    ) -> Result<Bytes> {
         const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
         // Look up the page entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -295,13 +300,13 @@ impl Timeline for ObjectTimeline {
                 let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
                 page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;

-                if materialize {
-                    // Garbage collection assumes that we remember the materialized page
-                    // version. Otherwise we could opt to not do it, with the downside that
-                    // the next GetPage@LSN call of the same page version would have to
-                    // redo the WAL again.
-                    self.put_page_image(tag, lsn, page_img.clone(), false)?;
-                }
+                if materialize {
+                    // Garbage collection assumes that we remember the materialized page
+                    // version. Otherwise we could opt to not do it, with the downside that
+                    // the next GetPage@LSN call of the same page version would have to
+                    // redo the WAL again.
+                    self.put_page_image(tag, lsn, page_img.clone(), false)?;
+                }
             }
             ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE),
             _ => bail!("Invalid object kind, expected a page entry or SLRU truncate"),
@@ -6,6 +6,7 @@
 use crate::object_repository::ObjectRepository;
 use crate::repository::Repository;
 use crate::rocksdb_storage::RocksObjectStore;
+//use crate::inmem_storage::InmemObjectStore;
 use crate::walredo::PostgresRedoManager;
 use crate::PageServerConf;
 use lazy_static::lazy_static;
@@ -19,6 +20,7 @@ pub fn init(conf: &'static PageServerConf) {
     let mut m = REPOSITORY.lock().unwrap();

     let obj_store = RocksObjectStore::open(conf).unwrap();
+    //let obj_store = InmemObjectStore::open(conf).unwrap();

     // Set up a WAL redo manager, for applying WAL records.
     let walredo_mgr = PostgresRedoManager::new(conf);
@@ -298,7 +298,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
     let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
     let mut last_lsn = startpoint;

-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
+    let checkpoint_bytes =
+        timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
     let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;

     loop {
@@ -169,7 +169,8 @@ fn walreceiver_main(

     let mut waldecoder = WalStreamDecoder::new(startpoint);

-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
+    let checkpoint_bytes =
+        timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
     let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
@@ -26,8 +26,8 @@ use std::io::Error;
 use std::path::PathBuf;
 use std::process::Stdio;
 use std::sync::mpsc;
-use std::sync::Mutex;
 use std::sync::Arc;
+use std::sync::Mutex;
 use std::time::Duration;
 use std::time::Instant;
 use tokio::io::AsyncBufReadExt;
@@ -113,7 +113,7 @@ struct WalRedoRequestData {

 #[derive(Debug)]
 struct WalRedoRequest {
-    data: WalRedoRequestData,
+    data: WalRedoRequestData,
     response_channel: mpsc::Sender<Result<Bytes, WalRedoError>>,
 }
@@ -180,11 +180,11 @@ impl WalRedoManager for PostgresRedoManager {

         let request = WalRedoRequest {
             data: WalRedoRequestData {
-                tag,
-                lsn,
-                base_img,
-                records,
-            },
+                tag,
+                lsn,
+                base_img,
+                records,
+            },
             response_channel: tx,
         };
@@ -239,60 +239,67 @@ impl PostgresRedoManagerInternal {

         info!("launching WAL redo postgres process");

-        let wal_redoers = self.conf.wal_redoers;
-        processes = (0..wal_redoers).map(|i|runtime.block_on(PostgresRedoProcess::launch(self.conf, i)).unwrap()).collect();
+        let wal_redoers = self.conf.wal_redoers;
+        processes = (0..wal_redoers)
+            .map(|i| {
+                runtime
+                    .block_on(PostgresRedoProcess::launch(self.conf, i))
+                    .unwrap()
+            })
+            .collect();

         // Loop forever, handling requests as they come.
         loop {
             let mut requests: Vec<WalRedoRequest> = Vec::new();
-            requests.push(self
-                .request_rx
-                .recv()
-                .expect("WAL redo request channel was closed"));
-            loop {
-                let req = self.request_rx.try_recv();
-                match req {
-                    Ok(req) => requests.push(req),
-                    Err(_) => break,
-                }
-            }
-            let request_data = requests.iter().map(|req| &req.data);
-            let mut rr = 0; // round robin
-            let results = runtime.block_on(async {
-                let futures = request_data.map(|req| {
-                    rr += 1;
-                    self.handle_apply_request(&processes[rr % wal_redoers], &req)
-                });
-                let mut results : Vec<Result<Bytes, WalRedoError>> = Vec::new();
-                for future in futures {
-                    results.push(future.await);
-                }
-                results
-            });
-            for (result, request) in results.into_iter().zip(requests.iter()) {
-                let result_ok = result.is_ok();
+            requests.push(
+                self.request_rx
+                    .recv()
+                    .expect("WAL redo request channel was closed"),
+            );
+            loop {
+                let req = self.request_rx.try_recv();
+                match req {
+                    Ok(req) => requests.push(req),
+                    Err(_) => break,
+                }
+            }
+            let request_data = requests.iter().map(|req| &req.data);
+            let mut rr = 0; // round robin
+            let results = runtime.block_on(async {
+                let futures = request_data.map(|req| {
+                    rr += 1;
+                    self.handle_apply_request(&processes[rr % wal_redoers], &req)
+                });
+                let mut results: Vec<Result<Bytes, WalRedoError>> = Vec::new();
+                for future in futures {
+                    results.push(future.await);
+                }
+                results
+            });
+            for (result, request) in results.into_iter().zip(requests.iter()) {
+                let result_ok = result.is_ok();

-                // Send the result to the requester
-                let _ = request.response_channel.send(result);
+                // Send the result to the requester
+                let _ = request.response_channel.send(result);

-                if !result_ok {
-                    error!("wal-redo-postgres failed to apply request {:?}", request);
-                }
-            }
-        }
-    }
+                if !result_ok {
+                    error!("wal-redo-postgres failed to apply request {:?}", request);
+                }
+            }
+        }
+    }

     async fn handle_apply_request(
         &self,
         process: &PostgresRedoProcess,
-        request: &WalRedoRequestData,
+        request: &WalRedoRequestData,
     ) -> Result<Bytes, WalRedoError> {
         let tag = request.tag;
         let lsn = request.lsn;
         let base_img = request.base_img.clone();
         let records = &request.records;

-        let nrecords = records.len();
+        let nrecords = records.len();

         let start = Instant::now();
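
The dispatch loop above blocks for one request, drains whatever else has queued up, and fans the batch out over the redo processes round-robin. Here is the same pattern reduced to std-only primitives; `Process::apply` is a hypothetical stand-in for driving a `PostgresRedoProcess` through the tokio runtime:

use std::sync::mpsc;

struct Process(usize);

impl Process {
    fn apply(&self, req: &str) -> String {
        format!("process {} handled {}", self.0, req)
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    for i in 0..5 {
        tx.send(format!("request-{}", i)).unwrap();
    }

    let wal_redoers: usize = 2;
    let processes: Vec<Process> = (0..wal_redoers).map(Process).collect();

    // Block for the first request, then opportunistically drain the queue
    // so the whole batch can be handled in one go.
    let mut requests = vec![rx.recv().expect("request channel was closed")];
    while let Ok(req) = rx.try_recv() {
        requests.push(req);
    }

    // Fan the batch out across the processes round-robin.
    let mut rr = 0;
    for req in &requests {
        rr += 1;
        println!("{}", processes[rr % wal_redoers].apply(req));
    }
}
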
@@ -481,7 +488,7 @@ impl PostgresRedoProcess {
         // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
         // just create one with constant name. That fails if you try to launch more than
         // one WAL redo manager concurrently.
-        let datadir = conf.workdir.join(format!("wal-redo-datadir-{}",id));
+        let datadir = conf.workdir.join(format!("wal-redo-datadir-{}", id));

         // Create empty data directory for wal-redo postgres, deleting old one first.
         if datadir.exists() {
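
Deriving the datadir name from the process index is what lets several WAL redo processes run concurrently despite the FIXME above; a trivial self-contained illustration (the helper name is hypothetical):

use std::path::{Path, PathBuf};

// Each WAL redo process gets a datadir derived from its index, so
// concurrently launched processes no longer collide on one directory.
fn wal_redo_datadir(workdir: &Path, id: usize) -> PathBuf {
    workdir.join(format!("wal-redo-datadir-{}", id))
}

fn main() {
    let workdir = PathBuf::from(".");
    assert_eq!(wal_redo_datadir(&workdir, 0), PathBuf::from("./wal-redo-datadir-0"));
    assert_eq!(wal_redo_datadir(&workdir, 1), PathBuf::from("./wal-redo-datadir-1"));
}
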