finish WIP: keep the real timeline from create_empty_timeline outside of timelines map until it has finished filling

This commit is contained in:
Christian Schwarz
2023-05-26 15:29:19 +02:00
parent 3c1fc2617c
commit 4680f8c60b
3 changed files with 206 additions and 117 deletions

View File

@@ -14,6 +14,7 @@ use bytes::Buf;
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TenantState;
use pageserver_api::models::TimelineState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -24,6 +25,7 @@ use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, Qu
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::collections::hash_map::Entry;
use std::io;
use std::net::TcpListener;
use std::pin::pin;
@@ -51,6 +53,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::compare_arced_timeline;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
@@ -491,10 +494,14 @@ impl PageServerHandler {
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let (guard, timeline) = tenant
let (guard, real_timeline_not_in_tenants_map) = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO spawn flush loop of timeline early (before activation),
// but then we need to take care of shutting it down in case we fail
// (bootstrap_timeline probably also needs it?)
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
// from connecting before that and writing conflicting wal.
@@ -512,7 +519,7 @@ impl PageServerHandler {
pgb.flush().await?;
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
real_timeline_not_in_tenants_map
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
.await?;
@@ -520,14 +527,10 @@ impl PageServerHandler {
read_tar_eof(copyin_reader).await?;
anyhow::Ok(())
};
match doit.await {
let placeholder_timeline = match doit.await {
Ok(()) => {
match guard.creation_complete_remove_uninit_marker_and_get_placeholder_timeline() {
Ok(placeholder_timeline) => {
// create_empty_timeline already replaced the placeholder timeline with the real one.
// However, we still need to remove the placeholder.
let _ = placeholder_timeline; // don't need it anymore
}
Ok(placeholder_timeline) => placeholder_timeline,
Err(err) => {
error!(
"failed to remove uninit marker for new_timeline_id={timeline_id}: {err:#}"
@@ -538,10 +541,21 @@ impl PageServerHandler {
}
Err(e) => {
debug_assert_current_span_has_tenant_and_timeline_id();
error!("error importing basebackup: {:?}", e);
guard.creation_failed();
return Err(QueryError::Other(e));
}
};
// todo share with Tenant::create_timeline
match tenant.timelines.lock().unwrap().entry(timeline_id) {
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
Entry::Occupied(mut o) => {
info!("replacing placeholder timeline with the real one");
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
assert!(compare_arced_timeline(&placeholder_timeline, o.get()));
let replaced_placeholder = o.insert(Arc::clone(&real_timeline_not_in_tenants_map));
assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline));
},
}
// TODO check checksum
@@ -551,7 +565,7 @@ impl PageServerHandler {
// since we discard some log files.
info!("done, activating timeline");
timeline.activate(self.broker_client.clone(), &ctx);
real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx);
Ok(())
}

View File

@@ -136,7 +136,7 @@ pub struct Tenant {
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_id: TenantId,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
pub(super) timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
// `timelines` mutex during all GC iteration
@@ -158,7 +158,7 @@ pub struct Tenant {
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
#[inline(always)]
fn compare_arced_timeline(left: &Arc<Timeline>, right: &Arc<Timeline>) -> bool {
pub(crate) fn compare_arced_timeline(left: &Arc<Timeline>, right: &Arc<Timeline>) -> bool {
// See: https://github.com/rust-lang/rust/issues/103763
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
@@ -307,6 +307,18 @@ impl<'t> CreatingTimelineGuard<'t> {
}
}
/// Newtype to avoid confusing local variables that are both Arc<Timeline> (the ancestor vs. the timeline being loaded)
struct AncestorArg(Option<Arc<Timeline>>);
impl AncestorArg {
pub fn ancestor(ancestor: Arc<Timeline>) -> Self {
Self(Some(ancestor))
}
pub fn no_ancestor() -> Self {
Self(None)
}
}
// We should not blindly overwrite local metadata with remote one.
// For example, consider the following case:
// Image layer is flushed to disk as a new delta layer, we update local metadata and start upload task but after that
@@ -455,18 +467,49 @@ impl Tenant {
/// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
/// it is marked as Active.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(?cause))]
async fn timeline_init_and_sync(
&self,
timeline_id: TimelineId,
remote_client: Option<Arc<RemoteTimelineClient>>,
remote_startup_data: Option<RemoteStartupData>,
local_metadata: Option<TimelineMetadata>,
ancestor: Option<Arc<Timeline>>,
ancestor: AncestorArg,
cause: TimelineLoadCause,
first_save: bool, // TODO need to think about this
_ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_id = self.tenant_id;
let ancestor = ancestor.0;
match (
local_metadata.as_ref().map(|lmd| lmd.ancestor_timeline()),
remote_startup_data
.as_ref()
.map(|rsd| rsd.remote_metadata.ancestor_timeline()),
) {
(Some(local_ancestor), Some(remote_ancestor)) => {
assert_eq!(
local_ancestor, remote_ancestor,
"local and remote ancestor timelines should match"
);
}
(None, None) => {}
(local, remote) => {
anyhow::bail!("local and remote metadata do not agree on ancestorship, local={local:?} remote={remote:?}");
}
}
assert_eq!(
ancestor.as_ref().map(|a| a.timeline_id),
// we could check either local or remote metadata, it doesn't matter,
// we checked above that they're either (None, None) or (Some, Some)
local_metadata
.as_ref()
.and_then(|lmd| lmd.ancestor_timeline()),
"caller does not provide correct ancestor"
);
let (up_to_date_metadata, picked_local) = merge_local_remote_metadata(
local_metadata.as_ref(),
@@ -475,6 +518,12 @@ impl Tenant {
.context("merge_local_remote_metadata")?
.to_owned();
assert_eq!(
up_to_date_metadata.ancestor_timeline(),
ancestor.as_ref().map(|a| a.timeline_id),
"merge_local_remote_metadata should not change ancestor"
);
let timeline = {
let timeline = self.create_timeline_data(
timeline_id,
@@ -492,31 +541,7 @@ impl Tenant {
format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
})?;
// avoiding holding it across awaits
let mut timelines_accessor = self.timelines.lock().unwrap();
match timelines_accessor.entry(timeline_id) {
Entry::Occupied(mut e) => {
match cause {
TimelineLoadCause::TenantCreate => unreachable!("tenant creates no timelines, so, we don't reach here"),
TimelineLoadCause::Startup |
TimelineLoadCause::TenantLoad |
TimelineLoadCause::Attach => unreachable!("when loading a full tenant, the loading entity is responsible for ensuring there are no duplicates, cause={cause:?}"),
TimelineLoadCause::TimelineCreate { placeholder_timeline } => {
// replace placeholder with real one
assert!(compare_arced_timeline(e.get(), &placeholder_timeline), "when creating a timeline, the placeholder timeline should be the one in the map");
e.insert(Arc::clone(&timeline));
timeline
},
#[cfg(test)]
TimelineLoadCause::Test => todo!(),
}
}
Entry::Vacant(v) => {
v.insert(Arc::clone(&timeline));
timeline.maybe_spawn_flush_loop();
timeline
}
}
timeline
};
if self.remote_storage.is_some() {
@@ -724,7 +749,27 @@ impl Tenant {
.expect("just put it in above");
// TODO again handle early failure
self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx)
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
let timelines = self.timelines.lock().unwrap();
AncestorArg::ancestor(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|| {
anyhow::anyhow!(
"cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
)
},
)?))
} else {
AncestorArg::no_ancestor()
};
let timeline = self
.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
ancestor,
remote_client,
ctx,
)
.await
.with_context(|| {
format!(
@@ -732,6 +777,16 @@ impl Tenant {
timeline_id, self.tenant_id
)
})?;
// TODO: why can't load_remote_timeline return None like load_local_timeline does?
let mut timelines = self.timelines.lock().unwrap();
let overwritten = timelines.insert(timeline_id, Arc::clone(&timeline));
if let Some(overwritten) = overwritten {
panic!(
"timeline should not be in the map yet, but is: {timeline_id}: {:?}",
overwritten.current_state()
);
}
}
std::fs::remove_file(&marker_file)
@@ -768,9 +823,10 @@ impl Tenant {
timeline_id: TimelineId,
index_part: IndexPart,
remote_metadata: TimelineMetadata,
ancestor: AncestorArg,
remote_client: RemoteTimelineClient,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<Arc<Timeline>> {
debug_assert_current_span_has_tenant_id();
info!("downloading index file for timeline {}", timeline_id);
@@ -778,19 +834,6 @@ impl Tenant {
.await
.context("Failed to create new timeline directory")?;
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
let timelines = self.timelines.lock().unwrap();
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|| {
anyhow::anyhow!(
"cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
)
},
)?))
} else {
None
};
// Even if there is local metadata it cannot be ahead of the remote one
// since we're attaching. Even if we resume interrupted attach remote one
// cannot be older than the local one
@@ -809,8 +852,7 @@ impl Tenant {
true,
ctx,
)
.await?;
Ok(())
.await
}
/// Create a placeholder Tenant object for a broken tenant
@@ -1021,9 +1063,38 @@ impl Tenant {
// 1. "Timeline has no ancestor and no layer files"
for (timeline_id, local_metadata) in sorted_timelines {
self.load_local_timeline(timeline_id, local_metadata, cause.clone(), ctx)
let ancestor = if let Some(ancestor_id) = local_metadata.ancestor_timeline() {
let timelines = self.timelines.lock().unwrap();
AncestorArg::ancestor(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|| {
anyhow::anyhow!(
"cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
)
},
)?))
} else {
AncestorArg::no_ancestor()
};
let timeline = self
.load_local_timeline(timeline_id, local_metadata, ancestor, cause.clone(), ctx)
.await
.with_context(|| format!("load local timeline {timeline_id}"))?;
match timeline {
Some(loaded_timeline) => {
let mut timelines = self.timelines.lock().unwrap();
let overwritten = timelines.insert(timeline_id, Arc::clone(&loaded_timeline));
if let Some(overwritten) = overwritten {
panic!(
"timeline should not be in the map yet, but is: {timeline_id}: {:?}",
overwritten.current_state()
);
}
}
None => {
info!(%timeline_id, "timeline is marked as deleted on the remote, load_local_timeline finished the deletion locally");
// TODO don't we need to restart the tree sort?
}
}
}
info!("Done");
@@ -1039,6 +1110,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
ancestor: AncestorArg,
cause: TimelineLoadCause,
ctx: &RequestContext,
) -> anyhow::Result<Option<Arc<Timeline>>> {
@@ -1094,14 +1166,6 @@ impl Tenant {
None => None,
};
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
Some(ancestor_timeline)
} else {
None
};
let inserted_timeline = self
.timeline_init_and_sync(
timeline_id,
@@ -1241,6 +1305,7 @@ impl Tenant {
.load_local_timeline(
new_timeline_id,
metadata,
AncestorArg::no_ancestor(),
TimelineLoadCause::TimelineCreate {
placeholder_timeline: Arc::clone(&guard.placeholder_timeline),
},
@@ -1250,16 +1315,20 @@ impl Tenant {
.context("load newly created on-disk timeline state")?
.expect("load_local_timeline should have created the timeline");
let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) {
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
Entry::Occupied(entry) => {
assert_eq!(guard.placeholder_timeline.current_state(), TimelineState::Creating);
assert!(compare_arced_timeline(&real_timeline, entry.get()));
assert_eq!(real_timeline.current_state(), TimelineState::Loading);
assert!(!compare_arced_timeline(&guard.placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline");
Arc::clone(entry.get())
}
};
// don't replace the placeholder timeline, the caller is going to fill
// real_timeline with more data and once that's done, we're ready to
// replace the placeholder
// let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) {
// Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
// Entry::Occupied(entry) => {
// assert_eq!(guard.placeholder_timeline.current_state(), TimelineState::Creating);
// assert!(compare_arced_timeline(&real_timeline, entry.get()));
// assert_eq!(real_timeline.current_state(), TimelineState::Loading);
// assert!(!compare_arced_timeline(&guard.placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline");
// Arc::clone(entry.get())
// }
// };
// Do not activate, the caller is responsible for that.
// Also, the caller is still responsible for removing the uninit mark file.
@@ -1422,6 +1491,7 @@ impl Tenant {
ctx,
)
.await?;
Ok(AncestorArg::ancestor(ancestor_timeline))
}
None => {
self.bootstrap_timeline(
@@ -1432,17 +1502,16 @@ impl Tenant {
ctx,
)
.await?;
Ok(AncestorArg::no_ancestor())
}
};
}
// XXX do we need to remove uninit mark before the self.branch_timeline / self.bootstrap_timeline start the uploads?
// If we die with uninit mark present, we'll leak the uploaded state in S3.
Ok(())
};
let placeholder_timeline = match create_ondisk_state.await {
Ok(()) => {
let (placeholder_timeline, ancestor) = match create_ondisk_state.await {
Ok(ancestor) => {
match guard.creation_complete_remove_uninit_marker_and_get_placeholder_timeline() {
Ok(placeholder_timeline) => placeholder_timeline,
Ok(placeholder_timeline) => (placeholder_timeline, ancestor),
Err(err) => {
error!(
"failed to remove uninit marker for new_timeline_id={new_timeline_id}: {err:#}"
@@ -1463,26 +1532,35 @@ impl Tenant {
// From here on, it's just like during pageserver startup.
let metadata = load_metadata(self.conf, new_timeline_id, self.tenant_id)
.context("load newly created on-disk timeline metadata")?;
self.load_local_timeline(
new_timeline_id,
metadata,
TimelineLoadCause::TimelineCreate {
placeholder_timeline: Arc::clone(&placeholder_timeline),
},
ctx,
)
.await
.context("load newly created on-disk timeline state")?;
let real_timeline = match self.timelines.lock().unwrap().entry(new_timeline_id) {
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
Entry::Occupied(entry) => {
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
assert!(!compare_arced_timeline(&placeholder_timeline, entry.get()), "load_local_timeline should have replaced the placeholder with the real timeline");
Arc::clone(entry.get())
}
let real_timeline = self
.load_local_timeline(
new_timeline_id,
metadata,
ancestor,
TimelineLoadCause::TimelineCreate {
placeholder_timeline: Arc::clone(&placeholder_timeline),
},
ctx,
)
.await
.context("load newly created on-disk timeline state")?;
let Some(real_timeline) = real_timeline else {
anyhow::bail!("we just created this timeline's local files, but load_local_timeline did not load it");
};
match self.timelines.lock().unwrap().entry(new_timeline_id) {
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
Entry::Occupied(mut o) => {
info!("replacing placeholder timeline with the real one");
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
assert!(compare_arced_timeline(&placeholder_timeline, o.get()));
let replaced_placeholder = o.insert(Arc::clone(&real_timeline));
assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline));
},
}
real_timeline.activate(broker_client, ctx);
Ok(Some(real_timeline))
@@ -2658,19 +2736,21 @@ impl Tenant {
// From here on, it's just like during pageserver startup.
let metadata = load_metadata(self.conf, dst_id, self.tenant_id)
.context("load newly created on-disk timeline metadata")?;
self.load_local_timeline(dst_id, metadata, TimelineLoadCause::Test, ctx)
let tline = self
.load_local_timeline(
dst_id,
metadata,
AncestorArg::no_ancestor(),
TimelineLoadCause::Test,
ctx,
)
.instrument(info_span!("load_local_timeline", timeline_id=%dst_id))
.await
.context("load newly created on-disk timeline state")?;
.context("load newly created on-disk timeline state")?
.unwrap();
let loaded_timeline = self
.timelines
.lock()
.unwrap()
.get(&dst_id)
.cloned()
.expect("we just loaded it");
loaded_timeline.set_state(TimelineState::Active);
tline.set_state(TimelineState::Active);
Ok(loaded_timeline)
}

View File

@@ -128,29 +128,24 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
)
# Importing empty file fails
log.info("importing empty_file")
empty_file = os.path.join(test_output_dir, "empty_file")
with open(empty_file, "w") as _:
with pytest.raises(Exception):
import_tar(empty_file, empty_file)
# yet, timeline is created and needs to be removed
log.info("deleting timeline")
client.timeline_delete(tenant, timeline)
log.info("importing corrupt_base_tar")
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
# Importing corrupt backup fails
log.info("importing corrupt_base_tar")
with pytest.raises(Exception):
import_tar(corrupt_base_tar, wal_tar)
# yet, timeline is created and needs to be removed
log.info("deleting timeline")
client.timeline_delete(tenant, timeline)
log.info("importing base_plus_garbage_tar")
assert timeline not in {TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)}
# A tar with trailing garbage is currently accepted. It prints a warnings
# to the pageserver log, however. Check that.
log.info("importing base_plus_garbage_tar")
import_tar(base_plus_garbage_tar, wal_tar)
assert env.pageserver.log_contains(
".*WARN.*ignored .* unexpected bytes after the tar archive.*"