Correct mistakes in offloaded timeline retain_lsn management (#9760)
PR #9308 modified tenant activation code to take offloaded child timelines into account when populating the list of `retain_lsn` values. However, there are more places than just tenant activation where the `retain_lsn`s need to be updated. This PR fixes some bugs in the current code that could lead to corruption in the worst case:

1. Deleting an offloaded timeline would not get its `retain_lsn` purged from its parent. With the patch we now do it, but as the parent can be offloaded as well, the situation is a bit trickier than for non-offloaded timelines, which can just keep a pointer to their parent. Here we can't keep a pointer, because the parent might get offloaded, then unoffloaded again, creating a dangling-pointer situation. Keeping a pointer to the *tenant* is not good either, because we might drop the offloaded timeline in a context where an `offloaded_timelines` lock is already held: so we don't want to acquire a lock in the drop code of `OffloadedTimeline`.
2. Unoffloading a timeline would not get its `retain_lsn` values populated, leading to it maybe garbage collecting values that its children might need. We now call `initialize_gc_info` on the parent.
3. Offloading a timeline would not get its `retain_lsn` values registered as offloaded at the parent. So if we dropped the `Timeline` object and its registration was removed, the parent would not have any of the child's `retain_lsn`s around. Also, before, the `Timeline` object would delete anything related to its timeline ID; now it only deletes `retain_lsn`s that have `MaybeOffloaded::No` set.

Incorporates Chi's reproducer from #9753. cc https://github.com/neondatabase/cloud/issues/20199

The `test_timeline_retain_lsn` test is extended:

1. It gains a new dimension, duplicating each mode, to either have the "main" branch be the direct parent of the timeline we archive, or have the "test_archived_parent" branch as an intermediary, creating a three-timeline structure. This doesn't test anything fixed by this PR in particular, it just explores the vast space of possible configurations a little bit more.
2. It gains two new modes: `offload-parent`, which tests the second point, and `offload-no-restart`, which tests the third point.

It's easy to verify that the test actually is "sharp" by removing one of the respective `self.initialize_gc_info()`, `gc_info.insert_child()` or `ancestor_children.push()` calls.

Part of #8088

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Alex Chi Z <chi@neon.tech>
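The ordering constraint in point 1 is what motivates the new `deleted_from_ancestor: AtomicBool` flag: cleanup has to be confirmed before the `OffloadedTimeline` is dropped, because the destructor itself must not take any tenant lock. A minimal sketch of this "defuse" pattern, using simplified stand-in types rather than the actual pageserver ones:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Simplified stand-in for OffloadedTimeline: the flag records whether the
// retain_lsn was already deregistered from (or deliberately left at) the
// ancestor, so Drop never has to take a lock itself.
struct OffloadedHandle {
    deleted_from_ancestor: AtomicBool,
}

impl OffloadedHandle {
    // Caller already holds the relevant locks and has removed the
    // retain_lsn from the ancestor's GcInfo.
    fn mark_deleted_from_ancestor(&self) {
        self.deleted_from_ancestor.store(true, Ordering::Release);
    }

    // The whole tenant is going away; deregistration would be pointless.
    fn defuse_for_tenant_drop(&self) {
        self.deleted_from_ancestor.store(true, Ordering::Release);
    }
}

impl Drop for OffloadedHandle {
    fn drop(&mut self) {
        if !self.deleted_from_ancestor.load(Ordering::Acquire) {
            // The real code emits a tracing::warn! here; a leaked
            // retain_lsn at the ancestor would block GC there.
            eprintln!("dropped without ancestor cleanup");
        }
    }
}
```

Every path that drops the handle must go through one of the two setters first, which is exactly what the hunks below add to deletion, tenant shutdown, and manifest-based cleanup.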
@@ -39,6 +39,7 @@ use remote_timeline_client::UploadQueueNotReadyError;
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
 use std::collections::BTreeMap;
 use std::fmt;
 use std::future::Future;
+use std::sync::atomic::AtomicBool;
 use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
@@ -524,6 +525,9 @@ pub struct OffloadedTimeline {
     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     pub delete_progress: TimelineDeleteProgress,
+
+    /// Part of the `OffloadedTimeline` object's lifecycle: this needs to be set before we drop it
+    pub deleted_from_ancestor: AtomicBool,
 }
 
 impl OffloadedTimeline {
@@ -533,9 +537,16 @@ impl OffloadedTimeline {
     /// the timeline is not in a stopped state.
     /// Panics if the timeline is not archived.
     fn from_timeline(timeline: &Timeline) -> Result<Self, UploadQueueNotReadyError> {
-        let ancestor_retain_lsn = timeline
-            .get_ancestor_timeline_id()
-            .map(|_timeline_id| timeline.get_ancestor_lsn());
+        let (ancestor_retain_lsn, ancestor_timeline_id) =
+            if let Some(ancestor_timeline) = timeline.ancestor_timeline() {
+                let ancestor_lsn = timeline.get_ancestor_lsn();
+                let ancestor_timeline_id = ancestor_timeline.timeline_id;
+                let mut gc_info = ancestor_timeline.gc_info.write().unwrap();
+                gc_info.insert_child(timeline.timeline_id, ancestor_lsn, MaybeOffloaded::Yes);
+                (Some(ancestor_lsn), Some(ancestor_timeline_id))
+            } else {
+                (None, None)
+            };
         let archived_at = timeline
             .remote_client
             .archived_at_stopped_queue()?
@@ -543,14 +554,17 @@ impl OffloadedTimeline {
         Ok(Self {
             tenant_shard_id: timeline.tenant_shard_id,
             timeline_id: timeline.timeline_id,
-            ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
+            ancestor_timeline_id,
             ancestor_retain_lsn,
             archived_at,
 
             delete_progress: timeline.delete_progress.clone(),
+            deleted_from_ancestor: AtomicBool::new(false),
         })
     }
     fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self {
+        // We expect to reach this case in tenant loading, where the `retain_lsn` is populated in the parent's `gc_info`
+        // by the `initialize_gc_info` function.
         let OffloadedTimelineManifest {
             timeline_id,
             ancestor_timeline_id,
@@ -564,6 +578,7 @@ impl OffloadedTimeline {
             ancestor_retain_lsn,
             archived_at,
             delete_progress: TimelineDeleteProgress::default(),
+            deleted_from_ancestor: AtomicBool::new(false),
         }
     }
     fn manifest(&self) -> OffloadedTimelineManifest {
@@ -581,6 +596,33 @@ impl OffloadedTimeline {
             archived_at: *archived_at,
         }
     }
+    /// Delete this timeline's retain_lsn from its ancestor, if present in the given tenant
+    fn delete_from_ancestor_with_timelines(
+        &self,
+        timelines: &std::sync::MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>>,
+    ) {
+        if let (Some(_retain_lsn), Some(ancestor_timeline_id)) =
+            (self.ancestor_retain_lsn, self.ancestor_timeline_id)
+        {
+            if let Some((_, ancestor_timeline)) = timelines
+                .iter()
+                .find(|(tid, _tl)| **tid == ancestor_timeline_id)
+            {
+                ancestor_timeline
+                    .gc_info
+                    .write()
+                    .unwrap()
+                    .remove_child_offloaded(self.timeline_id);
+            }
+        }
+        self.deleted_from_ancestor.store(true, Ordering::Release);
+    }
+    /// Call [`Self::delete_from_ancestor_with_timelines`] instead if possible.
+    ///
+    /// As the entire tenant is being dropped, don't bother deregistering the `retain_lsn` from the ancestor.
+    fn defuse_for_tenant_drop(&self) {
+        self.deleted_from_ancestor.store(true, Ordering::Release);
+    }
 }
 
 impl fmt::Debug for OffloadedTimeline {
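Note the signature of `delete_from_ancestor_with_timelines`: it takes a reference to the already-held `MutexGuard` instead of locking `timelines` itself. Passing the guard as a witness makes "the caller holds the lock" a compile-time fact and rules out re-entrant locking, which matters given the drop-ordering concern described above. A minimal sketch of the pattern, with hypothetical simplified types:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

// The guard parameter proves the caller already holds the lock; the
// callee reads through it (MutexGuard derefs to the HashMap) without
// ever calling .lock() itself.
fn remove_from_parent(
    timelines: &MutexGuard<'_, HashMap<u32, String>>,
    parent_id: u32,
) {
    if let Some(parent) = timelines.get(&parent_id) {
        println!("deregistering from parent {parent}");
    }
}

fn caller(timelines: &Mutex<HashMap<u32, String>>) {
    let guard = timelines.lock().unwrap();
    remove_from_parent(&guard, 1); // no second lock acquisition possible
}
```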
@@ -589,6 +631,17 @@ impl fmt::Debug for OffloadedTimeline {
     }
 }
 
+impl Drop for OffloadedTimeline {
+    fn drop(&mut self) {
+        if !self.deleted_from_ancestor.load(Ordering::Acquire) {
+            tracing::warn!(
+                "offloaded timeline {} was dropped without having cleaned it up at the ancestor",
+                self.timeline_id
+            );
+        }
+    }
+}
+
 #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
 pub enum MaybeOffloaded {
     Yes,
@@ -1531,7 +1584,7 @@ impl Tenant {
         }
         // Complete deletions for offloaded timeline id's.
         offloaded_timelines_list
-            .retain(|(offloaded_id, _offloaded)| {
+            .retain(|(offloaded_id, offloaded)| {
                 // At this point, offloaded_timeline_ids has the list of all offloaded timelines
                 // without a prefix in S3, so they are inexistent.
                 // In the end, existence of a timeline is finally determined by the existence of an index-part.json in remote storage.
@@ -1539,6 +1592,7 @@ impl Tenant {
                 let delete = offloaded_timeline_ids.contains(offloaded_id);
                 if delete {
                     tracing::info!("Removing offloaded timeline {offloaded_id} from manifest as no remote prefix was found");
+                    offloaded.defuse_for_tenant_drop();
                 }
                 !delete
             });
@@ -1927,9 +1981,15 @@ impl Tenant {
                 )));
             };
             let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
-            if offloaded_timelines.remove(&timeline_id).is_none() {
-                warn!("timeline already removed from offloaded timelines");
+            match offloaded_timelines.remove(&timeline_id) {
+                Some(offloaded) => {
+                    offloaded.delete_from_ancestor_with_timelines(&timelines);
+                }
+                None => warn!("timeline already removed from offloaded timelines"),
             }
+
+            self.initialize_gc_info(&timelines, &offloaded_timelines, Some(timeline_id));
+
             Arc::clone(timeline)
         };
 
@@ -2667,7 +2727,7 @@ impl Tenant {
             .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
 
         // Before activation, populate each Timeline's GcInfo with information about its children
-        self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor);
+        self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
 
         // Spawn gc and compaction loops. The loops will shut themselves
         // down when they notice that the tenant is inactive.
@@ -2782,8 +2842,14 @@ impl Tenant {
                 let timeline_id = timeline.timeline_id;
                 let span = tracing::info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode);
                 js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await });
-            })
-        };
+            });
+        }
+        {
+            let timelines_offloaded = self.timelines_offloaded.lock().unwrap();
+            timelines_offloaded.values().for_each(|timeline| {
+                timeline.defuse_for_tenant_drop();
+            });
+        }
         // test_long_timeline_create_then_tenant_delete is leaning on this message
         tracing::info!("Waiting for timelines...");
         while let Some(res) = js.join_next().await {
@@ -3767,10 +3833,13 @@ impl Tenant {
         &self,
         timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
         timelines_offloaded: &std::sync::MutexGuard<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
+        restrict_to_timeline: Option<TimelineId>,
     ) {
-        // This function must be called before activation: after activation timeline create/delete operations
-        // might happen, and this function is not safe to run concurrently with those.
-        assert!(!self.is_active());
+        if restrict_to_timeline.is_none() {
+            // This function must be called before activation: after activation timeline create/delete operations
+            // might happen, and this function is not safe to run concurrently with those.
+            assert!(!self.is_active());
+        }
 
         // Scan all timelines. For each timeline, remember the timeline ID and
         // the branch point where it was created.
@@ -3803,7 +3872,12 @@ impl Tenant {
         let horizon = self.get_gc_horizon();
 
         // Populate each timeline's GcInfo with information about its child branches
-        for timeline in timelines.values() {
+        let timelines_to_write = if let Some(timeline_id) = restrict_to_timeline {
+            itertools::Either::Left(timelines.get(&timeline_id).into_iter())
+        } else {
+            itertools::Either::Right(timelines.values())
+        };
+        for timeline in timelines_to_write {
             let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints
                 .remove(&timeline.timeline_id)
                 .unwrap_or_default();
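The `itertools::Either` trick above is how one loop body serves both the "single unoffloaded timeline" and the "all timelines at activation" cases: both `Either` variants implement `Iterator`, so the `for` loop doesn't care which side it got. A standalone sketch with simplified types (assumes the `itertools` crate as a dependency):

```rust
use std::collections::HashMap;

// Yields either a single entry (if restricted) or all values; both arms
// are iterators over the same item type, so Either implements Iterator.
fn timelines_to_visit<'a>(
    timelines: &'a HashMap<u32, String>,
    restrict_to: Option<u32>,
) -> impl Iterator<Item = &'a String> {
    match restrict_to {
        // Option::into_iter yields zero or one item.
        Some(id) => itertools::Either::Left(timelines.get(&id).into_iter()),
        None => itertools::Either::Right(timelines.values()),
    }
}
```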
@@ -9650,4 +9724,54 @@ mod tests {
 
         Ok(())
     }
+
+    #[cfg(feature = "testing")]
+    #[tokio::test]
+    async fn test_timeline_offload_retain_lsn() -> anyhow::Result<()> {
+        let harness = TenantHarness::create("test_timeline_offload_retain_lsn")
+            .await
+            .unwrap();
+        let (tenant, ctx) = harness.load().await;
+        let tline_parent = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await
+            .unwrap();
+        let tline_child = tenant
+            .branch_timeline_test(&tline_parent, NEW_TIMELINE_ID, Some(Lsn(0x20)), &ctx)
+            .await
+            .unwrap();
+        {
+            let gc_info_parent = tline_parent.gc_info.read().unwrap();
+            assert_eq!(
+                gc_info_parent.retain_lsns,
+                vec![(Lsn(0x20), tline_child.timeline_id, MaybeOffloaded::No)]
+            );
+        }
+        // We have to directly call the remote_client instead of using the archive function to avoid constructing broker client...
+        tline_child
+            .remote_client
+            .schedule_index_upload_for_timeline_archival_state(TimelineArchivalState::Archived)
+            .unwrap();
+        tline_child.remote_client.wait_completion().await.unwrap();
+        offload_timeline(&tenant, &tline_child)
+            .instrument(tracing::info_span!(parent: None, "offload_test", tenant_id=%"test", shard_id=%"test", timeline_id=%"test"))
+            .await.unwrap();
+        let child_timeline_id = tline_child.timeline_id;
+        Arc::try_unwrap(tline_child).unwrap();
+
+        {
+            let gc_info_parent = tline_parent.gc_info.read().unwrap();
+            assert_eq!(
+                gc_info_parent.retain_lsns,
+                vec![(Lsn(0x20), child_timeline_id, MaybeOffloaded::Yes)]
+            );
+        }
+
+        tenant
+            .get_offloaded_timeline(child_timeline_id)
+            .unwrap()
+            .defuse_for_tenant_drop();
+
+        Ok(())
+    }
 }
@@ -477,8 +477,21 @@ impl GcInfo {
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
         self.retain_lsns.sort_by_key(|i| i.0);
     }
 
-    pub(super) fn remove_child(&mut self, child_id: TimelineId) {
-        self.retain_lsns.retain(|i| i.1 != child_id);
+    pub(super) fn remove_child_maybe_offloaded(
+        &mut self,
+        child_id: TimelineId,
+        maybe_offloaded: MaybeOffloaded,
+    ) {
+        self.retain_lsns
+            .retain(|i| !(i.1 == child_id && i.2 == maybe_offloaded));
+    }
+
+    pub(super) fn remove_child_not_offloaded(&mut self, child_id: TimelineId) {
+        self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::No);
+    }
+
+    pub(super) fn remove_child_offloaded(&mut self, child_id: TimelineId) {
+        self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes);
     }
 }
 
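Splitting `remove_child` into offload-aware variants is the heart of fix 3: `retain_lsns` entries are now keyed by both the child ID and the `MaybeOffloaded` flag, so dropping the `Timeline` object after an offload removes only the `MaybeOffloaded::No` entry and leaves the one the offload path just registered. A toy model of the data structure (simplified stand-ins, not the pageserver's actual `GcInfo`):

```rust
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum MaybeOffloaded { Yes, No }

// (lsn, child timeline id, offload state) — toy stand-ins for
// (Lsn, TimelineId, MaybeOffloaded).
struct GcInfoModel {
    retain_lsns: Vec<(u64, u32, MaybeOffloaded)>,
}

impl GcInfoModel {
    fn remove_child_maybe_offloaded(&mut self, child: u32, state: MaybeOffloaded) {
        // An entry survives unless BOTH the child id and the state match.
        self.retain_lsns.retain(|i| !(i.1 == child && i.2 == state));
    }
}

fn main() {
    let mut gc = GcInfoModel {
        retain_lsns: vec![(0x20, 7, MaybeOffloaded::Yes), (0x20, 7, MaybeOffloaded::No)],
    };
    // What Timeline::drop now does: only the non-offloaded entry goes away.
    gc.remove_child_maybe_offloaded(7, MaybeOffloaded::No);
    assert_eq!(gc.retain_lsns, vec![(0x20, 7, MaybeOffloaded::Yes)]);
}
```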
@@ -4501,7 +4514,7 @@ impl Drop for Timeline {
             // This lock should never be poisoned, but in case it is we do a .map() instead of
             // an unwrap(), to avoid panicking in a destructor and thereby aborting the process.
             if let Ok(mut gc_info) = ancestor.gc_info.write() {
-                gc_info.remove_child(self.timeline_id)
+                gc_info.remove_child_not_offloaded(self.timeline_id)
             }
         }
     }
@@ -5030,7 +5043,7 @@ impl Timeline {
 
             // 1. Is it newer than GC horizon cutoff point?
             if l.get_lsn_range().end > space_cutoff {
-                debug!(
+                info!(
                     "keeping {} because it's newer than space_cutoff {}",
                     l.layer_name(),
                     space_cutoff,
@@ -5041,7 +5054,7 @@ impl Timeline {
 
             // 2. It is newer than PiTR cutoff point?
             if l.get_lsn_range().end > time_cutoff {
-                debug!(
+                info!(
                     "keeping {} because it's newer than time_cutoff {}",
                     l.layer_name(),
                     time_cutoff,
@@ -5060,7 +5073,7 @@ impl Timeline {
             for retain_lsn in &retain_lsns {
                 // start_lsn is inclusive
                 if &l.get_lsn_range().start <= retain_lsn {
-                    debug!(
+                    info!(
                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
                         l.layer_name(),
                         retain_lsn,
@@ -5075,7 +5088,7 @@ impl Timeline {
             if let Some(lsn) = &max_lsn_with_valid_lease {
                 // keep if layer start <= any of the lease
                 if &l.get_lsn_range().start <= lsn {
-                    debug!(
+                    info!(
                         "keeping {} because there is a valid lease preventing GC at {}",
                         l.layer_name(),
                         lsn,
@@ -5107,13 +5120,13 @@ impl Timeline {
                 if !layers
                     .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
                 {
-                    debug!("keeping {} because it is the latest layer", l.layer_name());
+                    info!("keeping {} because it is the latest layer", l.layer_name());
                     result.layers_not_updated += 1;
                     continue 'outer;
                 }
 
                 // We didn't find any reason to keep this file, so remove it.
-                debug!(
+                info!(
                     "garbage collecting {} is_dropped: xx is_incremental: {}",
                     l.layer_name(),
                     l.is_incremental(),
@@ -141,9 +141,10 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
             );
         }
         TimelineOrOffloaded::Offloaded(timeline) => {
-            timelines_offloaded
+            let offloaded_timeline = timelines_offloaded
                 .remove(&timeline.timeline_id)
                 .expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
+            offloaded_timeline.delete_from_ancestor_with_timelines(&timelines);
         }
     }
 
@@ -66,7 +66,7 @@ pub(crate) async fn offload_timeline(
--- a/pageserver/src/tenant/timeline/offload.rs
+++ b/pageserver/src/tenant/timeline/offload.rs
     let conf = &tenant.conf;
     delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await;
 
-    remove_timeline_from_tenant(tenant, &timeline, &guard);
+    let remaining_refcount = remove_timeline_from_tenant(tenant, &timeline, &guard);
 
     {
         let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
@@ -87,16 +87,20 @@ pub(crate) async fn offload_timeline(
     // not our actual state of offloaded timelines.
     tenant.store_tenant_manifest().await?;
 
+    tracing::info!("Timeline offload complete (remaining arc refcount: {remaining_refcount})");
+
     Ok(())
 }
 
 /// It is important that this gets called when DeletionGuard is being held.
 /// For more context see comments in [`DeleteTimelineFlow::prepare`]
+///
+/// Returns the strong count of the timeline `Arc`
 fn remove_timeline_from_tenant(
     tenant: &Tenant,
     timeline: &Timeline,
     _: &DeletionGuard, // using it as a witness
-) {
+) -> usize {
     // Remove the timeline from the map.
     let mut timelines = tenant.timelines.lock().unwrap();
     let children_exist = timelines
@@ -109,7 +113,9 @@ fn remove_timeline_from_tenant(
         panic!("Timeline grew children while we removed layer files");
     }
 
-    timelines
+    let timeline = timelines
         .remove(&timeline.timeline_id)
         .expect("timeline that we were deleting was concurrently removed from 'timelines' map");
+
+    Arc::strong_count(&timeline)
 }
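`remove_timeline_from_tenant` now reports the timeline's remaining `Arc` strong count so the offload log line above can show whether anything else still keeps the `Timeline` alive. One subtlety worth noting when reading that log line: `Arc::strong_count` counts the handle it is called on too, so the value returned here includes the binding just removed from the map. A quick standalone illustration:

```rust
use std::sync::Arc;

fn main() {
    let a = Arc::new("timeline");
    let b = Arc::clone(&a);
    assert_eq!(Arc::strong_count(&a), 2); // counts both `a` and `b`
    drop(b);
    assert_eq!(Arc::strong_count(&a), 1); // only `a` remains
}
```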
@@ -15,13 +15,19 @@ from fixtures.neon_fixtures import (
--- a/test_runner/regress/test_timeline_archival.py
+++ b/test_runner/regress/test_timeline_archival.py
     last_flush_lsn_upload,
 )
 from fixtures.pageserver.http import PageserverApiException
-from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty, list_prefix
+from fixtures.pageserver.utils import (
+    assert_prefix_empty,
+    assert_prefix_not_empty,
+    list_prefix,
+    wait_until_tenant_active,
+)
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import S3Storage, s3_storage
 from fixtures.utils import run_only_on_default_postgres, wait_until
 from mypy_boto3_s3.type_defs import (
     ObjectTypeDef,
 )
+from psycopg2.errors import IoError, UndefinedTable
 
 
 @pytest.mark.parametrize("shard_count", [0, 4])
@@ -641,8 +647,21 @@ def test_timeline_archival_chaos(neon_env_builder: NeonEnvBuilder):
     assert violations == []
 
 
-@pytest.mark.parametrize("offload_child", ["offload", "offload-corrupt", "archive", None])
-def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Optional[str]):
+@pytest.mark.parametrize("with_intermediary", [False, True])
+@pytest.mark.parametrize(
+    "offload_child",
+    [
+        "offload",
+        "offload-corrupt",
+        "offload-no-restart",
+        "offload-parent",
+        "archive",
+        None,
+    ],
+)
+def test_timeline_retain_lsn(
+    neon_env_builder: NeonEnvBuilder, with_intermediary: bool, offload_child: Optional[str]
+):
     """
     Ensure that retain_lsn functionality for timelines works, both for offloaded and non-offloaded ones
     """
@@ -650,6 +669,7 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
     # Our corruption code only works with S3 compatible storage
     neon_env_builder.enable_pageserver_remote_storage(s3_storage())
 
+    neon_env_builder.rust_log_override = "info,[gc_timeline]=debug"
     env = neon_env_builder.init_start()
     ps_http = env.pageserver.http_client()
 
@@ -657,22 +677,30 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
     tenant_id, root_timeline_id = env.create_tenant(
         conf={
             # small checkpointing and compaction targets to ensure we generate many upload operations
-            "checkpoint_distance": 128 * 1024,
+            "checkpoint_distance": 32 * 1024,
             "compaction_threshold": 1,
-            "compaction_target_size": 128 * 1024,
+            "compaction_target_size": 32 * 1024,
             # set small image creation thresholds so that gc deletes data
-            "image_creation_threshold": 2,
+            "image_creation_threshold": 1,
             # disable background compaction and GC. We invoke it manually when we want it to happen.
             "gc_period": "0s",
             "compaction_period": "0s",
             # Disable pitr, we only want the latest lsn
             "pitr_interval": "0s",
+            "gc_horizon": 0,
             # Don't rely on endpoint lsn leases
             "lsn_lease_length": "0s",
         }
     )
 
-    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
+    if with_intermediary:
+        parent_branch_name = "test_archived_parent"
+        parent_timeline_id = env.create_branch("test_archived_parent", tenant_id)
+    else:
+        parent_branch_name = "main"
+        parent_timeline_id = root_timeline_id
+
+    with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint:
         endpoint.safe_psql_many(
             [
                 "CREATE TABLE foo(v int, key serial primary key, t text default 'data_content')",
@@ -682,14 +710,16 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
         )
         pre_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
         log.info(f"Pre branch sum: {pre_branch_sum}")
-        last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id)
+        last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id)
 
     # Create a branch and write some additional data to the parent
-    child_timeline_id = env.create_branch("test_archived_branch", tenant_id)
+    child_timeline_id = env.create_branch(
+        "test_archived_branch", tenant_id, ancestor_branch_name=parent_branch_name
+    )
 
-    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
-        # Do some churn of the data. This is important so that we can overwrite image layers.
-        for i in range(10):
+    with env.endpoints.create_start(parent_branch_name, tenant_id=tenant_id) as endpoint:
+        # Do some overwriting churn with compactions in between. This is important so that we can overwrite image layers.
+        for i in range(5):
             endpoint.safe_psql_many(
                 [
                     f"SELECT setseed(0.23{i})",
@@ -698,9 +728,9 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
                     "UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 0",
                 ]
             )
+            last_flush_lsn_upload(env, endpoint, tenant_id, parent_timeline_id)
         post_branch_sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
         log.info(f"Post branch sum: {post_branch_sum}")
-        last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id)
 
     if offload_child is not None:
         ps_http.timeline_archival_config(
@@ -715,9 +745,19 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
         assert leaf_detail["is_archived"] is True
         if "offload" in offload_child:
             ps_http.timeline_offload(tenant_id, child_timeline_id)
+        if "offload-parent" in offload_child:
+            # Also offload the parent to ensure the retain_lsn of the child
+            # is entered in the parent at unoffloading
+            ps_http.timeline_archival_config(
+                tenant_id,
+                parent_timeline_id,
+                state=TimelineArchivalState.ARCHIVED,
+            )
+            ps_http.timeline_offload(tenant_id, parent_timeline_id)
 
     # Do a restart to get rid of any in-memory objects (we only init gc info once, at attach)
-    env.pageserver.stop()
+    if offload_child is None or "no-restart" not in offload_child:
+        env.pageserver.stop()
     if offload_child == "offload-corrupt":
         assert isinstance(env.pageserver_remote_storage, S3Storage)
         listing = list_prefix(
@@ -752,13 +792,21 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
                 ".*page_service_conn_main.*could not find data for key.*",
             ]
         )
-    env.pageserver.start()
+    if offload_child is None or "no-restart" not in offload_child:
+        env.pageserver.start()
+    if offload_child == "offload-parent":
+        wait_until_tenant_active(ps_http, tenant_id=tenant_id)
+        ps_http.timeline_archival_config(
+            tenant_id,
+            parent_timeline_id,
+            state=TimelineArchivalState.UNARCHIVED,
+        )
 
     # Do an agressive gc and compaction of the parent branch
-    ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=root_timeline_id, gc_horizon=0)
+    ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=parent_timeline_id, gc_horizon=0)
     ps_http.timeline_checkpoint(
         tenant_id,
-        root_timeline_id,
+        parent_timeline_id,
         force_l0_compaction=True,
         force_repartition=True,
         wait_until_uploaded=True,
@@ -774,10 +822,15 @@ def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Op
 
     # Now, after unarchival, the child timeline should still have its data accessible (or corrupted)
     if offload_child == "offload-corrupt":
-        with pytest.raises(RuntimeError, match=".*failed to get basebackup.*"):
-            env.endpoints.create_start(
+        if with_intermediary:
+            error_regex = "(.*could not read .* from page server.*|.*relation .* does not exist)"
+        else:
+            error_regex = ".*failed to get basebackup.*"
+        with pytest.raises((RuntimeError, IoError, UndefinedTable), match=error_regex):
+            with env.endpoints.create_start(
                 "test_archived_branch", tenant_id=tenant_id, basebackup_request_tries=1
-            )
+            ) as endpoint:
+                endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
     else:
         with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint:
             sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")