mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 21:42:56 +00:00
pageserver: improved handling of concurrent timeline creations on the same ID (#6139)
## Problem Historically, the pageserver used an "uninit mark" file on disk for two purposes: - Track which timeline dirs are incomplete for handling on restart - Avoid trying to create the same timeline twice at the same time. The original purpose of handling restarts is now defunct, as we use remote storage as the source of truth and clean up any trash timeline dirs on startup. Using the file to mutually exclude creation operations is error prone compared with just doing it in memory, and the existing checks happened some way into the creation operation, and could expose errors as 500s (anyhow::Errors) rather than something clean. ## Summary of changes - Creations are now mutually excluded in memory (using `Tenant::timelines_creating`), rather than relying on a file on disk for coordination. - Acquiring unique access to the timeline ID now happens earlier in the request. - Creating the same timeline which already exists is now a 201: this simplifies retry handling for clients. - 409 is still returned if a timeline with the same ID is still being created: if this happens it is probably because the client timed out an earlier request and has retried. - Colliding timeline creation requests should no longer return 500 errors This paves the way to entirely removing uninit markers in a subsequent change. --------- Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
@@ -992,8 +992,8 @@ paths:
|
||||
type: string
|
||||
post:
|
||||
description: |
|
||||
Create a timeline. Returns new timeline id on success.\
|
||||
If no new timeline id is specified in parameters, it would be generated. It's an error to recreate the same timeline.
|
||||
Create a timeline. Returns new timeline id on success.
|
||||
Recreating the same timeline will succeed if the parameters match the existing timeline.
|
||||
If no pg_version is specified, assume DEFAULT_PG_VERSION hardcoded in the pageserver.
|
||||
requestBody:
|
||||
content:
|
||||
|
||||
@@ -453,7 +453,7 @@ async fn timeline_create_handler(
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
Err(tenant::CreateTimelineError::AlreadyExists) => {
|
||||
Err(tenant::CreateTimelineError::Conflict | tenant::CreateTimelineError::AlreadyCreating) => {
|
||||
json_response(StatusCode::CONFLICT, ())
|
||||
}
|
||||
Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
|
||||
|
||||
@@ -48,6 +48,7 @@ use self::mgr::GetActiveTenantError;
|
||||
use self::mgr::GetTenantError;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineExclusionError;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
@@ -87,7 +88,6 @@ use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -249,6 +249,12 @@ pub struct Tenant {
|
||||
generation: Generation,
|
||||
|
||||
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
|
||||
/// During timeline creation, we first insert the TimelineId to the
|
||||
/// creating map, then `timelines`, then remove it from the creating map.
|
||||
/// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
|
||||
timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
|
||||
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
// Adding yet another mutex (in addition to `timelines`) is needed because holding
|
||||
// `timelines` mutex during all GC iteration
|
||||
@@ -407,8 +413,10 @@ impl Debug for SetStoppingError {
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum CreateTimelineError {
|
||||
#[error("a timeline with the given ID already exists")]
|
||||
AlreadyExists,
|
||||
#[error("creation of timeline with the given ID is in progress")]
|
||||
AlreadyCreating,
|
||||
#[error("timeline already exists with different parameters")]
|
||||
Conflict,
|
||||
#[error(transparent)]
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
@@ -1458,7 +1466,7 @@ impl Tenant {
|
||||
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
|
||||
/// minimum amount of keys required to get a writable timeline.
|
||||
/// (Without it, `put` might fail due to `repartition` failing.)
|
||||
pub async fn create_empty_timeline(
|
||||
pub(crate) async fn create_empty_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
@@ -1470,10 +1478,7 @@ impl Tenant {
|
||||
"Cannot create empty timelines on inactive tenant"
|
||||
);
|
||||
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
|
||||
};
|
||||
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
|
||||
let new_metadata = TimelineMetadata::new(
|
||||
// Initialize disk_consistent LSN to 0, The caller must import some data to
|
||||
// make it valid, before calling finish_creation()
|
||||
@@ -1550,7 +1555,7 @@ impl Tenant {
|
||||
/// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
|
||||
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_timeline(
|
||||
pub(crate) async fn create_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
ancestor_timeline_id: Option<TimelineId>,
|
||||
@@ -1571,26 +1576,51 @@ impl Tenant {
|
||||
.enter()
|
||||
.map_err(|_| CreateTimelineError::ShuttingDown)?;
|
||||
|
||||
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
if let Some(remote_client) = existing.remote_client.as_ref() {
|
||||
// Wait for uploads to complete, so that when we return Ok, the timeline
|
||||
// is known to be durable on remote storage. Just like we do at the end of
|
||||
// this function, after we have created the timeline ourselves.
|
||||
//
|
||||
// We only really care that the initial version of `index_part.json` has
|
||||
// been uploaded. That's enough to remember that the timeline
|
||||
// exists. However, there is no function to wait specifically for that so
|
||||
// we just wait for all in-progress uploads to finish.
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for timeline uploads to complete")?;
|
||||
// Get exclusive access to the timeline ID: this ensures that it does not already exist,
|
||||
// and that no other creation attempts will be allowed in while we are working. The
|
||||
// uninit_mark is a guard.
|
||||
let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
|
||||
Ok(m) => m,
|
||||
Err(TimelineExclusionError::AlreadyCreating) => {
|
||||
// Creation is in progress, we cannot create it again, and we cannot
|
||||
// check if this request matches the existing one, so caller must try
|
||||
// again later.
|
||||
return Err(CreateTimelineError::AlreadyCreating);
|
||||
}
|
||||
Err(TimelineExclusionError::Other(e)) => {
|
||||
return Err(CreateTimelineError::Other(e));
|
||||
}
|
||||
Err(TimelineExclusionError::AlreadyExists(existing)) => {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
return Err(CreateTimelineError::AlreadyExists);
|
||||
}
|
||||
// Idempotency: creating the same timeline twice is not an error, unless
|
||||
// the second creation has different parameters.
|
||||
if existing.get_ancestor_timeline_id() != ancestor_timeline_id
|
||||
|| existing.pg_version != pg_version
|
||||
|| (ancestor_start_lsn.is_some()
|
||||
&& ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
|
||||
{
|
||||
return Err(CreateTimelineError::Conflict);
|
||||
}
|
||||
|
||||
if let Some(remote_client) = existing.remote_client.as_ref() {
|
||||
// Wait for uploads to complete, so that when we return Ok, the timeline
|
||||
// is known to be durable on remote storage. Just like we do at the end of
|
||||
// this function, after we have created the timeline ourselves.
|
||||
//
|
||||
// We only really care that the initial version of `index_part.json` has
|
||||
// been uploaded. That's enough to remember that the timeline
|
||||
// exists. However, there is no function to wait specifically for that so
|
||||
// we just wait for all in-progress uploads to finish.
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for timeline uploads to complete")?;
|
||||
}
|
||||
|
||||
return Ok(existing);
|
||||
}
|
||||
};
|
||||
|
||||
let loaded_timeline = match ancestor_timeline_id {
|
||||
Some(ancestor_timeline_id) => {
|
||||
@@ -1627,18 +1657,32 @@ impl Tenant {
|
||||
ancestor_timeline.wait_lsn(*lsn, ctx).await?;
|
||||
}
|
||||
|
||||
self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
|
||||
.await?
|
||||
self.branch_timeline(
|
||||
&ancestor_timeline,
|
||||
new_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
uninit_mark,
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
self.bootstrap_timeline(new_timeline_id, pg_version, load_existing_initdb, ctx)
|
||||
.await?
|
||||
self.bootstrap_timeline(
|
||||
new_timeline_id,
|
||||
pg_version,
|
||||
load_existing_initdb,
|
||||
uninit_mark,
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we have dropped our guard on [`Self::timelines_creating`], and
|
||||
// the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
|
||||
// not send a success to the caller until it is. The same applies to handling retries,
|
||||
// see the handling of [`TimelineExclusionError::AlreadyExists`] above.
|
||||
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
|
||||
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
|
||||
// Ok, the timeline is durable in remote storage.
|
||||
let kind = ancestor_timeline_id
|
||||
.map(|_| "branched")
|
||||
.unwrap_or("bootstrapped");
|
||||
@@ -2422,6 +2466,7 @@ impl Tenant {
|
||||
loading_started_at: Instant::now(),
|
||||
tenant_conf: Arc::new(RwLock::new(attached_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
timelines_creating: Mutex::new(HashSet::new()),
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
walredo_mgr,
|
||||
remote_storage,
|
||||
@@ -2813,8 +2858,9 @@ impl Tenant {
|
||||
start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
|
||||
let tl = self
|
||||
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
.branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
|
||||
.await?;
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
@@ -2828,9 +2874,10 @@ impl Tenant {
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
timeline_uninit_mark: TimelineUninitMark<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -2839,13 +2886,14 @@ impl Tenant {
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
timeline_uninit_mark: TimelineUninitMark<'_>,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
// We will validate our ancestor LSN in this function. Acquire the GC lock so that
|
||||
// this check cannot race with GC, and the ancestor LSN is guaranteed to remain
|
||||
// valid while we are creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
|
||||
@@ -2855,13 +2903,6 @@ impl Tenant {
|
||||
lsn
|
||||
});
|
||||
|
||||
// Create a placeholder for the new branch. This will error
|
||||
// out if the new timeline ID is already in use.
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.create_timeline_uninit_mark(dst_id, &timelines)?
|
||||
};
|
||||
|
||||
// Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
|
||||
// horizon on the source timeline
|
||||
//
|
||||
@@ -2953,21 +2994,38 @@ impl Tenant {
|
||||
Ok(new_timeline)
|
||||
}
|
||||
|
||||
/// - run initdb to init temporary instance and get bootstrap data
|
||||
/// - after initialization completes, tar up the temp dir and upload it to S3.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
pub(crate) async fn bootstrap_timeline(
|
||||
/// For unit tests, make this visible so that other modules can directly create timelines
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn bootstrap_timeline_test(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
load_existing_initdb: Option<TimelineId>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.create_timeline_uninit_mark(timeline_id, &timelines)?
|
||||
};
|
||||
let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
|
||||
self.bootstrap_timeline(
|
||||
timeline_id,
|
||||
pg_version,
|
||||
load_existing_initdb,
|
||||
uninit_mark,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// - run initdb to init temporary instance and get bootstrap data
|
||||
/// - after initialization completes, tar up the temp dir and upload it to S3.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
async fn bootstrap_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
load_existing_initdb: Option<TimelineId>,
|
||||
timeline_uninit_mark: TimelineUninitMark<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
|
||||
// temporary directory for basebackup files for the given timeline.
|
||||
|
||||
@@ -3164,11 +3222,11 @@ impl Tenant {
|
||||
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
|
||||
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
|
||||
/// uninit mark file.
|
||||
async fn prepare_new_timeline(
|
||||
&self,
|
||||
async fn prepare_new_timeline<'a>(
|
||||
&'a self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
uninit_mark: TimelineUninitMark,
|
||||
uninit_mark: TimelineUninitMark<'a>,
|
||||
start_lsn: Lsn,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
) -> anyhow::Result<UninitializedTimeline> {
|
||||
@@ -3241,23 +3299,38 @@ impl Tenant {
|
||||
fn create_timeline_uninit_mark(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
) -> anyhow::Result<TimelineUninitMark> {
|
||||
) -> Result<TimelineUninitMark, TimelineExclusionError> {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
|
||||
anyhow::ensure!(
|
||||
timelines.get(&timeline_id).is_none(),
|
||||
"Timeline {tenant_shard_id}/{timeline_id} already exists in pageserver's memory"
|
||||
);
|
||||
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
anyhow::ensure!(
|
||||
!timeline_path.exists(),
|
||||
"Timeline {timeline_path} already exists, cannot create its uninit mark file",
|
||||
);
|
||||
|
||||
let uninit_mark_path = self
|
||||
.conf
|
||||
.timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
|
||||
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
|
||||
let uninit_mark = TimelineUninitMark::new(
|
||||
self,
|
||||
timeline_id,
|
||||
uninit_mark_path.clone(),
|
||||
timeline_path.clone(),
|
||||
)?;
|
||||
|
||||
// At this stage, we have got exclusive access to in-memory state for this timeline ID
|
||||
// for creation.
|
||||
// A timeline directory should never exist on disk already:
|
||||
// - a previous failed creation would have cleaned up after itself
|
||||
// - a pageserver restart would clean up timeline directories that don't have valid remote state
|
||||
//
|
||||
// Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
|
||||
// this error may indicate a bug in cleanup on failed creations.
|
||||
if timeline_path.exists() {
|
||||
return Err(TimelineExclusionError::Other(anyhow::anyhow!(
|
||||
"Timeline directory already exists! This is a bug."
|
||||
)));
|
||||
}
|
||||
|
||||
// Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
|
||||
// that during process runtime, colliding creations will be caught in-memory without getting
|
||||
// as far as failing to write a file.
|
||||
fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
@@ -3271,8 +3344,6 @@ impl Tenant {
|
||||
format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
|
||||
|
||||
Ok(uninit_mark)
|
||||
}
|
||||
|
||||
@@ -4022,13 +4093,7 @@ mod tests {
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
Err(e) => assert_eq!(
|
||||
e.to_string(),
|
||||
format!(
|
||||
"Timeline {}/{} already exists in pageserver's memory",
|
||||
tenant.tenant_shard_id, TIMELINE_ID
|
||||
)
|
||||
),
|
||||
Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -446,6 +446,12 @@ pub(crate) enum CompactFlags {
|
||||
ForceRepartition,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Timeline {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "Timeline<{}>", self.timeline_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
|
||||
@@ -19,14 +19,14 @@ use super::Timeline;
|
||||
pub struct UninitializedTimeline<'t> {
|
||||
pub(crate) owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
|
||||
}
|
||||
|
||||
impl<'t> UninitializedTimeline<'t> {
|
||||
pub(crate) fn new(
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
|
||||
) -> Self {
|
||||
Self {
|
||||
owning_tenant,
|
||||
@@ -169,18 +169,55 @@ pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
|
||||
///
|
||||
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
|
||||
#[must_use]
|
||||
pub(crate) struct TimelineUninitMark {
|
||||
pub(crate) struct TimelineUninitMark<'t> {
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
uninit_mark_deleted: bool,
|
||||
uninit_mark_path: Utf8PathBuf,
|
||||
pub(crate) timeline_path: Utf8PathBuf,
|
||||
}
|
||||
|
||||
impl TimelineUninitMark {
|
||||
pub(crate) fn new(uninit_mark_path: Utf8PathBuf, timeline_path: Utf8PathBuf) -> Self {
|
||||
Self {
|
||||
uninit_mark_deleted: false,
|
||||
uninit_mark_path,
|
||||
timeline_path,
|
||||
/// Errors when acquiring exclusive access to a timeline ID for creation
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum TimelineExclusionError {
|
||||
#[error("Already exists")]
|
||||
AlreadyExists(Arc<Timeline>),
|
||||
#[error("Already creating")]
|
||||
AlreadyCreating,
|
||||
|
||||
// e.g. I/O errors, or some failure deep in postgres initdb
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl<'t> TimelineUninitMark<'t> {
|
||||
pub(crate) fn new(
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
uninit_mark_path: Utf8PathBuf,
|
||||
timeline_path: Utf8PathBuf,
|
||||
) -> Result<Self, TimelineExclusionError> {
|
||||
// Lock order: this is the only place we take both locks. During drop() we only
|
||||
// lock creating_timelines
|
||||
let timelines = owning_tenant.timelines.lock().unwrap();
|
||||
let mut creating_timelines: std::sync::MutexGuard<
|
||||
'_,
|
||||
std::collections::HashSet<TimelineId>,
|
||||
> = owning_tenant.timelines_creating.lock().unwrap();
|
||||
|
||||
if let Some(existing) = timelines.get(&timeline_id) {
|
||||
Err(TimelineExclusionError::AlreadyExists(existing.clone()))
|
||||
} else if creating_timelines.contains(&timeline_id) {
|
||||
Err(TimelineExclusionError::AlreadyCreating)
|
||||
} else {
|
||||
creating_timelines.insert(timeline_id);
|
||||
Ok(Self {
|
||||
owning_tenant,
|
||||
timeline_id,
|
||||
uninit_mark_deleted: false,
|
||||
uninit_mark_path,
|
||||
timeline_path,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,7 +244,7 @@ impl TimelineUninitMark {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineUninitMark {
|
||||
impl Drop for TimelineUninitMark<'_> {
|
||||
fn drop(&mut self) {
|
||||
if !self.uninit_mark_deleted {
|
||||
if self.timeline_path.exists() {
|
||||
@@ -226,5 +263,11 @@ impl Drop for TimelineUninitMark {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.owning_tenant
|
||||
.timelines_creating
|
||||
.lock()
|
||||
.unwrap()
|
||||
.remove(&self.timeline_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2191,7 +2191,7 @@ mod tests {
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant
|
||||
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
|
||||
.bootstrap_timeline_test(TIMELINE_ID, pg_version, None, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, List, Union
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -239,92 +238,6 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
|
||||
t.join()
|
||||
|
||||
|
||||
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
If the activate only after upload is used, then retries could become competing.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
|
||||
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory",
|
||||
]
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
env.pageserver.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
create_root = threading.Thread(target=start_creating_timeline)
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
|
||||
barrier = threading.Barrier(3)
|
||||
|
||||
def try_branch():
|
||||
barrier.wait()
|
||||
barrier.wait()
|
||||
try:
|
||||
ret = ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
branch_id,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
timeout=5,
|
||||
)
|
||||
queue.put(ret)
|
||||
except Exception as e:
|
||||
queue.put(e)
|
||||
|
||||
threads = [threading.Thread(target=try_branch) for _ in range(2)]
|
||||
|
||||
try:
|
||||
create_root.start()
|
||||
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
barrier.wait()
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
|
||||
barrier.wait()
|
||||
|
||||
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
|
||||
first = queue.get()
|
||||
second = queue.get()
|
||||
|
||||
log.info(first)
|
||||
log.info(second)
|
||||
|
||||
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
|
||||
assert isinstance(failed, Exception)
|
||||
assert isinstance(succeeded, Dict)
|
||||
|
||||
# there's multiple valid status codes:
|
||||
# - Timeline x/y already exists
|
||||
# - whatever 409 response says, but that is a subclass of PageserverApiException
|
||||
assert isinstance(failed, PageserverApiException)
|
||||
assert succeeded["state"] == "Active"
|
||||
finally:
|
||||
# we might still have the failpoint active
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
create_root.join()
|
||||
|
||||
|
||||
def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Check that a timeline is deleted locally on subsequent restart if it never successfully uploaded during creation.
|
||||
|
||||
Reference in New Issue
Block a user