mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 16:10:37 +00:00
use separate enum variant to represent index part for deleted timeline
This commit is contained in:
@@ -58,6 +58,7 @@ use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::metadata::load_metadata;
|
||||
use crate::tenant::remote_timeline_client::index::IndexPart;
|
||||
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
|
||||
use crate::tenant::storage_layer::DeltaLayer;
|
||||
use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::tenant::storage_layer::Layer;
|
||||
@@ -694,16 +695,9 @@ impl Tenant {
|
||||
.await
|
||||
.context("download index file")?;
|
||||
|
||||
let remote_metadata = index_part.parse_metadata().context("parse metadata")?;
|
||||
|
||||
debug!("finished index part download");
|
||||
|
||||
Result::<_, anyhow::Error>::Ok((
|
||||
timeline_id,
|
||||
client,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
))
|
||||
Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
|
||||
}
|
||||
.map(move |res| {
|
||||
res.with_context(|| format!("download index part for timeline {timeline_id}"))
|
||||
@@ -717,10 +711,21 @@ impl Tenant {
|
||||
while let Some(result) = part_downloads.join_next().await {
|
||||
// NB: we already added timeline_id as context to the error
|
||||
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
|
||||
let (timeline_id, client, index_part, remote_metadata) = result?;
|
||||
let (timeline_id, client, index_part) = result?;
|
||||
debug!("successfully downloaded index part for timeline {timeline_id}");
|
||||
timeline_ancestors.insert(timeline_id, remote_metadata);
|
||||
remote_index_and_client.insert(timeline_id, (index_part, client));
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
timeline_ancestors.insert(
|
||||
timeline_id,
|
||||
index_part.parse_metadata().context("parse_metadata")?,
|
||||
);
|
||||
remote_index_and_client.insert(timeline_id, (index_part, client));
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted => {
|
||||
info!("timeline {} is deleted, skipping", timeline_id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
@@ -732,11 +737,6 @@ impl Tenant {
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
if index_part.is_deleted {
|
||||
info!("timeline {} is deleted, skipping", timeline_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
@@ -1048,7 +1048,6 @@ impl Tenant {
|
||||
/// Subroutine of `load_tenant`, to load an individual timeline
|
||||
///
|
||||
/// NB: The parent is assumed to be already loaded!
|
||||
#[instrument(skip(self, local_metadata, ctx), fields(timeline_id=%timeline_id))]
|
||||
async fn load_local_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
@@ -1067,10 +1066,13 @@ impl Tenant {
|
||||
let remote_startup_data = match &remote_client {
|
||||
Some(remote_client) => match remote_client.download_index_file().await {
|
||||
Ok(index_part) => {
|
||||
if index_part.is_deleted {
|
||||
info!("is_deleted is set on remote, skipping");
|
||||
return Ok(());
|
||||
}
|
||||
let index_part = match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted => {
|
||||
info!("is_deleted is set on remote, skipping");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let remote_metadata = index_part.parse_metadata().context("parse_metadata")?;
|
||||
Some(RemoteStartupData {
|
||||
|
||||
@@ -253,6 +253,11 @@ const FAILED_DOWNLOAD_RETRIES: u32 = 10;
|
||||
// retries. Uploads and deletions are retried forever, though.
|
||||
const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
|
||||
|
||||
pub enum MaybeDeletedIndexPart {
|
||||
IndexPart(IndexPart),
|
||||
Deleted,
|
||||
}
|
||||
|
||||
/// A client for accessing a timeline's data in remote storage.
|
||||
///
|
||||
/// This takes care of managing the number of connections, and balancing them
|
||||
@@ -367,12 +372,12 @@ impl RemoteTimelineClient {
|
||||
//
|
||||
|
||||
/// Download index file
|
||||
pub async fn download_index_file(&self) -> Result<IndexPart, DownloadError> {
|
||||
pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
|
||||
let _unfinished_gauge_guard = self
|
||||
.metrics
|
||||
.call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download);
|
||||
|
||||
download::download_index_part(
|
||||
let index_part = download::download_index_part(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
self.tenant_id,
|
||||
@@ -385,7 +390,13 @@ impl RemoteTimelineClient {
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
)
|
||||
.await
|
||||
.await?;
|
||||
|
||||
if index_part.is_deleted {
|
||||
Ok(MaybeDeletedIndexPart::Deleted)
|
||||
} else {
|
||||
Ok(MaybeDeletedIndexPart::IndexPart(index_part))
|
||||
}
|
||||
}
|
||||
|
||||
/// Download a (layer) file from `path`, into local filesystem.
|
||||
@@ -1209,7 +1220,11 @@ mod tests {
|
||||
}
|
||||
|
||||
// Download back the index.json, and check that the list of files is correct
|
||||
let index_part = runtime.block_on(client.download_index_file())?;
|
||||
let index_part = match runtime.block_on(client.download_index_file())? {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted => panic!("unexpectedly got deleted index part"),
|
||||
};
|
||||
|
||||
assert_file_list(
|
||||
&index_part.timeline_layers,
|
||||
&[
|
||||
|
||||
Reference in New Issue
Block a user