Fix rebase conflicts

Anastasia Lubennikova
2022-09-22 11:02:26 +03:00
parent 4cff9cce67
commit 4139270f9c
5 changed files with 95 additions and 24 deletions

logfile (new file, +7 lines)
View File

@@ -0,0 +1,7 @@
2022-09-22 12:07:18.140 EEST [463605] LOG: starting PostgreSQL 15beta3 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit
2022-09-22 12:07:18.140 EEST [463605] LOG: listening on IPv4 address "127.0.0.1", port 15331
2022-09-22 12:07:18.142 EEST [463605] LOG: listening on Unix socket "/tmp/.s.PGSQL.15331"
2022-09-22 12:07:18.145 EEST [463608] LOG: database system was shut down at 2022-09-22 12:07:17 EEST
2022-09-22 12:07:18.149 EEST [463605] LOG: database system is ready to accept connections
2022-09-22 12:07:18.211 EEST [463605] LOG: received immediate shutdown request
2022-09-22 12:07:18.218 EEST [463605] LOG: database system is shut down

View File

@@ -186,8 +186,15 @@ impl Tenant {
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
}
let new_metadata =
TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn, pg_version,);
let new_metadata = TimelineMetadata::new(
Lsn(0),
None,
None,
Lsn(0),
initdb_lsn,
initdb_lsn,
pg_version,
);
let new_timeline =
self.create_initialized_timeline(new_timeline_id, new_metadata, &mut timelines)?;
new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
@@ -207,6 +214,7 @@ impl Tenant {
new_timeline_id: Option<TimelineId>,
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
) -> Result<Option<Arc<Timeline>>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate);
@@ -249,7 +257,7 @@ impl Tenant {
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
}
None => self.bootstrap_timeline(new_timeline_id)?,
None => self.bootstrap_timeline(new_timeline_id, pg_version)?,
};
// The new timeline has been added to the tenant; now its background tasks are needed.
@@ -1001,7 +1009,11 @@ impl Tenant {
/// - run initdb to init a temporary instance and get bootstrap data
/// - after initialization completes, remove the temp dir.
fn bootstrap_timeline(&self, timeline_id: TimelineId) -> Result<Arc<Timeline>> {
fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,
) -> Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
let initdb_path = path_with_suffix_extension(
@@ -1012,7 +1024,7 @@ impl Tenant {
);
// Init a temporary repo to get bootstrap data
run_initdb(self.conf, &initdb_path)?;
run_initdb(self.conf, &initdb_path, pg_version)?;
let pgdata_path = initdb_path;
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
@@ -1021,7 +1033,7 @@ impl Tenant {
// LSN, and any WAL after that.
// Initdb LSN will be equal to last_record_lsn, which will be set after import.
// Because we know it upfront, we avoid an Option or a dummy zero value by passing it to create_empty_timeline.
let timeline = self.create_empty_timeline(timeline_id, lsn)?;
let timeline = self.create_empty_timeline(timeline_id, lsn, pg_version)?;
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -1094,10 +1106,10 @@ impl Tenant {
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization.
fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path, pg_version: u32) -> Result<()> {
info!("running initdb in {}... ", initdbpath.display());
let initdb_path = conf.pg_bin_dir().join("initdb");
let initdb_path = conf.pg_bin_dir(pg_version).join("initdb");
let initdb_output = Command::new(initdb_path)
.args(&["-D", &initdbpath.to_string_lossy()])
.args(&["-U", &conf.superuser])
@@ -1107,8 +1119,8 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
// so no need to fsync it
.arg("--no-sync")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir())
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.stdout(Stdio::null())
.output()
.context("failed to execute initdb")?;

View File

@@ -830,6 +830,10 @@ where
self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.inmem.proposer_uuid = msg.h.proposer_uuid;
// bootstrap the decoder, if not yet initialized
self.wal_store
.init_decoder(self.state.server.pg_version / 10000, self.state.commit_lsn)?;
// do the job: write the incoming WAL, if any
if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
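
The pg_version / 10000 above relies on PostgreSQL's numeric version format (MAJOR * 10000 + MINOR since version 10), so integer division by 10000 yields the major version the WAL decoder needs. A tiny self-contained illustration; the helper name is made up:

// PostgreSQL encodes its version as MAJOR * 10000 + MINOR (e.g. 15.3 -> 150003),
// so integer division by 10000 recovers the major version.
fn major_version(server_version_num: u32) -> u32 {
    server_version_num / 10000
}

fn main() {
    assert_eq!(major_version(150003), 15); // 15.3
    assert_eq!(major_version(140005), 14); // 14.5
    assert_eq!(major_version(150000), 15); // 15.0
}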
@@ -986,6 +990,10 @@ mod tests {
}
impl wal_storage::Storage for DummyWalStore {
fn init_decoder(&mut self, _pg_majorversion: u32, _commit_lsn: Lsn) -> Result<()> {
Ok(())
}
fn flush_lsn(&self) -> Lsn {
self.lsn
}

View File

@@ -24,12 +24,12 @@ use utils::{
pq_proto::ReplicationFeedback,
};
use crate::control_file;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo,
};
use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
use crate::wal_storage;
@@ -103,6 +103,10 @@ impl SharedState {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitializedPgVersion(*ttid));
}
// We don't want to write anything to disk, because we may have an existing timeline there.
// These functions should not change anything on disk.
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
@@ -270,6 +274,8 @@ pub enum TimelineError {
AlreadyExists(TenantTimelineId),
#[error("Timeline {0} is not initialized, wal_seg_size is zero")]
UninitializedWalSegSize(TenantTimelineId),
#[error("Timeline {0} is not initialized, pg_version is unknown")]
UninitializedPgVersion(TenantTimelineId),
}
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
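
For readers less familiar with the pattern, here is a self-contained sketch of how the new guard and error variant fit together. It assumes the enum derives thiserror::Error (as the #[error] attributes suggest); the UNKNOWN_SERVER_VERSION value, the String id, and the check_restorable helper are stand-ins for illustration, not the real safekeeper types.

use anyhow::{bail, Result};
use thiserror::Error;

// Illustrative sentinel; the real constant lives in crate::safekeeper.
const UNKNOWN_SERVER_VERSION: u32 = 0;

#[derive(Debug, Error)]
enum TimelineError {
    // The real variant carries a TenantTimelineId; a String stands in here.
    #[error("Timeline {0} is not initialized, pg_version is unknown")]
    UninitializedPgVersion(String),
}

fn check_restorable(pg_version: u32, ttid: &str) -> Result<()> {
    if pg_version == UNKNOWN_SERVER_VERSION {
        bail!(TimelineError::UninitializedPgVersion(ttid.to_string()));
    }
    Ok(())
}

fn main() {
    assert!(check_restorable(150003, "some-tenant/some-timeline").is_ok());
    assert!(check_restorable(UNKNOWN_SERVER_VERSION, "some-tenant/some-timeline").is_err());
}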

View File

@@ -13,9 +13,7 @@ use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
use postgres_ffi::v14::xlog_utils::{
IsPartialXLogFileName, IsXLogFileName, XLogFromFileName,
};
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::{max, min};
@@ -29,7 +27,6 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{time_io_closure, WalStorageMetrics};
use crate::safekeeper::SafeKeeperState;
use crate::safekeeper::UNKNOWN_SERVER_VERSION;
use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
@@ -41,6 +38,8 @@ use postgres_ffi::waldecoder::WalStreamDecoder;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub trait Storage {
/// Bootstrap the WAL decoder with the correct pg_version.
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()>;
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
@@ -90,7 +89,8 @@ pub struct PhysicalStorage {
flush_record_lsn: Lsn,
/// Decoder is required for detecting boundaries of WAL records.
decoder: WalStreamDecoder,
/// `None` until it is initialized by init_decoder
decoder: Option<WalStreamDecoder>,
/// Cached open file for the last segment.
///
@@ -117,7 +117,9 @@ impl PhysicalStorage {
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn)?
// FIXME: What would be the correct value here, given that we cannot
// call find_end_of_wal yet because we don't know pg_version?
state.commit_lsn
};
// TODO: do we really know that write_lsn is fully flushed to disk?
@@ -140,7 +142,7 @@ impl PhysicalStorage {
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn, UNKNOWN_SERVER_VERSION),
decoder: None,
file: None,
})
}
@@ -255,6 +257,42 @@ impl PhysicalStorage {
}
impl Storage for PhysicalStorage {
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()> {
if self.decoder.is_some() {
return Ok(());
}
info!(
"init_decoder for pg_version {} and commit_lsn {}",
pg_majorversion, commit_lsn
);
let write_lsn = match pg_majorversion {
14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
&self.timeline_dir,
self.wal_seg_size,
commit_lsn,
)?,
15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
&self.timeline_dir,
self.wal_seg_size,
commit_lsn,
)?,
_ => bail!("unsupported postgres version"),
};
info!(
"init_decoder for pg_version {} and commit_lsn {}. write_lsn = {}",
pg_majorversion, commit_lsn, write_lsn
);
self.decoder = Some(WalStreamDecoder::new(write_lsn, pg_majorversion));
self.flush_record_lsn = write_lsn;
self.write_record_lsn = write_lsn;
Ok(())
}
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
@@ -286,18 +324,18 @@ impl Storage for PhysicalStorage {
// figure out last record's end lsn for reporting (if we got the
// whole record)
if self.decoder.available() != startpos {
if self.decoder.as_ref().unwrap().available() != startpos {
info!(
"restart decoder from {} to {}",
self.decoder.available(),
self.decoder.as_ref().unwrap().available(),
startpos,
);
let pg_version = self.decoder.pg_version;
self.decoder = WalStreamDecoder::new(startpos, pg_version);
let pg_version = self.decoder.as_ref().unwrap().pg_version;
self.decoder = Some(WalStreamDecoder::new(startpos, pg_version));
}
self.decoder.feed_bytes(buf);
self.decoder.as_mut().unwrap().feed_bytes(buf);
loop {
match self.decoder.poll_decode()? {
match self.decoder.as_mut().unwrap().poll_decode()? {
None => break, // no full record yet
Some((lsn, _rec)) => {
self.write_record_lsn = lsn;