diff --git a/logfile b/logfile new file mode 100644 index 0000000000..304734e95b --- /dev/null +++ b/logfile @@ -0,0 +1,7 @@ +2022-09-22 12:07:18.140 EEST [463605] LOG: starting PostgreSQL 15beta3 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit +2022-09-22 12:07:18.140 EEST [463605] LOG: listening on IPv4 address "127.0.0.1", port 15331 +2022-09-22 12:07:18.142 EEST [463605] LOG: listening on Unix socket "/tmp/.s.PGSQL.15331" +2022-09-22 12:07:18.145 EEST [463608] LOG: database system was shut down at 2022-09-22 12:07:17 EEST +2022-09-22 12:07:18.149 EEST [463605] LOG: database system is ready to accept connections +2022-09-22 12:07:18.211 EEST [463605] LOG: received immediate shutdown request +2022-09-22 12:07:18.218 EEST [463605] LOG: database system is shut down diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5860e13534..ed41641277 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -186,8 +186,15 @@ impl Tenant { bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.") } - let new_metadata = - TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn, pg_version,); + let new_metadata = TimelineMetadata::new( + Lsn(0), + None, + None, + Lsn(0), + initdb_lsn, + initdb_lsn, + pg_version, + ); let new_timeline = self.create_initialized_timeline(new_timeline_id, new_metadata, &mut timelines)?; new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn); @@ -207,6 +214,7 @@ impl Tenant { new_timeline_id: Option, ancestor_timeline_id: Option, mut ancestor_start_lsn: Option, + pg_version: u32, ) -> Result>> { let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate); @@ -249,7 +257,7 @@ impl Tenant { self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)? 
} - None => self.bootstrap_timeline(new_timeline_id)?, + None => self.bootstrap_timeline(new_timeline_id, pg_version)?, }; // Have added new timeline into the tenant, now its background tasks are needed. @@ -1001,7 +1009,11 @@ impl Tenant { /// - run initdb to init temporary instance and get bootstrap data /// - after initialization complete, remove the temp dir. - fn bootstrap_timeline(&self, timeline_id: TimelineId) -> Result> { + fn bootstrap_timeline( + &self, + timeline_id: TimelineId, + pg_version: u32, + ) -> Result> { // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` // temporary directory for basebackup files for the given timeline. let initdb_path = path_with_suffix_extension( @@ -1012,7 +1024,7 @@ impl Tenant { ); // Init temporarily repo to get bootstrap data - run_initdb(self.conf, &initdb_path)?; + run_initdb(self.conf, &initdb_path, pg_version)?; let pgdata_path = initdb_path; let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align(); @@ -1021,7 +1033,7 @@ impl Tenant { // LSN, and any WAL after that. // Initdb lsn will be equal to last_record_lsn which will be set after import. // Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline. - let timeline = self.create_empty_timeline(timeline_id, lsn)?; + let timeline = self.create_empty_timeline(timeline_id, lsn, pg_version)?; import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?; fail::fail_point!("before-checkpoint-new-timeline", |_| { @@ -1094,10 +1106,10 @@ impl Tenant { /// Create the cluster temporarily in 'initdbpath' directory inside the repository /// to get bootstrap data for timeline initialization. -fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { +fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path, pg_version: u32) -> Result<()> { info!("running initdb in {}... 
", initdbpath.display()); - let initdb_path = conf.pg_bin_dir().join("initdb"); + let initdb_path = conf.pg_bin_dir(pg_version).join("initdb"); let initdb_output = Command::new(initdb_path) .args(&["-D", &initdbpath.to_string_lossy()]) .args(&["-U", &conf.superuser]) @@ -1107,8 +1119,8 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { // so no need to fsync it .arg("--no-sync") .env_clear() - .env("LD_LIBRARY_PATH", conf.pg_lib_dir()) - .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir()) + .env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version)) + .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version)) .stdout(Stdio::null()) .output() .context("failed to execute initdb")?; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index eec24faf2f..0242a8a4a8 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -830,6 +830,10 @@ where self.epoch_start_lsn = msg.h.epoch_start_lsn; self.inmem.proposer_uuid = msg.h.proposer_uuid; + // bootstrap the decoder, if not yet + self.wal_store + .init_decoder(self.state.server.pg_version / 10000, self.state.commit_lsn)?; + // do the job if !msg.wal_data.is_empty() { self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; @@ -986,6 +990,10 @@ mod tests { } impl wal_storage::Storage for DummyWalStore { + fn init_decoder(&mut self, _pg_majorversion: u32, _commit_lsn: Lsn) -> Result<()> { + Ok(()) + } + fn flush_lsn(&self) -> Lsn { self.lsn } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index ec29e13931..c16fc9f40c 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -24,12 +24,12 @@ use utils::{ pq_proto::ReplicationFeedback, }; -use crate::control_file; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, SafekeeperMemState, ServerInfo, }; use crate::send_wal::HotStandbyFeedback; +use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; use 
crate::metrics::FullTimelineInfo; use crate::wal_storage; @@ -103,6 +103,10 @@ impl SharedState { bail!(TimelineError::UninitializedWalSegSize(*ttid)); } + if state.server.pg_version == UNKNOWN_SERVER_VERSION { + bail!(TimelineError::UninitializedPgVersion(*ttid)); + } + // We don't want to write anything to disk, because we may have existing timeline there. // These functions should not change anything on disk. let control_store = control_file::FileStorage::create_new(ttid, conf, state)?; @@ -270,6 +274,8 @@ pub enum TimelineError { AlreadyExists(TenantTimelineId), #[error("Timeline {0} is not initialized, wal_seg_size is zero")] UninitializedWalSegSize(TenantTimelineId), + #[error("Timeline {0} is not initialized, pg_version is unknown")] + UninitializedPgVersion(TenantTimelineId), } /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline. diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 9e198fc148..934d5efb4b 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -13,9 +13,7 @@ use std::io::{self, Seek, SeekFrom}; use std::pin::Pin; use tokio::io::AsyncRead; -use postgres_ffi::v14::xlog_utils::{ - IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, -}; +use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName}; use postgres_ffi::{XLogSegNo, PG_TLI}; use std::cmp::{max, min}; @@ -29,7 +27,6 @@ use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::metrics::{time_io_closure, WalStorageMetrics}; use crate::safekeeper::SafeKeeperState; -use crate::safekeeper::UNKNOWN_SERVER_VERSION; use crate::wal_backup::read_object; use crate::SafeKeeperConf; @@ -41,6 +38,8 @@ use postgres_ffi::waldecoder::WalStreamDecoder; use tokio::io::{AsyncReadExt, AsyncSeekExt}; pub trait Storage { + // Bootstrap the wal decoder with the correct pg_version + fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()>; /// LSN of last 
durably stored WAL record. fn flush_lsn(&self) -> Lsn; @@ -90,7 +89,8 @@ pub struct PhysicalStorage { flush_record_lsn: Lsn, /// Decoder is required for detecting boundaries of WAL records. - decoder: WalStreamDecoder, + /// None until it is initialized + decoder: Option, /// Cached open file for the last segment. /// @@ -117,7 +117,9 @@ impl PhysicalStorage { let write_lsn = if state.commit_lsn == Lsn(0) { Lsn(0) } else { - find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn)? + // FIXME What would be the correct value here, if we can not + // call find_end_of_wal yet, because we don't know pg_version? + state.commit_lsn }; // TODO: do we really know that write_lsn is fully flushed to disk? @@ -140,7 +142,7 @@ impl PhysicalStorage { write_lsn, write_record_lsn: write_lsn, flush_record_lsn: flush_lsn, - decoder: WalStreamDecoder::new(write_lsn, UNKNOWN_SERVER_VERSION), + decoder: None, file: None, }) } @@ -255,6 +257,42 @@ impl PhysicalStorage { } impl Storage for PhysicalStorage { + fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()> { + if self.decoder.is_some() { + return Ok(()); + } + + info!( + "init_decoder for pg_version {} and commit_lsn {}", + pg_majorversion, commit_lsn + ); + + let write_lsn = match pg_majorversion { + 14 => postgres_ffi::v14::xlog_utils::find_end_of_wal( + &self.timeline_dir, + self.wal_seg_size, + commit_lsn, + )?, + 15 => postgres_ffi::v15::xlog_utils::find_end_of_wal( + &self.timeline_dir, + self.wal_seg_size, + commit_lsn, + )?, + _ => bail!("unsupported postgres version"), + }; + + info!( + "init_decoder for pg_version {} and commit_lsn {}. write_lsn = {}", + pg_majorversion, commit_lsn, write_lsn + ); + + self.decoder = Some(WalStreamDecoder::new(write_lsn, pg_majorversion)); + self.flush_record_lsn = write_lsn; + self.write_record_lsn = write_lsn; + + Ok(()) + } + /// flush_lsn returns LSN of last durably stored WAL record. 
fn flush_lsn(&self) -> Lsn { self.flush_record_lsn @@ -286,18 +324,18 @@ impl Storage for PhysicalStorage { // figure out last record's end lsn for reporting (if we got the // whole record) - if self.decoder.available() != startpos { + if self.decoder.as_ref().unwrap().available() != startpos { info!( "restart decoder from {} to {}", - self.decoder.available(), + self.decoder.as_ref().unwrap().available(), startpos, ); - let pg_version = self.decoder.pg_version; - self.decoder = WalStreamDecoder::new(startpos, pg_version); + let pg_version = self.decoder.as_ref().unwrap().pg_version; + self.decoder = Some(WalStreamDecoder::new(startpos, pg_version)); } - self.decoder.feed_bytes(buf); + self.decoder.as_mut().unwrap().feed_bytes(buf); loop { - match self.decoder.poll_decode()? { + match self.decoder.as_mut().unwrap().poll_decode()? { None => break, // no full record yet Some((lsn, _rec)) => { self.write_record_lsn = lsn;