mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
make Tenant::timelines a tokio::sync::RwLock
This is preliminary work for/from #4220 (async `Layer::get_value_reconstruct_data`). The patch converts `Tenant::timelines` from `std::sync::Mutex` to `tokio::sync::Mutex`. We need this change because we want to switch `Timeline::layers` to an async RwLock. We need that because we hold `Timeline::layers` while calling `Layer::get_value_reconstruct_data`. So, if we want to make get_value_reconstruct_data async, we need to make `Timeline::layers` async first.
This commit is contained in:
@@ -150,7 +150,7 @@ pub async fn collect_metrics_iteration(
|
||||
let mut tenant_resident_size = 0;
|
||||
|
||||
// iterate through list of timelines in tenant
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
// collect per-timeline metrics only for active timelines
|
||||
if timeline.is_active() {
|
||||
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
|
||||
|
||||
@@ -508,7 +508,7 @@ async fn collect_eviction_candidates(
|
||||
// a little unfair to tenants during shutdown in such a situation is tolerable.
|
||||
let mut tenant_candidates = Vec::new();
|
||||
let mut max_layer_size = 0;
|
||||
for tl in tenant.list_timelines() {
|
||||
for tl in tenant.list_timelines().await {
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -312,7 +312,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
|
||||
let response_data = async {
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
let timelines = tenant.list_timelines();
|
||||
let timelines = tenant.list_timelines().await;
|
||||
|
||||
let mut response_data = Vec::with_capacity(timelines.len());
|
||||
for timeline in timelines {
|
||||
@@ -351,6 +351,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let timeline_info = build_timeline_info(
|
||||
@@ -520,7 +521,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
current_physical_size += timeline.layer_size_sum();
|
||||
}
|
||||
|
||||
@@ -975,6 +976,7 @@ async fn active_timeline_of_active_tenant(
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)
|
||||
}
|
||||
|
||||
|
||||
@@ -388,7 +388,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true).await?;
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
@@ -487,7 +487,9 @@ impl PageServerHandler {
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
@@ -1201,6 +1203,6 @@ async fn get_active_tenant_timeline(
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, GetActiveTenantError> {
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true).await?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -1594,13 +1594,15 @@ fn is_slru_block_key(key: Key) -> bool {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
pub async fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)
|
||||
.await?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
@@ -1630,7 +1632,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_list_rels_drop() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_list_rels_drop")?.load();
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID)?;
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID).await?;
|
||||
const TESTDB: u32 = 111;
|
||||
|
||||
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
||||
|
||||
@@ -40,8 +40,7 @@ use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::sync::RwLock;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
@@ -137,7 +136,7 @@ pub struct Tenant {
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
timelines: tokio::sync::Mutex<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
// Adding yet another mutex (in addition to `timelines`) is needed because holding
|
||||
// `timelines` mutex during all GC iteration
|
||||
@@ -188,7 +187,7 @@ impl UninitializedTimeline<'_> {
|
||||
/// This function launches the flush loop if not already done.
|
||||
///
|
||||
/// The caller is responsible for activating the timeline (function `.activate()`).
|
||||
fn initialize_with_lock(
|
||||
async fn initialize_with_lock(
|
||||
mut self,
|
||||
_ctx: &RequestContext,
|
||||
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
|
||||
@@ -264,8 +263,10 @@ impl UninitializedTimeline<'_> {
|
||||
|
||||
// Initialize without loading the layer map. We started with an empty layer map, and already
|
||||
// updated it for the layers that we created during the import.
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
|
||||
let mut timelines = self.owning_tenant.timelines.lock().await;
|
||||
let tl = self
|
||||
.initialize_with_lock(ctx, &mut timelines, false)
|
||||
.await?;
|
||||
tl.activate(broker_client, ctx);
|
||||
Ok(tl)
|
||||
}
|
||||
@@ -485,7 +486,7 @@ impl Tenant {
|
||||
|
||||
let timeline = {
|
||||
// avoiding holding it across awaits
|
||||
let mut timelines_accessor = self.timelines.lock().unwrap();
|
||||
let mut timelines_accessor = self.timelines.lock().await;
|
||||
if timelines_accessor.contains_key(&timeline_id) {
|
||||
anyhow::bail!(
|
||||
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
|
||||
@@ -507,7 +508,10 @@ impl Tenant {
|
||||
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
|
||||
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
|
||||
// will ingest data which may require looking at the layers which are not yet available locally
|
||||
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) {
|
||||
match timeline
|
||||
.initialize_with_lock(ctx, &mut timelines_accessor, true)
|
||||
.await
|
||||
{
|
||||
Ok(new_timeline) => new_timeline,
|
||||
Err(e) => {
|
||||
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
|
||||
@@ -619,7 +623,7 @@ impl Tenant {
|
||||
async move {
|
||||
let doit = async {
|
||||
tenant_clone.attach(&ctx).await?;
|
||||
tenant_clone.activate(broker_client, &ctx)?;
|
||||
tenant_clone.activate(broker_client, &ctx).await?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
match doit.await {
|
||||
@@ -764,7 +768,7 @@ impl Tenant {
|
||||
pub async fn get_remote_size(&self) -> anyhow::Result<u64> {
|
||||
let mut size = 0;
|
||||
|
||||
for timeline in self.list_timelines().iter() {
|
||||
for timeline in self.list_timelines().await.iter() {
|
||||
if let Some(remote_client) = &timeline.remote_client {
|
||||
size += remote_client.get_remote_physical_size();
|
||||
}
|
||||
@@ -790,7 +794,7 @@ impl Tenant {
|
||||
.context("Failed to create new timeline directory")?;
|
||||
|
||||
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|
||||
|| {
|
||||
anyhow::anyhow!(
|
||||
@@ -894,7 +898,7 @@ impl Tenant {
|
||||
async move {
|
||||
let doit = async {
|
||||
tenant_clone.load(&ctx).await?;
|
||||
tenant_clone.activate(broker_client, &ctx)?;
|
||||
tenant_clone.activate(broker_client, &ctx).await?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
match doit.await {
|
||||
@@ -1109,7 +1113,7 @@ impl Tenant {
|
||||
};
|
||||
|
||||
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false).await
|
||||
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
|
||||
Some(ancestor_timeline)
|
||||
} else {
|
||||
@@ -1134,12 +1138,12 @@ impl Tenant {
|
||||
|
||||
/// Get Timeline handle for given Neon timeline ID.
|
||||
/// This function is idempotent. It doesn't change internal state in any way.
|
||||
pub fn get_timeline(
|
||||
pub async fn get_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
active_only: bool,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let timelines_accessor = self.timelines.lock().await;
|
||||
let timeline = timelines_accessor.get(&timeline_id).with_context(|| {
|
||||
format!("Timeline {}/{} was not found", self.tenant_id, timeline_id)
|
||||
})?;
|
||||
@@ -1158,10 +1162,10 @@ impl Tenant {
|
||||
|
||||
/// Lists timelines the tenant contains.
|
||||
/// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
|
||||
pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
|
||||
pub async fn list_timelines(&self) -> Vec<Arc<Timeline>> {
|
||||
self.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.await
|
||||
.values()
|
||||
.map(Arc::clone)
|
||||
.collect()
|
||||
@@ -1170,7 +1174,7 @@ impl Tenant {
|
||||
/// This is used to create the initial 'main' timeline during bootstrapping,
|
||||
/// or when importing a new base backup. The caller is expected to load an
|
||||
/// initial image of the datadir to the new timeline after this.
|
||||
pub fn create_empty_timeline(
|
||||
pub async fn create_empty_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
@@ -1182,7 +1186,7 @@ impl Tenant {
|
||||
"Cannot create empty timelines on inactive tenant"
|
||||
);
|
||||
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
|
||||
drop(timelines);
|
||||
|
||||
@@ -1210,16 +1214,20 @@ impl Tenant {
|
||||
// This makes the various functions which anyhow::ensure! for Active state work in tests.
|
||||
// Our current tests don't need the background loops.
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
pub async fn create_test_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
|
||||
let uninit_tl = self
|
||||
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
|
||||
.await?;
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
let tl = uninit_tl
|
||||
.initialize_with_lock(ctx, &mut timelines, true)
|
||||
.await?;
|
||||
// The non-test code would call tl.activate() here.
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
@@ -1246,7 +1254,7 @@ impl Tenant {
|
||||
"Cannot create timelines on inactive tenant"
|
||||
);
|
||||
|
||||
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
|
||||
if let Ok(existing) = self.get_timeline(new_timeline_id, false).await {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
if let Some(remote_client) = existing.remote_client.as_ref() {
|
||||
@@ -1271,6 +1279,7 @@ impl Tenant {
|
||||
Some(ancestor_timeline_id) => {
|
||||
let ancestor_timeline = self
|
||||
.get_timeline(ancestor_timeline_id, false)
|
||||
.await
|
||||
.context("Cannot branch off the timeline that's not present in pageserver")?;
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
@@ -1365,7 +1374,7 @@ impl Tenant {
|
||||
// compactions. We don't want to block everything else while the
|
||||
// compaction runs.
|
||||
let timelines_to_compact = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
let timelines_to_compact = timelines
|
||||
.iter()
|
||||
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
|
||||
@@ -1394,7 +1403,7 @@ impl Tenant {
|
||||
// flushing. We don't want to block everything else while the
|
||||
// flushing is performed.
|
||||
let timelines_to_flush = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
timelines
|
||||
.iter()
|
||||
.map(|(_id, timeline)| Arc::clone(timeline))
|
||||
@@ -1419,7 +1428,7 @@ impl Tenant {
|
||||
// Transition the timeline into TimelineState::Stopping.
|
||||
// This should prevent new operations from starting.
|
||||
let timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
|
||||
// Ensure that there are no child timelines **attached to that pageserver**,
|
||||
// because detach removes files, which will break child branches
|
||||
@@ -1569,7 +1578,7 @@ impl Tenant {
|
||||
});
|
||||
|
||||
// Remove the timeline from the map.
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
let children_exist = timelines
|
||||
.iter()
|
||||
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
|
||||
@@ -1613,7 +1622,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
fn activate(
|
||||
async fn activate(
|
||||
&self,
|
||||
broker_client: BrokerClientChannel,
|
||||
ctx: &RequestContext,
|
||||
@@ -1658,7 +1667,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
if activating {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let timelines_accessor = self.timelines.lock().await;
|
||||
let not_broken_timelines = timelines_accessor
|
||||
.values()
|
||||
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
|
||||
@@ -1738,7 +1747,7 @@ impl Tenant {
|
||||
});
|
||||
|
||||
if stopping {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let timelines_accessor = self.timelines.lock().await;
|
||||
let not_broken_timelines = timelines_accessor
|
||||
.values()
|
||||
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
|
||||
@@ -1947,12 +1956,12 @@ impl Tenant {
|
||||
.or(self.conf.default_tenant_conf.min_resident_size_override)
|
||||
}
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
pub async fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
*self.tenant_conf.write().unwrap() = new_tenant_conf;
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
let timelines = self.list_timelines().await;
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
}
|
||||
@@ -2030,7 +2039,7 @@ impl Tenant {
|
||||
// activation times.
|
||||
loading_started_at: Instant::now(),
|
||||
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
timelines: tokio::sync::Mutex::new(HashMap::new()),
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
walredo_mgr,
|
||||
remote_storage,
|
||||
@@ -2259,7 +2268,7 @@ impl Tenant {
|
||||
// Scan all timelines. For each timeline, remember the timeline ID and
|
||||
// the branch point where it was created.
|
||||
let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
let mut all_branchpoints = BTreeSet::new();
|
||||
let timeline_ids = {
|
||||
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
|
||||
@@ -2306,6 +2315,7 @@ impl Tenant {
|
||||
// Timeline is known to be local and loaded.
|
||||
let timeline = self
|
||||
.get_timeline(timeline_id, false)
|
||||
.await
|
||||
.with_context(|| format!("Timeline {timeline_id} was not found"))?;
|
||||
|
||||
// If target_timeline is specified, ignore all other timelines
|
||||
@@ -2391,7 +2401,7 @@ impl Tenant {
|
||||
// Create a placeholder for the new branch. This will error
|
||||
// out if the new timeline ID is already in use.
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
self.create_timeline_uninit_mark(dst_id, &timelines)?
|
||||
};
|
||||
|
||||
@@ -2458,7 +2468,7 @@ impl Tenant {
|
||||
);
|
||||
|
||||
let new_timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
self.prepare_timeline(
|
||||
dst_id,
|
||||
&metadata,
|
||||
@@ -2466,7 +2476,8 @@ impl Tenant {
|
||||
false,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
)?
|
||||
.initialize_with_lock(ctx, &mut timelines, true)?
|
||||
.initialize_with_lock(ctx, &mut timelines, true)
|
||||
.await?
|
||||
};
|
||||
|
||||
// Root timeline gets its layers during creation and uploads them along with the metadata.
|
||||
@@ -2496,7 +2507,7 @@ impl Tenant {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
self.create_timeline_uninit_mark(timeline_id, &timelines)?
|
||||
};
|
||||
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
|
||||
@@ -2582,8 +2593,10 @@ impl Tenant {
|
||||
// Initialize the timeline without loading the layer map, because we already updated the layer
|
||||
// map above, when we imported the datadir.
|
||||
let timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
raw_timeline.initialize_with_lock(ctx, &mut timelines, false)?
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
raw_timeline
|
||||
.initialize_with_lock(ctx, &mut timelines, false)
|
||||
.await?
|
||||
};
|
||||
|
||||
info!(
|
||||
@@ -2685,7 +2698,7 @@ impl Tenant {
|
||||
fn create_timeline_uninit_mark(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
timelines: &tokio::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
) -> anyhow::Result<TimelineUninitMark> {
|
||||
let tenant_id = self.tenant_id;
|
||||
|
||||
@@ -3229,7 +3242,7 @@ pub mod harness {
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
tenant.state.send_replace(TenantState::Active);
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
for timeline in tenant.timelines.lock().await.values() {
|
||||
timeline.set_state(TimelineState::Active);
|
||||
}
|
||||
Ok(tenant)
|
||||
@@ -3289,7 +3302,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_basic() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
@@ -3322,9 +3337,14 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
|
||||
.load()
|
||||
.await;
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
|
||||
match tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
Err(e) => assert_eq!(
|
||||
e.to_string(),
|
||||
@@ -3353,7 +3373,9 @@ mod tests {
|
||||
use std::str::from_utf8;
|
||||
|
||||
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let writer = tline.writer();
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
@@ -3379,6 +3401,7 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
let new_writer = newtline.writer();
|
||||
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
|
||||
@@ -3450,7 +3473,9 @@ mod tests {
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
@@ -3487,8 +3512,9 @@ mod tests {
|
||||
.load()
|
||||
.await;
|
||||
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
|
||||
match tenant
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
|
||||
@@ -3517,7 +3543,7 @@ mod tests {
|
||||
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
|
||||
.load();
|
||||
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION).await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
|
||||
@@ -3537,7 +3563,9 @@ mod tests {
|
||||
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3545,6 +3573,7 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
|
||||
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
|
||||
@@ -3585,7 +3614,9 @@ mod tests {
|
||||
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3593,6 +3624,7 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
tenant
|
||||
@@ -3608,7 +3640,9 @@ mod tests {
|
||||
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3616,6 +3650,7 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
|
||||
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
|
||||
@@ -3640,14 +3675,16 @@ mod tests {
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
|
||||
}
|
||||
|
||||
let (tenant, _ctx) = harness.load().await;
|
||||
tenant
|
||||
.get_timeline(TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("cannot load timeline");
|
||||
|
||||
Ok(())
|
||||
@@ -3660,8 +3697,9 @@ mod tests {
|
||||
// create two timelines
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
@@ -3672,6 +3710,7 @@ mod tests {
|
||||
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
|
||||
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
|
||||
@@ -3683,10 +3722,12 @@ mod tests {
|
||||
// check that both, child and ancestor are loaded
|
||||
let _child_tline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("cannot get child timeline loaded");
|
||||
|
||||
let _ancestor_tline = tenant
|
||||
.get_timeline(TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("cannot get ancestor timeline loaded");
|
||||
|
||||
Ok(())
|
||||
@@ -3698,7 +3739,9 @@ mod tests {
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
drop(tline);
|
||||
drop(tenant);
|
||||
|
||||
@@ -3736,7 +3779,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_images() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
@@ -3801,7 +3846,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let mut lsn = Lsn(0x10);
|
||||
|
||||
@@ -3843,7 +3890,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_random_updates() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -3916,8 +3965,9 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
|
||||
.load()
|
||||
.await;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -3954,6 +4004,7 @@ mod tests {
|
||||
.await?;
|
||||
tline = tenant
|
||||
.get_timeline(new_tline_id, true)
|
||||
.await
|
||||
.expect("Should have the branched timeline");
|
||||
|
||||
for _ in 0..NUM_KEYS {
|
||||
@@ -3999,8 +4050,9 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
|
||||
.load()
|
||||
.await;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
const NUM_KEYS: usize = 100;
|
||||
const NUM_TLINES: usize = 50;
|
||||
@@ -4019,6 +4071,7 @@ mod tests {
|
||||
.await?;
|
||||
tline = tenant
|
||||
.get_timeline(new_tline_id, true)
|
||||
.await
|
||||
.expect("Should have the branched timeline");
|
||||
|
||||
for _ in 0..NUM_KEYS {
|
||||
|
||||
@@ -319,7 +319,7 @@ pub async fn set_new_tenant_config(
|
||||
new_tenant_conf,
|
||||
false,
|
||||
)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf);
|
||||
tenant.set_new_tenant_config(new_tenant_conf).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -686,6 +686,7 @@ pub async fn immediate_compact(
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
|
||||
@@ -1264,7 +1264,12 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = runtime.block_on(tenant.create_test_timeline(
|
||||
TIMELINE_ID,
|
||||
Lsn(0),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
))?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -136,7 +136,7 @@ pub(super) async fn gather_inputs(
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
|
||||
// Collect information about all the timelines
|
||||
let mut timelines = tenant.list_timelines();
|
||||
let mut timelines = tenant.list_timelines().await;
|
||||
|
||||
if timelines.is_empty() {
|
||||
// perhaps the tenant has just been created, and as such doesn't have any data yet
|
||||
|
||||
@@ -1309,6 +1309,7 @@ mod tests {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
|
||||
ConnectionManagerState {
|
||||
|
||||
@@ -1209,7 +1209,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_relsize() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1428,7 +1428,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_drop_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1497,7 +1497,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_truncate_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
@@ -1637,7 +1637,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_large_rel() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut lsn = 0x10;
|
||||
|
||||
Reference in New Issue
Block a user