mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
Merge branch 'hackathon/single_click_pg_upgrade' of https://github.com/neondatabase/neon into hackathon/single_click_pg_upgrade
This commit is contained in:
@@ -5078,14 +5078,14 @@ impl Timeline {
|
||||
|
||||
// If we have a page image, and no WAL, we're all set
|
||||
if data.records.is_empty() {
|
||||
if let Some((img_lsn, img)) = &data.img {
|
||||
if let Some((img_lsn, img)) = data.img {
|
||||
trace!(
|
||||
"found page image for key {} at {}, no WAL redo required, req LSN {}",
|
||||
key,
|
||||
img_lsn,
|
||||
request_lsn,
|
||||
);
|
||||
Ok(img.clone())
|
||||
Ok(img)
|
||||
} else {
|
||||
Err(PageReconstructError::from(anyhow!(
|
||||
"base image for {key} at {request_lsn} not found"
|
||||
@@ -5096,33 +5096,130 @@ impl Timeline {
|
||||
//
|
||||
// If we don't have a base image, then the oldest WAL record better initialize
|
||||
// the page
|
||||
if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
|
||||
Err(PageReconstructError::from(anyhow!(
|
||||
"Base image for {} at {} not found, but got {} WAL records",
|
||||
key,
|
||||
request_lsn,
|
||||
data.records.len()
|
||||
)))
|
||||
} else {
|
||||
if data.img.is_some() {
|
||||
|
||||
let have_img = data.img.is_some();
|
||||
let will_init = data
|
||||
.records
|
||||
.first()
|
||||
.map(|(_, rec)| rec.will_init())
|
||||
.expect("already checked to have records");
|
||||
|
||||
match (have_img, will_init) {
|
||||
(false, false) => {
|
||||
return Err(PageReconstructError::from(anyhow!(
|
||||
"Base image for {} at {} not found, but got {} WAL records",
|
||||
key,
|
||||
request_lsn,
|
||||
data.records.len()
|
||||
)))
|
||||
}
|
||||
(true, _) => {
|
||||
trace!(
|
||||
"found {} WAL records and a base image for {} at {}, performing WAL redo",
|
||||
data.records.len(),
|
||||
key,
|
||||
request_lsn
|
||||
);
|
||||
} else {
|
||||
}
|
||||
(false, _) => {
|
||||
assert!(will_init, "already checked above");
|
||||
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
|
||||
};
|
||||
let res = self
|
||||
.walredo_mgr
|
||||
.as_ref()
|
||||
.context("timeline has no walredo manager")
|
||||
.map_err(PageReconstructError::WalRedo)?
|
||||
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
|
||||
}
|
||||
}
|
||||
|
||||
let oldest_lsn = data
|
||||
.records
|
||||
.first()
|
||||
.map(|(lsn, _)| lsn)
|
||||
.expect("again, checked");
|
||||
|
||||
// walk up the ancestry until we have found an ancestor covering the lsn range
|
||||
let ancestry = std::iter::successors(Some(self), |tl| tl.ancestor_timeline.as_deref())
|
||||
// 100 - initdb R pg14
|
||||
// 150 - branch S pg14
|
||||
// 200 - branch T pg15
|
||||
// 250 - branch U pg15
|
||||
// 300 - branch V pg16
|
||||
//
|
||||
// oldest_lsn = 155:
|
||||
// get [V pg16, U pg15(one_more=true), T pg15(one_more=true), S pg14(one_more=false)]
|
||||
.take_while({
|
||||
let mut one_more = true;
|
||||
|
||||
move |tl| {
|
||||
if *oldest_lsn < tl.ancestor_lsn {
|
||||
assert!(one_more);
|
||||
true
|
||||
} else {
|
||||
let prev = one_more;
|
||||
one_more = false;
|
||||
prev
|
||||
}
|
||||
}
|
||||
})
|
||||
// remove consecutive same pg_versions, which might be all in case we can use the
|
||||
// same timeline for all reconstruction.
|
||||
// [V pg16, U pg15, T pg15, S pg14] => [V pg16, T pg15, S pg14]
|
||||
.fold(Vec::<&Timeline>::with_capacity(4), |mut acc, next| {
|
||||
if acc
|
||||
.last()
|
||||
.map(|tl| tl.pg_version == next.pg_version)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
// overwrite with an earlier timeline; additionally we only allow upgrades,
|
||||
// so we cannot go backwards like pg14 (branch) pg15 (branch) pg14
|
||||
*acc.last_mut().unwrap() = next;
|
||||
} else {
|
||||
acc.push(next);
|
||||
}
|
||||
acc
|
||||
});
|
||||
|
||||
// shifted for the purpose of timeline_pairs
|
||||
let later_timelines = ancestry
|
||||
.iter()
|
||||
.rev()
|
||||
.skip(1)
|
||||
.map(Some)
|
||||
.chain(std::iter::once(None));
|
||||
|
||||
// zip older and later timelines into pair, which we then use to select parts of
|
||||
// wal records to be executed on which version walredo
|
||||
let timeline_pairs = ancestry.iter().rev().zip(later_timelines);
|
||||
|
||||
let mgr = self
|
||||
.walredo_mgr
|
||||
.as_ref()
|
||||
.context("timeline has no walredo manager")
|
||||
.map_err(PageReconstructError::WalRedo)?;
|
||||
|
||||
let mut img = data.img.clone();
|
||||
let mut records = &data.records[..];
|
||||
|
||||
for (older, later) in timeline_pairs {
|
||||
let scratch = records
|
||||
.iter()
|
||||
.take_while(|(lsn, _)| {
|
||||
// if there is no later, take all remaining
|
||||
later.map(|later| lsn < &later.ancestor_lsn).unwrap_or(true)
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
records = &records[scratch.len()..];
|
||||
if later.is_none() {
|
||||
assert!(records.is_empty());
|
||||
}
|
||||
|
||||
// this is only used for logging on the next round
|
||||
let last_lsn = scratch.last().unwrap().0;
|
||||
|
||||
// is request_lsn ok? it's not used for anything important, just logging.
|
||||
let res = mgr
|
||||
.request_redo(key, request_lsn, img, scratch, older.pg_version)
|
||||
.await;
|
||||
let img = match res {
|
||||
Ok(img) => img,
|
||||
|
||||
img = match res {
|
||||
Ok(img) => Some((last_lsn, img)),
|
||||
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
|
||||
Err(walredo::Error::Other(e)) => {
|
||||
return Err(PageReconstructError::WalRedo(
|
||||
@@ -5130,8 +5227,9 @@ impl Timeline {
|
||||
))
|
||||
}
|
||||
};
|
||||
Ok(img)
|
||||
}
|
||||
|
||||
Ok(img.unwrap().1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -80,6 +80,8 @@ pub struct PostgresRedoManager {
|
||||
/// See [`Self::launched_processes`].
|
||||
redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
|
||||
|
||||
processes: [heavier_once_cell::OnceCell<ProcessOnceCell>; 4],
|
||||
|
||||
/// Gate that is entered when launching a walredo process and held open
|
||||
/// until the process has been `kill()`ed and `wait()`ed upon.
|
||||
///
|
||||
@@ -215,10 +217,18 @@ impl PostgresRedoManager {
|
||||
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
|
||||
})
|
||||
},
|
||||
process: self.redo_process.get().and_then(|p| match &*p {
|
||||
ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
|
||||
ProcessOnceCell::ManagerShutDown => None,
|
||||
}),
|
||||
process: self
|
||||
.processes
|
||||
.iter()
|
||||
.filter_map(|p| {
|
||||
p.get().and_then(|p| match &*p {
|
||||
ProcessOnceCell::Spawned(p) => {
|
||||
Some(WalRedoManagerProcessStatus { pid: p.id() })
|
||||
}
|
||||
ProcessOnceCell::ManagerShutDown => None,
|
||||
})
|
||||
})
|
||||
.next(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -237,6 +247,7 @@ impl PostgresRedoManager {
|
||||
conf,
|
||||
last_redo_at: std::sync::Mutex::default(),
|
||||
redo_process: heavier_once_cell::OnceCell::default(),
|
||||
processes: Default::default(),
|
||||
launched_processes: utils::sync::gate::Gate::default(),
|
||||
}
|
||||
}
|
||||
@@ -256,26 +267,31 @@ impl PostgresRedoManager {
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub async fn shutdown(&self) -> bool {
|
||||
// prevent new processes from being spawned
|
||||
let maybe_permit = match self.redo_process.get_or_init_detached().await {
|
||||
Ok(guard) => {
|
||||
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
|
||||
None
|
||||
} else {
|
||||
let (proc, permit) = guard.take_and_deinit();
|
||||
drop(proc); // this just drops the Arc, its refcount may not be zero yet
|
||||
Some(permit)
|
||||
let mut it_was_us = false;
|
||||
for process in self.processes.iter() {
|
||||
// prevent new processes from being spawned
|
||||
let maybe_permit = match self.redo_process.get_or_init_detached().await {
|
||||
Ok(guard) => {
|
||||
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
|
||||
None
|
||||
} else {
|
||||
let (proc, permit) = guard.take_and_deinit();
|
||||
drop(proc); // this just drops the Arc, its refcount may not be zero yet
|
||||
Some(permit)
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(permit) => Some(permit),
|
||||
};
|
||||
let it_was_us = if let Some(permit) = maybe_permit {
|
||||
self.redo_process
|
||||
.set(ProcessOnceCell::ManagerShutDown, permit);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
Err(permit) => Some(permit),
|
||||
};
|
||||
let i_cant_see_why_this = if let Some(permit) = maybe_permit {
|
||||
process.set(ProcessOnceCell::ManagerShutDown, permit);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
// TODO: or is correct?
|
||||
it_was_us |= i_cant_see_why_this;
|
||||
}
|
||||
// wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
|
||||
// we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
|
||||
// for the underlying process.
|
||||
@@ -291,7 +307,10 @@ impl PostgresRedoManager {
|
||||
if let Some(last_redo_at) = *g {
|
||||
if last_redo_at.elapsed() >= idle_timeout {
|
||||
drop(g);
|
||||
drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
|
||||
|
||||
self.processes.iter().for_each(|c| {
|
||||
drop(c.get().map(|guard| guard.take_and_deinit()));
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -314,13 +333,23 @@ impl PostgresRedoManager {
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, Error> {
|
||||
assert!(
|
||||
(14..=17).contains(&pg_version),
|
||||
"this should be an enum, but no: {pg_version}"
|
||||
);
|
||||
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
|
||||
|
||||
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let mut n_attempts = 0u32;
|
||||
loop {
|
||||
let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
|
||||
// handling multiple processes idea: just support N versions here, but the caller
|
||||
// splits per parent_lsn in the case that:
|
||||
// - reconstruct_data spans two versions
|
||||
// - reconstruct_data went to parent???
|
||||
let process = &self.processes[(pg_version - 14) as usize];
|
||||
|
||||
let proc: Arc<Process> = match process.get_or_init_detached().await {
|
||||
Ok(guard) => match &*guard {
|
||||
ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
|
||||
ProcessOnceCell::ManagerShutDown => {
|
||||
@@ -332,11 +361,11 @@ impl PostgresRedoManager {
|
||||
// acquire guard before spawning process, so that we don't spawn new processes
|
||||
// if the gate is already closed.
|
||||
let _launched_processes_guard = match self.launched_processes.enter() {
|
||||
Ok(guard) => guard,
|
||||
Err(GateError::GateClosed) => unreachable!(
|
||||
"shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
|
||||
),
|
||||
};
|
||||
Ok(guard) => guard,
|
||||
Err(GateError::GateClosed) => unreachable!(
|
||||
"shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
|
||||
),
|
||||
};
|
||||
let proc = Arc::new(Process {
|
||||
process: process::WalRedoProcess::launch(
|
||||
self.conf,
|
||||
@@ -353,8 +382,7 @@ impl PostgresRedoManager {
|
||||
pid = proc.id(),
|
||||
"launched walredo process"
|
||||
);
|
||||
self.redo_process
|
||||
.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
|
||||
process.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
|
||||
proc
|
||||
}
|
||||
};
|
||||
@@ -419,7 +447,7 @@ impl PostgresRedoManager {
|
||||
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
|
||||
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
|
||||
// This probably needs revisiting at some later point.
|
||||
match self.redo_process.get() {
|
||||
match process.get() {
|
||||
None => (),
|
||||
Some(guard) => {
|
||||
match &*guard {
|
||||
@@ -448,9 +476,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Process a batch of WAL records using bespoken Neon code.
|
||||
///
|
||||
/// Process a batch of WAL records using bespoke Neon code.
|
||||
fn apply_batch_neon(
|
||||
&self,
|
||||
key: Key,
|
||||
@@ -471,7 +497,7 @@ impl PostgresRedoManager {
|
||||
|
||||
// Apply all the WAL records in the batch
|
||||
for (record_lsn, record) in records.iter() {
|
||||
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
|
||||
apply_neon::apply_in_neon(record, *record_lsn, key, &mut page)?;
|
||||
}
|
||||
// Success!
|
||||
let duration = start_time.elapsed();
|
||||
@@ -488,18 +514,6 @@ impl PostgresRedoManager {
|
||||
|
||||
Ok(page.freeze())
|
||||
}
|
||||
|
||||
fn apply_record_neon(
|
||||
&self,
|
||||
key: Key,
|
||||
page: &mut BytesMut,
|
||||
record_lsn: Lsn,
|
||||
record: &NeonWalRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
apply_neon::apply_in_neon(record, record_lsn, key, page)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user