From 9fda377d7585326a96193634f73d3af9f75bd841 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Fri, 31 Mar 2023 14:52:38 +0300 Subject: [PATCH] Use is_deleted flag in IndexPart to prevent timeline resurrection Resolves https://github.com/neondatabase/neon/issues/3560 If the flag is set the timeline will be ignored in attach and during initial loading. This is the first step in https://github.com/neondatabase/neon/issues/3889 --- pageserver/src/tenant.rs | 47 +++++++--- pageserver/src/tenant/metadata.rs | 3 +- .../src/tenant/remote_timeline_client.rs | 56 +++++++++++- .../tenant/remote_timeline_client/index.rs | 13 +++ pageserver/src/tenant/upload_queue.rs | 2 + test_runner/regress/test_remote_storage.py | 90 +++++++++++++++++++ test_runner/regress/test_timeline_delete.py | 24 +++-- 7 files changed, 207 insertions(+), 28 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 11415b47c4..f0d646c392 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -712,8 +712,7 @@ impl Tenant { ); } // Wait for all the download tasks to complete & collect results. - let mut remote_clients = HashMap::new(); - let mut index_parts = HashMap::new(); + let mut remote_index_and_client = HashMap::new(); let mut timeline_ancestors = HashMap::new(); while let Some(result) = part_downloads.join_next().await { // NB: we already added timeline_id as context to the error @@ -721,8 +720,7 @@ impl Tenant { let (timeline_id, client, index_part, remote_metadata) = result?; debug!("successfully downloaded index part for timeline {timeline_id}"); timeline_ancestors.insert(timeline_id, remote_metadata); - index_parts.insert(timeline_id, index_part); - remote_clients.insert(timeline_id, client); + remote_index_and_client.insert(timeline_id, (index_part, client)); } // For every timeline, download the metadata file, scan the local directory, @@ -730,12 +728,21 @@ impl Tenant { // layer file. let sorted_timelines = tree_sort_timelines(timeline_ancestors)?; for (timeline_id, remote_metadata) in sorted_timelines { + let (index_part, remote_client) = remote_index_and_client + .remove(&timeline_id) + .expect("just put it in above"); + + if index_part.is_deleted { + info!("timeline {} is deleted, skipping", timeline_id); + continue; + } + // TODO again handle early failure self.load_remote_timeline( timeline_id, - index_parts.remove(&timeline_id).unwrap(), + index_part, remote_metadata, - remote_clients.remove(&timeline_id).unwrap(), + remote_client, &ctx, ) .await @@ -1048,14 +1055,6 @@ impl Tenant { local_metadata: TimelineMetadata, ctx: &RequestContext, ) -> anyhow::Result<()> { - let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { - let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) - .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; - Some(ancestor_timeline) - } else { - None - }; - let remote_client = self.remote_storage.as_ref().map(|remote_storage| { RemoteTimelineClient::new( remote_storage.clone(), @@ -1068,6 +1067,11 @@ impl Tenant { let remote_startup_data = match &remote_client { Some(remote_client) => match remote_client.download_index_file().await { Ok(index_part) => { + if index_part.is_deleted { + info!("is_deleted is set on remote, skipping"); + return Ok(()); + } + let remote_metadata = index_part.parse_metadata().context("parse_metadata")?; Some(RemoteStartupData { index_part, @@ -1083,6 +1087,14 @@ impl Tenant { None => None, }; + let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() { + let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false) + .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?; + Some(ancestor_timeline) + } else { + None + }; + self.timeline_init_and_sync( timeline_id, remote_client, @@ -1373,6 +1385,13 @@ impl Tenant { info!("waiting for timeline tasks to shutdown"); task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await; + // Make sure we marked timeline as deleted on the remote + // so we wont pick it up next time during attach or pageserver restart + // See comment in persist_index_part_with_deleted_flag. + if let Some(remote_client) = timeline.remote_client.as_ref() { + remote_client.persist_index_part_with_deleted_flag().await?; + } + { // Grab the layer_removal_cs lock, and actually perform the deletion. // diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 297cccbe30..1ea61fa26b 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -12,6 +12,7 @@ use std::io::Write; use anyhow::{bail, ensure, Context}; use serde::{Deserialize, Serialize}; use tracing::info_span; +use utils::bin_ser::SerializeError; use utils::{ bin_ser::BeSer, id::{TenantId, TimelineId}, @@ -182,7 +183,7 @@ impl TimelineMetadata { } } - pub fn to_bytes(&self) -> anyhow::Result> { + pub fn to_bytes(&self) -> Result, SerializeError> { let body_bytes = self.body.ser()?; let metadata_size = METADATA_HDR_SIZE + body_bytes.len(); let hdr = TimelineMetadataHeader { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 28c4943dbd..9f45cfe048 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -204,6 +204,7 @@ mod download; pub mod index; mod upload; +use anyhow::Context; // re-export these pub use download::{is_temp_download_file, list_remote_timelines}; @@ -213,7 +214,7 @@ use std::sync::{Arc, Mutex}; use remote_storage::{DownloadError, GenericRemoteStorage}; use std::ops::DerefMut; use tokio::runtime::Runtime; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use tracing::{info_span, Instrument}; use utils::lsn::Lsn; @@ -615,6 +616,39 @@ impl RemoteTimelineClient { Ok(()) } + // NOTE: if there were no tasks to call stop we need to call stop by ourselves first + pub(crate) async fn persist_index_part_with_deleted_flag( + self: &Arc, + ) -> anyhow::Result<()> { + self.stop().context("stop")?; + + let index_part = { + let mut locked = self.upload_queue.lock().unwrap(); + + // We must be in stopped state because otherwise + // we can have inprogress index part upload that can overwrite the file + // with missing is_deleted flag that we going to set below + let stopped = match &mut *locked { + UploadQueue::Uninitialized => anyhow::bail!("is not Stopped but Uninitialized"), + UploadQueue::Initialized(_) => anyhow::bail!("is not Stopped but Initialized"), + UploadQueue::Stopped(stopped) => stopped, + }; + + stopped.index_part.is_deleted = true; + + stopped.index_part.clone() + }; + + upload::upload_index_part( + self.conf, + &self.storage_impl, + self.tenant_id, + self.timeline_id, + &index_part, + ) + .await + } + /// /// Pick next tasks from the queue, and start as many of them as possible without violating /// the ordering constraints. @@ -732,8 +766,10 @@ impl RemoteTimelineClient { // upload finishes or times out soon enough. if task_mgr::is_shutdown_requested() { info!("upload task cancelled by shutdown request"); + if let Err(e) = self.stop() { + error!("got an error when trying to stop remote client: {e}") + } self.calls_unfinished_metric_end(&task.op); - self.stop(); return; } @@ -916,22 +952,32 @@ impl RemoteTimelineClient { self.metrics.call_end(&file_kind, &op_kind); } - fn stop(&self) { + fn stop(&self) -> anyhow::Result<()> { // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet. // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. let mut guard = self.upload_queue.lock().unwrap(); match &*guard { - UploadQueue::Uninitialized => panic!( + UploadQueue::Uninitialized => anyhow::bail!( "callers are responsible for ensuring this is only called on initialized queue" ), UploadQueue::Stopped(_) => { // nothing to do info!("another concurrent task already shut down the queue"); + Ok(()) } UploadQueue::Initialized(qi) => { info!("shutting down upload queue"); + // Prepare index part to put into stopped state + let index_part = IndexPart::new( + qi.latest_files.clone(), + qi.last_uploaded_consistent_lsn, + qi.latest_metadata + .to_bytes() + .context("should be able to serialize metadata")?, + ); + // Replace the queue with the Stopped state, taking ownership of the old // Initialized queue. We will do some checks on it, and then drop it. let qi = { @@ -940,6 +986,7 @@ impl RemoteTimelineClient { &mut *guard, UploadQueue::Stopped(UploadQueueStopped { last_uploaded_consistent_lsn, + index_part, }), ); if let UploadQueue::Initialized(qi) = upload_queue { @@ -972,6 +1019,7 @@ impl RemoteTimelineClient { // We're done. drop(guard); + Ok(()) } } } diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 9c84f8e977..bb287c51ab 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -40,6 +40,11 @@ impl LayerFileMetadata { } } +// Shortcut to skip serializing false in IndexPart below +fn is_false(b: &bool) -> bool { + !b +} + // TODO seems like another part of the remote storage file format // compatibility issue, see https://github.com/neondatabase/neon/issues/3072 /// In-memory representation of an `index_part.json` file @@ -55,6 +60,10 @@ pub struct IndexPart { #[serde(default)] version: usize, + #[serde(default)] + #[serde(skip_serializing_if = "is_false")] + pub is_deleted: bool, + /// Layer names, which are stored on the remote storage. /// /// Additional metadata can might exist in `layer_metadata`. @@ -101,6 +110,7 @@ impl IndexPart { layer_metadata, disk_consistent_lsn, metadata_bytes, + is_deleted: false, } } @@ -156,6 +166,7 @@ mod tests { ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(), + is_deleted: false, }; let part = serde_json::from_str::(example).unwrap(); @@ -192,6 +203,7 @@ mod tests { ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(), + is_deleted: false, }; let part = serde_json::from_str::(example).unwrap(); @@ -236,6 +248,7 @@ mod tests { 0, 0, ] .to_vec(), + is_deleted: false, }; let empty_layers_parsed = serde_json::from_str::(empty_layers_json).unwrap(); diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 08bc1f219d..bb313be2f4 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -77,6 +77,8 @@ pub(crate) struct UploadQueueInitialized { pub(crate) struct UploadQueueStopped { pub(crate) last_uploaded_consistent_lsn: Lsn, + // Index part is needed here so timeline_delete can access it + pub(super) index_part: IndexPart, } impl UploadQueue { diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 6de5f7db04..c853c0a40b 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -752,4 +752,94 @@ def get_queued_count( return int(val) +@pytest.mark.parametrize("remote_storage_kind", available_remote_storages()) +def test_timeline_resurrection_on_attach( + neon_env_builder: NeonEnvBuilder, + remote_storage_kind: RemoteStorageKind, +): + """ + See https://github.com/neondatabase/neon/issues/3560 + """ + + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storage_kind, + test_name="test_timeline_resurrection", + ) + + ##### First start, insert data and upload it to the remote storage + env = neon_env_builder.init_start() + + pageserver_http = env.pageserver.http_client() + pg = env.postgres.create_start("main") + + client = env.pageserver.http_client() + + tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0]) + + with pg.cursor() as cur: + cur.execute("CREATE TABLE f (i integer);") + cur.execute("INSERT INTO f VALUES (generate_series(1,1000));") + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + + # wait until pageserver receives that data + wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn) + + # run checkpoint manually to be sure that data landed in remote storage + pageserver_http.timeline_checkpoint(tenant_id, timeline_id) + + # wait until pageserver successfully uploaded a checkpoint to remote storage + log.info("waiting for checkpoint upload") + wait_for_upload(client, tenant_id, timeline_id, current_lsn) + log.info("upload of checkpoint is done") + + new_timeline_id = env.neon_cli.create_branch("new", "main") + new_pg = env.postgres.create_start("new") + + with new_pg.cursor() as cur: + cur.execute("INSERT INTO f VALUES (generate_series(1,1000));") + current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + + # wait until pageserver receives that data + wait_for_last_record_lsn(client, tenant_id, new_timeline_id, current_lsn) + + # run checkpoint manually to be sure that data landed in remote storage + pageserver_http.timeline_checkpoint(tenant_id, new_timeline_id) + + # wait until pageserver successfully uploaded a checkpoint to remote storage + log.info("waiting for checkpoint upload") + wait_for_upload(client, tenant_id, new_timeline_id, current_lsn) + log.info("upload of checkpoint is done") + + # delete new timeline + client.timeline_delete(tenant_id=tenant_id, timeline_id=new_timeline_id) + + ##### Stop the pageserver instance, erase all its data + env.postgres.stop_all() + env.pageserver.stop() + + dir_to_clear = Path(env.repo_dir) / "tenants" + shutil.rmtree(dir_to_clear) + os.mkdir(dir_to_clear) + + ##### Second start, restore the data and ensure it's the same + env.pageserver.start() + + client.tenant_attach(tenant_id=tenant_id) + + def tenant_active(): + all_states = client.tenant_list() + [tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id] + assert tenant["state"] == "Active" + + wait_until( + number_of_iterations=5, + interval=1, + func=tenant_active, + ) + + timelines = client.timeline_list(tenant_id=tenant_id) + assert len(timelines) == 1 + + # TODO Test that we correctly handle GC of files that are stuck in upload queue. diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index cf607f4f7b..7912f3e327 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -39,23 +39,18 @@ def test_timeline_delete(neon_simple_env: NeonEnv): "test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent" ) + timeline_path = ( + env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(parent_timeline_id) + ) + ps_http = env.pageserver.http_client() with pytest.raises( PageserverApiException, match="Cannot delete timeline which has child timelines" ) as exc: - timeline_path = ( - env.repo_dir - / "tenants" - / str(env.initial_tenant) - / "timelines" - / str(parent_timeline_id) - ) assert timeline_path.exists() ps_http.timeline_delete(env.initial_tenant, parent_timeline_id) - assert not timeline_path.exists() - assert exc.value.status_code == 400 timeline_path = ( @@ -87,3 +82,14 @@ def test_timeline_delete(neon_simple_env: NeonEnv): ) assert exc.value.status_code == 404 + + # Check that we didnt pick up the timeline again after restart. + # See https://github.com/neondatabase/neon/issues/3560 + env.pageserver.stop(immediate=True) + env.pageserver.start() + + with pytest.raises( + PageserverApiException, + match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found", + ) as exc: + ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)