mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
feat: allow detaching from ancestor for timelines without writes (#7639)
The first implementation #7456 did not include `index_part.json` changes in an attempt to keep amount of changes down. Tracks the historic reparentings and earlier detach in `index_part.json`. - `index_part.json` receives a new field `lineage: Lineage` - `Lineage` is queried through RemoteTimelineClient during basebackup, creating `PREV LSN: none` for the invalid prev record lsn just as it would had been created for a newly created timeline - as `struct IndexPart` grew, it is now boxed in places Cc: #6994
This commit is contained in:
@@ -601,7 +601,7 @@ where
|
||||
// add zenith.signal file
|
||||
let mut zenith_signal = String::new();
|
||||
if self.prev_record_lsn == Lsn(0) {
|
||||
if self.lsn == self.timeline.get_ancestor_lsn() {
|
||||
if self.timeline.is_ancestor_lsn(self.lsn) {
|
||||
write!(zenith_signal, "PREV LSN: none")
|
||||
.map_err(|e| BasebackupError::Server(e.into()))?;
|
||||
} else {
|
||||
|
||||
@@ -214,12 +214,12 @@ impl TimelineMetadata {
|
||||
self.body.ancestor_timeline = Some(*timeline);
|
||||
}
|
||||
|
||||
pub fn detach_from_ancestor(&mut self, timeline: &TimelineId, ancestor_lsn: &Lsn) {
|
||||
pub fn detach_from_ancestor(&mut self, branchpoint: &(TimelineId, Lsn)) {
|
||||
if let Some(ancestor) = self.body.ancestor_timeline {
|
||||
assert_eq!(ancestor, *timeline);
|
||||
assert_eq!(ancestor, branchpoint.0);
|
||||
}
|
||||
if self.body.ancestor_lsn != Lsn(0) {
|
||||
assert_eq!(self.body.ancestor_lsn, *ancestor_lsn);
|
||||
assert_eq!(self.body.ancestor_lsn, branchpoint.1);
|
||||
}
|
||||
self.body.ancestor_timeline = None;
|
||||
self.body.ancestor_lsn = Lsn(0);
|
||||
|
||||
@@ -437,6 +437,19 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if this timeline was previously detached at this Lsn and the remote timeline
|
||||
/// client is currently initialized.
|
||||
pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
|
||||
// technically this is a dirty read, but given how timeline detach ancestor is implemented
|
||||
// via tenant restart, the lineage has always been uploaded.
|
||||
self.upload_queue
|
||||
.lock()
|
||||
.unwrap()
|
||||
.initialized_mut()
|
||||
.map(|uq| uq.latest_lineage.is_previous_ancestor_lsn(lsn))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
|
||||
let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
|
||||
current_remote_index_part
|
||||
@@ -628,7 +641,7 @@ impl RemoteTimelineClient {
|
||||
);
|
||||
|
||||
let index_part = IndexPart::from(&*upload_queue);
|
||||
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
|
||||
let op = UploadOp::UploadMetadata(Box::new(index_part), disk_consistent_lsn);
|
||||
self.metric_begin(&op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled = 0;
|
||||
@@ -647,7 +660,14 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
let Some(prev) = upload_queue.latest_metadata.ancestor_timeline() else {
|
||||
return Err(anyhow::anyhow!(
|
||||
"cannot reparent without a current ancestor"
|
||||
));
|
||||
};
|
||||
|
||||
upload_queue.latest_metadata.reparent(new_parent);
|
||||
upload_queue.latest_lineage.record_previous_ancestor(&prev);
|
||||
|
||||
self.schedule_index_upload(upload_queue);
|
||||
|
||||
@@ -670,9 +690,8 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
upload_queue
|
||||
.latest_metadata
|
||||
.detach_from_ancestor(&adopted.0, &adopted.1);
|
||||
upload_queue.latest_metadata.detach_from_ancestor(&adopted);
|
||||
upload_queue.latest_lineage.record_detaching(&adopted);
|
||||
|
||||
for layer in layers {
|
||||
upload_queue
|
||||
@@ -1811,6 +1830,7 @@ impl RemoteTimelineClient {
|
||||
latest_files: initialized.latest_files.clone(),
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: initialized.latest_metadata.clone(),
|
||||
latest_lineage: initialized.latest_lineage.clone(),
|
||||
projected_remote_consistent_lsn: None,
|
||||
visible_remote_consistent_lsn: initialized
|
||||
.visible_remote_consistent_lsn
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::collections::HashMap;
|
||||
|
||||
use chrono::NaiveDateTime;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
@@ -84,6 +85,9 @@ pub struct IndexPart {
|
||||
|
||||
#[serde(rename = "metadata_bytes")]
|
||||
pub metadata: TimelineMetadata,
|
||||
|
||||
#[serde(default)]
|
||||
pub(crate) lineage: Lineage,
|
||||
}
|
||||
|
||||
impl IndexPart {
|
||||
@@ -96,10 +100,11 @@ impl IndexPart {
|
||||
/// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
|
||||
/// is always generated from the keys of `layer_metadata`)
|
||||
/// - 4: timeline_layers is fully removed.
|
||||
const LATEST_VERSION: usize = 4;
|
||||
/// - 5: lineage was added
|
||||
const LATEST_VERSION: usize = 5;
|
||||
|
||||
// Versions we may see when reading from a bucket.
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4];
|
||||
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5];
|
||||
|
||||
pub const FILE_NAME: &'static str = "index_part.json";
|
||||
|
||||
@@ -107,6 +112,7 @@ impl IndexPart {
|
||||
layers_and_metadata: &HashMap<LayerName, LayerFileMetadata>,
|
||||
disk_consistent_lsn: Lsn,
|
||||
metadata: TimelineMetadata,
|
||||
lineage: Lineage,
|
||||
) -> Self {
|
||||
let layer_metadata = layers_and_metadata
|
||||
.iter()
|
||||
@@ -119,6 +125,7 @@ impl IndexPart {
|
||||
disk_consistent_lsn,
|
||||
metadata,
|
||||
deleted_at: None,
|
||||
lineage,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,6 +154,7 @@ impl IndexPart {
|
||||
&HashMap::new(),
|
||||
example_metadata.disk_consistent_lsn(),
|
||||
example_metadata,
|
||||
Default::default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -155,8 +163,9 @@ impl From<&UploadQueueInitialized> for IndexPart {
|
||||
fn from(uq: &UploadQueueInitialized) -> Self {
|
||||
let disk_consistent_lsn = uq.latest_metadata.disk_consistent_lsn();
|
||||
let metadata = uq.latest_metadata.clone();
|
||||
let lineage = uq.latest_lineage.clone();
|
||||
|
||||
Self::new(&uq.latest_files, disk_consistent_lsn, metadata)
|
||||
Self::new(&uq.latest_files, disk_consistent_lsn, metadata, lineage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,8 +193,76 @@ impl From<&LayerFileMetadata> for IndexLayerMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
/// Limited history of earlier ancestors.
|
||||
///
|
||||
/// A timeline can have more than 1 earlier ancestor, in the rare case that it was repeatedly
|
||||
/// reparented by having an later timeline be detached from it's ancestor.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
|
||||
pub(crate) struct Lineage {
|
||||
/// Has the `reparenting_history` been truncated to [`Lineage::REMEMBER_AT_MOST`].
|
||||
#[serde(skip_serializing_if = "is_false", default)]
|
||||
reparenting_history_truncated: bool,
|
||||
|
||||
/// Earlier ancestors, truncated when [`Self::reparenting_history_truncated`]
|
||||
///
|
||||
/// These are stored in case we want to support WAL based DR on the timeline. There can be many
|
||||
/// of these and at most one [`Self::original_ancestor`]. There cannot be more reparentings
|
||||
/// after [`Self::original_ancestor`] has been set.
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
reparenting_history: Vec<TimelineId>,
|
||||
|
||||
/// The ancestor from which this timeline has been detached from and when.
|
||||
///
|
||||
/// If you are adding support for detaching from a hierarchy, consider changing the ancestry
|
||||
/// into a `Vec<(TimelineId, Lsn)>` to be a path instead.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
original_ancestor: Option<(TimelineId, Lsn, NaiveDateTime)>,
|
||||
}
|
||||
|
||||
fn is_false(b: &bool) -> bool {
|
||||
!b
|
||||
}
|
||||
|
||||
impl Lineage {
|
||||
const REMEMBER_AT_MOST: usize = 100;
|
||||
|
||||
pub(crate) fn record_previous_ancestor(&mut self, old_ancestor: &TimelineId) {
|
||||
if self.reparenting_history.last() == Some(old_ancestor) {
|
||||
// do not re-record it
|
||||
return;
|
||||
}
|
||||
|
||||
let drop_oldest = self.reparenting_history.len() + 1 >= Self::REMEMBER_AT_MOST;
|
||||
|
||||
self.reparenting_history_truncated |= drop_oldest;
|
||||
if drop_oldest {
|
||||
self.reparenting_history.remove(0);
|
||||
}
|
||||
self.reparenting_history.push(*old_ancestor);
|
||||
}
|
||||
|
||||
pub(crate) fn record_detaching(&mut self, branchpoint: &(TimelineId, Lsn)) {
|
||||
assert!(self.original_ancestor.is_none());
|
||||
|
||||
self.original_ancestor =
|
||||
Some((branchpoint.0, branchpoint.1, chrono::Utc::now().naive_utc()));
|
||||
}
|
||||
|
||||
/// The queried lsn is most likely the basebackup lsn, and this answers question "is it allowed
|
||||
/// to start a read/write primary at this lsn".
|
||||
///
|
||||
/// Returns true if the Lsn was previously a branch point.
|
||||
pub(crate) fn is_previous_ancestor_lsn(&self, lsn: Lsn) -> bool {
|
||||
self.original_ancestor
|
||||
.as_ref()
|
||||
.is_some_and(|(_, ancestor_lsn, _)| lsn == *ancestor_lsn)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::FromStr;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
@@ -221,6 +298,7 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Lineage::default(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
@@ -261,6 +339,7 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Lineage::default(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
@@ -302,7 +381,8 @@ mod tests {
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
|
||||
lineage: Lineage::default(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
@@ -347,6 +427,7 @@ mod tests {
|
||||
])
|
||||
.unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Lineage::default(),
|
||||
};
|
||||
|
||||
let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap();
|
||||
@@ -385,11 +466,58 @@ mod tests {
|
||||
]),
|
||||
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: Some(chrono::NaiveDateTime::parse_from_str(
|
||||
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap()),
|
||||
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
|
||||
lineage: Lineage::default(),
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn v5_indexpart_is_parsed() {
|
||||
let example = r#"{
|
||||
"version":5,
|
||||
"layer_metadata":{
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499":{"file_size":23289856,"generation":1},
|
||||
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619":{"file_size":1015808,"generation":1}},
|
||||
"disk_consistent_lsn":"0/15A7618",
|
||||
"metadata_bytes":[226,88,25,241,0,46,0,4,0,0,0,0,1,90,118,24,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,78,244,32,0,0,0,0,1,78,244,32,0,0,0,16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
|
||||
"lineage":{
|
||||
"original_ancestor":["e2bfd8c633d713d279e6fcd2bcc15b6d","0/15A7618","2024-05-07T18:52:36.322426563"],
|
||||
"reparenting_history":["e1bfd8c633d713d279e6fcd2bcc15b6d"]
|
||||
}
|
||||
}"#;
|
||||
|
||||
let expected = IndexPart {
|
||||
version: 5,
|
||||
layer_metadata: HashMap::from([
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF420-00000000014EF499".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 23289856,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
}),
|
||||
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000014EF499-00000000015A7619".parse().unwrap(), IndexLayerMetadata {
|
||||
file_size: 1015808,
|
||||
generation: Generation::new(1),
|
||||
shard: ShardIndex::unsharded(),
|
||||
})
|
||||
]),
|
||||
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
|
||||
metadata: TimelineMetadata::from_bytes(&[226,88,25,241,0,46,0,4,0,0,0,0,1,90,118,24,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,78,244,32,0,0,0,0,1,78,244,32,0,0,0,16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
|
||||
deleted_at: None,
|
||||
lineage: Lineage {
|
||||
reparenting_history_truncated: false,
|
||||
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
|
||||
original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))),
|
||||
},
|
||||
};
|
||||
|
||||
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
|
||||
assert_eq!(part, expected);
|
||||
}
|
||||
|
||||
fn parse_naive_datetime(s: &str) -> NaiveDateTime {
|
||||
chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f").unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3037,6 +3037,18 @@ impl Timeline {
|
||||
|
||||
Some(HeatMapTimeline::new(self.timeline_id, layers))
|
||||
}
|
||||
|
||||
/// Returns true if the given lsn is or was an ancestor branchpoint.
|
||||
pub(crate) fn is_ancestor_lsn(&self, lsn: Lsn) -> bool {
|
||||
// upon timeline detach, we set the ancestor_lsn to Lsn::INVALID and the store the original
|
||||
// branchpoint in the value in IndexPart::lineage
|
||||
self.ancestor_lsn == lsn
|
||||
|| (self.ancestor_lsn == Lsn::INVALID
|
||||
&& self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.is_some_and(|rtc| rtc.is_previous_ancestor_lsn(lsn)))
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalId = Arc<str>;
|
||||
@@ -4354,7 +4366,6 @@ impl Timeline {
|
||||
/// - has an ancestor to detach from
|
||||
/// - the ancestor does not have an ancestor -- follows from the original RFC limitations, not
|
||||
/// a technical requirement
|
||||
/// - has prev_lsn in remote storage (temporary restriction)
|
||||
///
|
||||
/// After the operation has been started, it cannot be canceled. Upon restart it needs to be
|
||||
/// polled again until completion.
|
||||
|
||||
@@ -22,8 +22,6 @@ pub(crate) enum Error {
|
||||
TooManyAncestors,
|
||||
#[error("shutting down, please retry later")]
|
||||
ShuttingDown,
|
||||
#[error("detached timeline must receive writes before the operation")]
|
||||
DetachedTimelineNeedsWrites,
|
||||
#[error("flushing failed")]
|
||||
FlushAncestor(#[source] anyhow::Error),
|
||||
#[error("layer download failed")]
|
||||
@@ -94,14 +92,6 @@ pub(super) async fn prepare(
|
||||
return Err(TooManyAncestors);
|
||||
}
|
||||
|
||||
if detached.get_prev_record_lsn() == Lsn::INVALID
|
||||
|| detached.disk_consistent_lsn.load() == ancestor_lsn
|
||||
{
|
||||
// this is to avoid a problem that after detaching we would be unable to start up the
|
||||
// compute because of "PREV_LSN: invalid".
|
||||
return Err(DetachedTimelineNeedsWrites);
|
||||
}
|
||||
|
||||
// before we acquire the gate, we must mark the ancestor as having a detach operation
|
||||
// ongoing which will block other concurrent detach operations so we don't get to ackward
|
||||
// situations where there would be two branches trying to reparent earlier branches.
|
||||
|
||||
@@ -3,6 +3,7 @@ use super::storage_layer::ResidentLayer;
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::remote_timeline_client::index::Lineage;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt::Debug;
|
||||
|
||||
@@ -56,6 +57,9 @@ pub(crate) struct UploadQueueInitialized {
|
||||
/// DANGER: do not return to outside world, e.g., safekeepers.
|
||||
pub(crate) latest_metadata: TimelineMetadata,
|
||||
|
||||
/// Part of the flattened "next" `index_part.json`.
|
||||
pub(crate) latest_lineage: Lineage,
|
||||
|
||||
/// `disk_consistent_lsn` from the last metadata file that was successfully
|
||||
/// uploaded. `Lsn(0)` if nothing was uploaded yet.
|
||||
/// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
|
||||
@@ -171,6 +175,7 @@ impl UploadQueue {
|
||||
latest_files: HashMap::new(),
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: metadata.clone(),
|
||||
latest_lineage: Lineage::default(),
|
||||
projected_remote_consistent_lsn: None,
|
||||
visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
|
||||
// what follows are boring default initializations
|
||||
@@ -218,6 +223,7 @@ impl UploadQueue {
|
||||
latest_files: files,
|
||||
latest_files_changes_since_metadata_upload_scheduled: 0,
|
||||
latest_metadata: index_part.metadata.clone(),
|
||||
latest_lineage: index_part.lineage.clone(),
|
||||
projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
|
||||
visible_remote_consistent_lsn: Arc::new(
|
||||
index_part.metadata.disk_consistent_lsn().into(),
|
||||
@@ -290,7 +296,7 @@ pub(crate) enum UploadOp {
|
||||
UploadLayer(ResidentLayer, LayerFileMetadata),
|
||||
|
||||
/// Upload the metadata file
|
||||
UploadMetadata(IndexPart, Lsn),
|
||||
UploadMetadata(Box<IndexPart>, Lsn),
|
||||
|
||||
/// Delete layer files
|
||||
Delete(Delete),
|
||||
|
||||
@@ -246,7 +246,7 @@ pub(crate) struct S3TimelineBlobData {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum BlobDataParseResult {
|
||||
Parsed {
|
||||
index_part: IndexPart,
|
||||
index_part: Box<IndexPart>,
|
||||
index_part_generation: Generation,
|
||||
s3_layers: HashSet<(LayerName, Generation)>,
|
||||
},
|
||||
@@ -368,7 +368,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
Ok(index_part) => {
|
||||
return Ok(S3TimelineBlobData {
|
||||
blob_data: BlobDataParseResult::Parsed {
|
||||
index_part,
|
||||
index_part: Box::new(index_part),
|
||||
index_part_generation,
|
||||
s3_layers,
|
||||
},
|
||||
|
||||
@@ -159,7 +159,7 @@ impl SnapshotDownloader {
|
||||
async fn download_timeline(
|
||||
&self,
|
||||
ttid: TenantShardTimelineId,
|
||||
index_part: IndexPart,
|
||||
index_part: Box<IndexPart>,
|
||||
index_part_generation: Generation,
|
||||
ancestor_layers: &mut HashMap<
|
||||
TenantShardTimelineId,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import datetime
|
||||
import enum
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from queue import Empty, Queue
|
||||
@@ -12,6 +13,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.pageserver.http import HistoricLayerInfo
|
||||
from fixtures.pageserver.utils import wait_timeline_detail_404
|
||||
from fixtures.remote_storage import LocalFsStorage
|
||||
from fixtures.types import Lsn, TimelineId
|
||||
|
||||
|
||||
@@ -56,15 +58,16 @@ SHUTDOWN_ALLOWED_ERRORS = [
|
||||
|
||||
@pytest.mark.parametrize("branchpoint", Branchpoint.all())
|
||||
@pytest.mark.parametrize("restart_after", [True, False])
|
||||
@pytest.mark.parametrize("write_to_branch_first", [True, False])
|
||||
def test_ancestor_detach_branched_from(
|
||||
neon_env_builder: NeonEnvBuilder, branchpoint: Branchpoint, restart_after: bool
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
branchpoint: Branchpoint,
|
||||
restart_after: bool,
|
||||
write_to_branch_first: bool,
|
||||
):
|
||||
"""
|
||||
Creates a branch relative to L0 lsn boundary according to Branchpoint. Later the timeline is detached.
|
||||
"""
|
||||
# TODO: parametrize; currently unimplemented over at pageserver
|
||||
write_to_branch_first = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
|
||||
@@ -174,8 +177,7 @@ def test_ancestor_detach_branched_from(
|
||||
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("restart_after", [True, False])
|
||||
def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, restart_after: bool):
|
||||
def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
The case from RFC:
|
||||
|
||||
@@ -204,9 +206,6 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, res
|
||||
We confirm the end result by being able to delete "old main" after deleting "after".
|
||||
"""
|
||||
|
||||
# TODO: support not yet implemented for these
|
||||
write_to_branch_first = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.allowed_errors.extend(SHUTDOWN_ALLOWED_ERRORS)
|
||||
@@ -244,42 +243,57 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, res
|
||||
|
||||
after = env.neon_cli.create_branch("after", "main", env.initial_tenant, ancestor_start_lsn=None)
|
||||
|
||||
if write_to_branch_first:
|
||||
with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep:
|
||||
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == 8192
|
||||
with ep.cursor() as cur:
|
||||
cur.execute("UPDATE audit SET starts = starts + 1")
|
||||
assert cur.rowcount == 1
|
||||
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
|
||||
|
||||
client.timeline_checkpoint(env.initial_tenant, timeline_id)
|
||||
|
||||
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
|
||||
assert all_reparented == {reparented, same_branchpoint}
|
||||
|
||||
if restart_after:
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
env.pageserver.quiesce_tenants()
|
||||
|
||||
# checking the ancestor after is much faster than waiting for the endpoint not start
|
||||
expected_result = [
|
||||
("main", env.initial_timeline, None, 16384, 1),
|
||||
("after", after, env.initial_timeline, 16384, 1),
|
||||
("new main", timeline_id, None, 8192, 2),
|
||||
("new main", timeline_id, None, 8192, 1),
|
||||
("same_branchpoint", same_branchpoint, timeline_id, 8192, 1),
|
||||
("reparented", reparented, timeline_id, 0, 1),
|
||||
]
|
||||
|
||||
for _, timeline_id, expected_ancestor, _, _ in expected_result:
|
||||
details = client.timeline_detail(env.initial_tenant, timeline_id)
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
|
||||
for _, queried_timeline, expected_ancestor, _, _ in expected_result:
|
||||
details = client.timeline_detail(env.initial_tenant, queried_timeline)
|
||||
ancestor_timeline_id = details["ancestor_timeline_id"]
|
||||
if expected_ancestor is None:
|
||||
assert ancestor_timeline_id is None
|
||||
else:
|
||||
assert TimelineId(ancestor_timeline_id) == expected_ancestor
|
||||
|
||||
index_part = env.pageserver_remote_storage.index_content(
|
||||
env.initial_tenant, queried_timeline
|
||||
)
|
||||
lineage = index_part["lineage"]
|
||||
assert lineage is not None
|
||||
|
||||
assert lineage.get("reparenting_history_overflown", "false") == "false"
|
||||
|
||||
if queried_timeline == timeline_id:
|
||||
original_ancestor = lineage["original_ancestor"]
|
||||
assert original_ancestor is not None
|
||||
assert original_ancestor[0] == str(env.initial_timeline)
|
||||
assert original_ancestor[1] == str(branchpoint_x)
|
||||
|
||||
# this does not contain Z in the end, so fromisoformat accepts it
|
||||
# it is to be in line with the deletion timestamp.. well, almost.
|
||||
when = original_ancestor[2][:26]
|
||||
when_ts = datetime.datetime.fromisoformat(when)
|
||||
assert when_ts < datetime.datetime.now()
|
||||
assert len(lineage.get("reparenting_history", [])) == 0
|
||||
elif expected_ancestor == timeline_id:
|
||||
assert len(lineage.get("original_ancestor", [])) == 0
|
||||
assert lineage["reparenting_history"] == [str(env.initial_timeline)]
|
||||
else:
|
||||
assert len(lineage.get("original_ancestor", [])) == 0
|
||||
assert len(lineage.get("reparenting_history", [])) == 0
|
||||
|
||||
for name, _, _, rows, starts in expected_result:
|
||||
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
|
||||
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
|
||||
@@ -293,14 +307,10 @@ def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, res
|
||||
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("restart_after", [True, False])
|
||||
def test_detached_receives_flushes_while_being_detached(
|
||||
neon_env_builder: NeonEnvBuilder, restart_after: bool
|
||||
):
|
||||
def test_detached_receives_flushes_while_being_detached(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Makes sure that the timeline is able to receive writes through-out the detach process.
|
||||
"""
|
||||
write_to_branch_first = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -330,12 +340,6 @@ def test_detached_receives_flushes_while_being_detached(
|
||||
ep = env.endpoints.create_start("new main", tenant_id=env.initial_tenant)
|
||||
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
|
||||
|
||||
if write_to_branch_first:
|
||||
rows += insert_rows(256, ep)
|
||||
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
|
||||
client.timeline_checkpoint(env.initial_tenant, timeline_id)
|
||||
log.info("completed {write_to_branch_first=}")
|
||||
|
||||
def small_txs(ep, queue: Queue[str], barrier):
|
||||
extra_rows = 0
|
||||
|
||||
@@ -368,11 +372,6 @@ def test_detached_receives_flushes_while_being_detached(
|
||||
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
|
||||
assert len(reparented) == 0
|
||||
|
||||
if restart_after:
|
||||
# ep and row production is kept alive on purpose
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
env.pageserver.quiesce_tenants()
|
||||
|
||||
queue.put("done")
|
||||
|
||||
Reference in New Issue
Block a user