Use is_deleted flag in IndexPart to prevent timeline resurrection

Resolves https://github.com/neondatabase/neon/issues/3560
If the flag is set, the timeline will be ignored during attach and during
initial loading.

This is the first step towards https://github.com/neondatabase/neon/issues/3889
Dmitry Rodionov
2023-03-31 14:52:38 +03:00
parent 02b28ae0b1
commit 9fda377d75
7 changed files with 207 additions and 28 deletions
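In short: any timeline whose remote index part carries the flag is filtered out before any local timeline state is (re)created, both on tenant attach and during initial load at startup. A minimal sketch of that gating logic, using made-up stand-in names rather than the actual pageserver types:

// Illustrative sketch only; IndexPartStub and timelines_to_load are
// hypothetical names, not the pageserver's real code.
use std::collections::HashMap;

struct IndexPartStub {
    is_deleted: bool,
    // layer list, metadata, etc. elided
}

// Timelines whose remote index part carries is_deleted = true are filtered
// out before loading, so they can never be resurrected locally.
fn timelines_to_load(remote: HashMap<&'static str, IndexPartStub>) -> Vec<&'static str> {
    remote
        .into_iter()
        .filter(|(_, part)| !part.is_deleted)
        .map(|(id, _)| id)
        .collect()
}

fn main() {
    let mut remote = HashMap::new();
    remote.insert("main", IndexPartStub { is_deleted: false });
    remote.insert("dropped-branch", IndexPartStub { is_deleted: true });
    // Prints only ["main"]: the deleted timeline is skipped.
    println!("{:?}", timelines_to_load(remote));
}

The diff below applies that check in both relevant paths: once over the downloaded index parts during attach, and once when loading timelines at pageserver startup.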


@@ -712,8 +712,7 @@ impl Tenant {
);
}
// Wait for all the download tasks to complete & collect results.
let mut remote_clients = HashMap::new();
let mut index_parts = HashMap::new();
let mut remote_index_and_client = HashMap::new();
let mut timeline_ancestors = HashMap::new();
while let Some(result) = part_downloads.join_next().await {
// NB: we already added timeline_id as context to the error
@@ -721,8 +720,7 @@ impl Tenant {
let (timeline_id, client, index_part, remote_metadata) = result?;
debug!("successfully downloaded index part for timeline {timeline_id}");
timeline_ancestors.insert(timeline_id, remote_metadata);
index_parts.insert(timeline_id, index_part);
remote_clients.insert(timeline_id, client);
remote_index_and_client.insert(timeline_id, (index_part, client));
}
// For every timeline, download the metadata file, scan the local directory,
@@ -730,12 +728,21 @@ impl Tenant {
// layer file.
let sorted_timelines = tree_sort_timelines(timeline_ancestors)?;
for (timeline_id, remote_metadata) in sorted_timelines {
let (index_part, remote_client) = remote_index_and_client
.remove(&timeline_id)
.expect("just put it in above");
if index_part.is_deleted {
info!("timeline {} is deleted, skipping", timeline_id);
continue;
}
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
index_parts.remove(&timeline_id).unwrap(),
index_part,
remote_metadata,
remote_clients.remove(&timeline_id).unwrap(),
remote_client,
&ctx,
)
.await
@@ -1048,14 +1055,6 @@ impl Tenant {
local_metadata: TimelineMetadata,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
Some(ancestor_timeline)
} else {
None
};
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
RemoteTimelineClient::new(
remote_storage.clone(),
@@ -1068,6 +1067,11 @@ impl Tenant {
let remote_startup_data = match &remote_client {
Some(remote_client) => match remote_client.download_index_file().await {
Ok(index_part) => {
if index_part.is_deleted {
info!("is_deleted is set on remote, skipping");
return Ok(());
}
let remote_metadata = index_part.parse_metadata().context("parse_metadata")?;
Some(RemoteStartupData {
index_part,
@@ -1083,6 +1087,14 @@ impl Tenant {
None => None,
};
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
Some(ancestor_timeline)
} else {
None
};
self.timeline_init_and_sync(
timeline_id,
remote_client,
@@ -1373,6 +1385,13 @@ impl Tenant {
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
// Make sure the timeline is marked as deleted on the remote storage,
// so we won't pick it up again during attach or pageserver restart.
// See the comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
remote_client.persist_index_part_with_deleted_flag().await?;
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//


@@ -12,6 +12,7 @@ use std::io::Write;
use anyhow::{bail, ensure, Context};
use serde::{Deserialize, Serialize};
use tracing::info_span;
use utils::bin_ser::SerializeError;
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
@@ -182,7 +183,7 @@ impl TimelineMetadata {
}
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
pub fn to_bytes(&self) -> Result<Vec<u8>, SerializeError> {
let body_bytes = self.body.ser()?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {


@@ -204,6 +204,7 @@ mod download;
pub mod index;
mod upload;
use anyhow::Context;
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
@@ -213,7 +214,7 @@ use std::sync::{Arc, Mutex};
use remote_storage::{DownloadError, GenericRemoteStorage};
use std::ops::DerefMut;
use tokio::runtime::Runtime;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;
@@ -615,6 +616,39 @@ impl RemoteTimelineClient {
Ok(())
}
// NOTE: if no task has called stop() yet, this function has to call it itself first
pub(crate) async fn persist_index_part_with_deleted_flag(
self: &Arc<Self>,
) -> anyhow::Result<()> {
self.stop().context("stop")?;
let index_part = {
let mut locked = self.upload_queue.lock().unwrap();
// We must be in the Stopped state; otherwise an in-progress index part
// upload could overwrite the file without the is_deleted flag that we
// are about to set below.
let stopped = match &mut *locked {
UploadQueue::Uninitialized => anyhow::bail!("is not Stopped but Uninitialized"),
UploadQueue::Initialized(_) => anyhow::bail!("is not Stopped but Initialized"),
UploadQueue::Stopped(stopped) => stopped,
};
stopped.index_part.is_deleted = true;
stopped.index_part.clone()
};
upload::upload_index_part(
self.conf,
&self.storage_impl,
self.tenant_id,
self.timeline_id,
&index_part,
)
.await
}
///
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
@@ -732,8 +766,10 @@ impl RemoteTimelineClient {
// upload finishes or times out soon enough.
if task_mgr::is_shutdown_requested() {
info!("upload task cancelled by shutdown request");
if let Err(e) = self.stop() {
error!("got an error when trying to stop remote client: {e}")
}
self.calls_unfinished_metric_end(&task.op);
self.stop();
return;
}
@@ -916,22 +952,32 @@ impl RemoteTimelineClient {
self.metrics.call_end(&file_kind, &op_kind);
}
fn stop(&self) {
fn stop(&self) -> anyhow::Result<()> {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
// into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
match &*guard {
UploadQueue::Uninitialized => panic!(
UploadQueue::Uninitialized => anyhow::bail!(
"callers are responsible for ensuring this is only called on initialized queue"
),
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
Ok(())
}
UploadQueue::Initialized(qi) => {
info!("shutting down upload queue");
// Prepare the index part that will be carried in the Stopped state
let index_part = IndexPart::new(
qi.latest_files.clone(),
qi.last_uploaded_consistent_lsn,
qi.latest_metadata
.to_bytes()
.context("should be able to serialize metadata")?,
);
// Replace the queue with the Stopped state, taking ownership of the old
// Initialized queue. We will do some checks on it, and then drop it.
let qi = {
@@ -940,6 +986,7 @@ impl RemoteTimelineClient {
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
last_uploaded_consistent_lsn,
index_part,
}),
);
if let UploadQueue::Initialized(qi) = upload_queue {
@@ -972,6 +1019,7 @@ impl RemoteTimelineClient {
// We're done.
drop(guard);
Ok(())
}
}
}


@@ -40,6 +40,11 @@ impl LayerFileMetadata {
}
}
// Shortcut to skip serializing false in IndexPart below
fn is_false(b: &bool) -> bool {
!b
}
// TODO seems like another part of the remote storage file format
// compatibility issue, see https://github.com/neondatabase/neon/issues/3072
/// In-memory representation of an `index_part.json` file
@@ -55,6 +60,10 @@ pub struct IndexPart {
#[serde(default)]
version: usize,
#[serde(default)]
#[serde(skip_serializing_if = "is_false")]
pub is_deleted: bool,
/// Layer names, which are stored on the remote storage.
///
/// Additional metadata might exist in `layer_metadata`.
@@ -101,6 +110,7 @@ impl IndexPart {
layer_metadata,
disk_consistent_lsn,
metadata_bytes,
is_deleted: false,
}
}
@@ -156,6 +166,7 @@ mod tests {
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
is_deleted: false,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
@@ -192,6 +203,7 @@ mod tests {
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
is_deleted: false,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();
@@ -236,6 +248,7 @@ mod tests {
0, 0,
]
.to_vec(),
is_deleted: false,
};
let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();
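For reference, the `#[serde(default)]` / `skip_serializing_if = "is_false"` pair means the flag stays out of `index_part.json` until a timeline is actually deleted, while older files written without the field keep deserializing. A self-contained sketch of that serde behaviour using a toy struct (not the real `IndexPart`; assumes serde and serde_json as dependencies):

use serde::{Deserialize, Serialize};

// Same trick as in the diff above: omit the field while it is false.
fn is_false(b: &bool) -> bool {
    !b
}

#[derive(Serialize, Deserialize, Debug)]
struct MiniIndexPart {
    version: usize,
    #[serde(default)]
    #[serde(skip_serializing_if = "is_false")]
    is_deleted: bool,
}

fn main() {
    let live = MiniIndexPart { version: 1, is_deleted: false };
    let deleted = MiniIndexPart { version: 1, is_deleted: true };
    // The flag only shows up in the JSON once it is set ...
    assert_eq!(serde_json::to_string(&live).unwrap(), r#"{"version":1}"#);
    assert_eq!(
        serde_json::to_string(&deleted).unwrap(),
        r#"{"version":1,"is_deleted":true}"#
    );
    // ... and files written before the field existed still parse, defaulting to false.
    let old: MiniIndexPart = serde_json::from_str(r#"{"version":0}"#).unwrap();
    assert!(!old.is_deleted);
}

This is also why the tests above only gain an `is_deleted: false` field on the expected structs while the example JSON strings stay untouched.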


@@ -77,6 +77,8 @@ pub(crate) struct UploadQueueInitialized {
pub(crate) struct UploadQueueStopped {
pub(crate) last_uploaded_consistent_lsn: Lsn,
// Index part is needed here so timeline_delete can access it
pub(super) index_part: IndexPart,
}
impl UploadQueue {


@@ -752,4 +752,94 @@ def get_queued_count(
return int(val)
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_timeline_resurrection_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
"""
See https://github.com/neondatabase/neon/issues/3560
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_timeline_resurrection",
)
##### First start, insert data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
with pg.cursor() as cur:
cur.execute("CREATE TABLE f (i integer);")
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
# wait until pageserver successfully uploaded a checkpoint to remote storage
log.info("waiting for checkpoint upload")
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
log.info("upload of checkpoint is done")
new_timeline_id = env.neon_cli.create_branch("new", "main")
new_pg = env.postgres.create_start("new")
with new_pg.cursor() as cur:
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, tenant_id, new_timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
pageserver_http.timeline_checkpoint(tenant_id, new_timeline_id)
# wait until pageserver successfully uploaded a checkpoint to remote storage
log.info("waiting for checkpoint upload")
wait_for_upload(client, tenant_id, new_timeline_id, current_lsn)
log.info("upload of checkpoint is done")
# delete new timeline
client.timeline_delete(tenant_id=tenant_id, timeline_id=new_timeline_id)
##### Stop the pageserver instance, erase all its data
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
client.tenant_attach(tenant_id=tenant_id)
def tenant_active():
all_states = client.tenant_list()
[tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
assert tenant["state"] == "Active"
wait_until(
number_of_iterations=5,
interval=1,
func=tenant_active,
)
timelines = client.timeline_list(tenant_id=tenant_id)
assert len(timelines) == 1
# TODO Test that we correctly handle GC of files that are stuck in upload queue.


@@ -39,23 +39,18 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
"test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent"
)
timeline_path = (
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(parent_timeline_id)
)
ps_http = env.pageserver.http_client()
with pytest.raises(
PageserverApiException, match="Cannot delete timeline which has child timelines"
) as exc:
timeline_path = (
env.repo_dir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(parent_timeline_id)
)
assert timeline_path.exists()
ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)
assert not timeline_path.exists()
assert exc.value.status_code == 400
timeline_path = (
@@ -87,3 +82,14 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
)
assert exc.value.status_code == 404
# Check that we didn't pick up the timeline again after restart.
# See https://github.com/neondatabase/neon/issues/3560
env.pageserver.stop(immediate=True)
env.pageserver.start()
with pytest.raises(
PageserverApiException,
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
) as exc:
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)