mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
31 Commits
remove_ini
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a66a16ddf9 | ||
|
|
2001c31a14 | ||
|
|
7de3799e66 | ||
|
|
a1680b185f | ||
|
|
a1ae23b827 | ||
|
|
1ebe92bcf9 | ||
|
|
413598b19b | ||
|
|
b345f32e3f | ||
|
|
69cfa9fe61 | ||
|
|
2c424c8f4e | ||
|
|
4001f441c0 | ||
|
|
ef956c47fc | ||
|
|
8606b6abe5 | ||
|
|
732f60317b | ||
|
|
b54431bbd3 | ||
|
|
def5eb8542 | ||
|
|
07da786ed3 | ||
|
|
75c3c43b2e | ||
|
|
bdf03eab58 | ||
|
|
32c85fa87a | ||
|
|
b2e0c58a8c | ||
|
|
94f30f0660 | ||
|
|
a55d224923 | ||
|
|
4f586ac101 | ||
|
|
feb2e80b83 | ||
|
|
ee22e81583 | ||
|
|
3e604eaa39 | ||
|
|
8bcb542a3b | ||
|
|
17b081d294 | ||
|
|
d5337e6a65 | ||
|
|
cc96a5186d |
@@ -150,7 +150,7 @@ pub async fn collect_metrics_iteration(
|
||||
let mut tenant_resident_size = 0;
|
||||
|
||||
// iterate through list of timelines in tenant
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
// collect per-timeline metrics only for active timelines
|
||||
if timeline.is_active() {
|
||||
let timeline_written_size = u64::from(timeline.get_last_record_lsn());
|
||||
|
||||
@@ -512,11 +512,11 @@ async fn collect_eviction_candidates(
|
||||
// a little unfair to tenants during shutdown in such a situation is tolerable.
|
||||
let mut tenant_candidates = Vec::new();
|
||||
let mut max_layer_size = 0;
|
||||
for tl in tenant.list_timelines() {
|
||||
for tl in tenant.list_timelines().await {
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction();
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
info.resident_layers
|
||||
|
||||
@@ -214,7 +214,7 @@ async fn build_timeline_info(
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut info = build_timeline_info_common(timeline, ctx)?;
|
||||
let mut info = build_timeline_info_common(timeline, ctx).await?;
|
||||
if include_non_incremental_logical_size {
|
||||
// XXX we should be using spawn_ondemand_logical_size_calculation here.
|
||||
// Otherwise, if someone deletes the timeline / detaches the tenant while
|
||||
@@ -232,7 +232,7 @@ async fn build_timeline_info(
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn build_timeline_info_common(
|
||||
async fn build_timeline_info_common(
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
@@ -263,7 +263,7 @@ fn build_timeline_info_common(
|
||||
None
|
||||
}
|
||||
};
|
||||
let current_physical_size = Some(timeline.layer_size_sum());
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
|
||||
|
||||
@@ -329,6 +329,7 @@ async fn timeline_create_handler(
|
||||
Ok(Some(new_timeline)) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
@@ -353,7 +354,7 @@ async fn timeline_list_handler(
|
||||
|
||||
let response_data = async {
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
let timelines = tenant.list_timelines();
|
||||
let timelines = tenant.list_timelines().await;
|
||||
|
||||
let mut response_data = Vec::with_capacity(timelines.len());
|
||||
for timeline in timelines {
|
||||
@@ -395,6 +396,7 @@ async fn timeline_detail_handler(
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let timeline_info = build_timeline_info(
|
||||
@@ -588,8 +590,8 @@ async fn tenant_status(
|
||||
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().iter() {
|
||||
current_physical_size += timeline.layer_size_sum();
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
current_physical_size += timeline.layer_size_sum().await;
|
||||
}
|
||||
|
||||
let state = tenant.current_state();
|
||||
@@ -699,7 +701,7 @@ async fn layer_map_info_handler(
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let layer_map_info = timeline.layer_map_info(reset);
|
||||
let layer_map_info = timeline.layer_map_info(reset).await;
|
||||
|
||||
json_response(StatusCode::OK, layer_map_info)
|
||||
}
|
||||
@@ -1069,6 +1071,7 @@ async fn active_timeline_of_active_tenant(
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)
|
||||
}
|
||||
|
||||
|
||||
@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
{
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
}
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
// We expect the Postgres server to be shut down cleanly.
|
||||
let pg_control = pg_control.context("pg_control file not found")?;
|
||||
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// We found the pg_control file.
|
||||
pg_control = Some(res);
|
||||
}
|
||||
modification.flush()?;
|
||||
modification.flush().await?;
|
||||
}
|
||||
tokio_tar::EntryType::Directory => {
|
||||
debug!("directory {:?}", file_path);
|
||||
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
|
||||
// sanity check: ensure that pg_control is loaded
|
||||
let _pg_control = pg_control.context("pg_control file not found")?;
|
||||
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -594,7 +594,7 @@ async fn import_file(
|
||||
// zenith.signal is not necessarily the last file, that we handle
|
||||
// but it is ok to call `finish_write()`, because final `modification.commit()`
|
||||
// will update lsn once more to the final one.
|
||||
let writer = modification.tline.writer();
|
||||
let writer = modification.tline.writer().await;
|
||||
writer.finish_write(prev_lsn);
|
||||
|
||||
debug!("imported zenith signal {}", prev_lsn);
|
||||
|
||||
@@ -390,7 +390,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true).await?;
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
@@ -489,7 +489,9 @@ impl PageServerHandler {
|
||||
// Create empty timeline
|
||||
info!("creating new timeline");
|
||||
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
|
||||
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
|
||||
.await?;
|
||||
|
||||
// TODO mark timeline as not ready until it reaches end_lsn.
|
||||
// We might have some wal to import as well, and we should prevent compute
|
||||
@@ -1230,6 +1232,7 @@ async fn get_active_tenant_timeline(
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(GetActiveTimelineError::Timeline)?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> {
|
||||
/// retains all the metadata, but data pages are flushed. That's again OK
|
||||
/// for bulk import, where you are just loading data pages and won't try to
|
||||
/// modify the same pages twice.
|
||||
pub fn flush(&mut self) -> anyhow::Result<()> {
|
||||
pub async fn flush(&mut self) -> anyhow::Result<()> {
|
||||
// Unless we have accumulated a decent amount of changes, it's not worth it
|
||||
// to scan through the pending_updates list.
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
@@ -1116,13 +1116,15 @@ impl<'a> DatadirModification<'a> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let writer = self.tline.writer();
|
||||
let writer = self.tline.writer().await;
|
||||
|
||||
let mut layer_map = self.tline.layers.write().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut result: anyhow::Result<()> = Ok(());
|
||||
self.pending_updates.retain(|&key, value| {
|
||||
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
|
||||
result = writer.put(key, self.lsn, value);
|
||||
result = writer.put_locked(key, self.lsn, value, &mut layer_map);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
@@ -1143,17 +1145,17 @@ impl<'a> DatadirModification<'a> {
|
||||
/// underlying timeline.
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer();
|
||||
pub async fn commit(&mut self) -> anyhow::Result<()> {
|
||||
let writer = self.tline.writer().await;
|
||||
let lsn = self.lsn;
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
writer.put(key, lsn, &value)?;
|
||||
writer.put(key, lsn, &value).await?;
|
||||
}
|
||||
for key_range in self.pending_deletions.drain(..) {
|
||||
writer.delete(key_range, lsn)?;
|
||||
writer.delete(key_range, lsn).await?;
|
||||
}
|
||||
|
||||
writer.finish_write(lsn);
|
||||
@@ -1594,16 +1596,18 @@ fn is_slru_block_key(key: Key) -> bool {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
pub async fn create_test_timeline(
|
||||
tenant: &crate::tenant::Tenant,
|
||||
timeline_id: utils::id::TimelineId,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)
|
||||
.await?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
@@ -1630,7 +1634,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_list_rels_drop() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_list_rels_drop")?.load();
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID)?;
|
||||
let tline = create_empty_timeline(repo, TIMELINE_ID).await?;
|
||||
const TESTDB: u32 = 111;
|
||||
|
||||
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
||||
|
||||
@@ -41,8 +41,7 @@ use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::sync::RwLock;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
@@ -139,7 +138,7 @@ pub struct Tenant {
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
timelines: tokio::sync::Mutex<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
// Adding yet another mutex (in addition to `timelines`) is needed because holding
|
||||
// `timelines` mutex during all GC iteration
|
||||
@@ -190,7 +189,7 @@ impl UninitializedTimeline<'_> {
|
||||
/// This function launches the flush loop if not already done.
|
||||
///
|
||||
/// The caller is responsible for activating the timeline (function `.activate()`).
|
||||
fn initialize_with_lock(
|
||||
async fn initialize_with_lock(
|
||||
mut self,
|
||||
_ctx: &RequestContext,
|
||||
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
|
||||
@@ -216,6 +215,7 @@ impl UninitializedTimeline<'_> {
|
||||
if load_layer_map {
|
||||
new_timeline
|
||||
.load_layer_map(new_disk_consistent_lsn)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to load layermap for timeline {tenant_id}/{timeline_id}"
|
||||
@@ -266,8 +266,10 @@ impl UninitializedTimeline<'_> {
|
||||
|
||||
// Initialize without loading the layer map. We started with an empty layer map, and already
|
||||
// updated it for the layers that we created during the import.
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
|
||||
let mut timelines = self.owning_tenant.timelines.lock().await;
|
||||
let tl = self
|
||||
.initialize_with_lock(ctx, &mut timelines, false)
|
||||
.await?;
|
||||
tl.activate(broker_client, None, ctx);
|
||||
Ok(tl)
|
||||
}
|
||||
@@ -525,7 +527,7 @@ impl Tenant {
|
||||
|
||||
let timeline = {
|
||||
// avoiding holding it across awaits
|
||||
let mut timelines_accessor = self.timelines.lock().unwrap();
|
||||
let mut timelines_accessor = self.timelines.lock().await;
|
||||
if timelines_accessor.contains_key(&timeline_id) {
|
||||
anyhow::bail!(
|
||||
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
|
||||
@@ -548,7 +550,10 @@ impl Tenant {
|
||||
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
|
||||
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
|
||||
// will ingest data which may require looking at the layers which are not yet available locally
|
||||
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) {
|
||||
match timeline
|
||||
.initialize_with_lock(ctx, &mut timelines_accessor, true)
|
||||
.await
|
||||
{
|
||||
Ok(new_timeline) => new_timeline,
|
||||
Err(e) => {
|
||||
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
|
||||
@@ -594,7 +599,7 @@ impl Tenant {
|
||||
|| timeline
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.await
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
@@ -662,7 +667,7 @@ impl Tenant {
|
||||
match tenant_clone.attach(&ctx).await {
|
||||
Ok(()) => {
|
||||
info!("attach finished, activating");
|
||||
tenant_clone.activate(broker_client, None, &ctx);
|
||||
tenant_clone.activate(broker_client, None, &ctx).await;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("attach failed, setting tenant state to Broken: {:?}", e);
|
||||
@@ -809,7 +814,7 @@ impl Tenant {
|
||||
pub async fn get_remote_size(&self) -> anyhow::Result<u64> {
|
||||
let mut size = 0;
|
||||
|
||||
for timeline in self.list_timelines().iter() {
|
||||
for timeline in self.list_timelines().await.iter() {
|
||||
if let Some(remote_client) = &timeline.remote_client {
|
||||
size += remote_client.get_remote_physical_size();
|
||||
}
|
||||
@@ -835,7 +840,7 @@ impl Tenant {
|
||||
.context("Failed to create new timeline directory")?;
|
||||
|
||||
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|
||||
|| {
|
||||
anyhow::anyhow!(
|
||||
@@ -949,7 +954,7 @@ impl Tenant {
|
||||
Ok(()) => {
|
||||
debug!("load finished, activating");
|
||||
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
|
||||
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx).await;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("load failed, setting tenant state to Broken: {err:?}");
|
||||
@@ -1174,7 +1179,7 @@ impl Tenant {
|
||||
};
|
||||
|
||||
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false).await
|
||||
.with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))?;
|
||||
Some(ancestor_timeline)
|
||||
} else {
|
||||
@@ -1200,12 +1205,12 @@ impl Tenant {
|
||||
|
||||
/// Get Timeline handle for given Neon timeline ID.
|
||||
/// This function is idempotent. It doesn't change internal state in any way.
|
||||
pub fn get_timeline(
|
||||
pub async fn get_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
active_only: bool,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let timelines_accessor = self.timelines.lock().await;
|
||||
let timeline = timelines_accessor.get(&timeline_id).with_context(|| {
|
||||
format!("Timeline {}/{} was not found", self.tenant_id, timeline_id)
|
||||
})?;
|
||||
@@ -1224,10 +1229,10 @@ impl Tenant {
|
||||
|
||||
/// Lists timelines the tenant contains.
|
||||
/// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
|
||||
pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
|
||||
pub async fn list_timelines(&self) -> Vec<Arc<Timeline>> {
|
||||
self.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.await
|
||||
.values()
|
||||
.map(Arc::clone)
|
||||
.collect()
|
||||
@@ -1236,7 +1241,7 @@ impl Tenant {
|
||||
/// This is used to create the initial 'main' timeline during bootstrapping,
|
||||
/// or when importing a new base backup. The caller is expected to load an
|
||||
/// initial image of the datadir to the new timeline after this.
|
||||
pub fn create_empty_timeline(
|
||||
pub async fn create_empty_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
@@ -1248,7 +1253,7 @@ impl Tenant {
|
||||
"Cannot create empty timelines on inactive tenant"
|
||||
);
|
||||
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?;
|
||||
drop(timelines);
|
||||
|
||||
@@ -1268,6 +1273,7 @@ impl Tenant {
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Helper for unit tests to create an emtpy timeline.
|
||||
@@ -1276,16 +1282,20 @@ impl Tenant {
|
||||
// This makes the various functions which anyhow::ensure! for Active state work in tests.
|
||||
// Our current tests don't need the background loops.
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
pub async fn create_test_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
|
||||
let uninit_tl = self
|
||||
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
|
||||
.await?;
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
let tl = uninit_tl
|
||||
.initialize_with_lock(ctx, &mut timelines, true)
|
||||
.await?;
|
||||
// The non-test code would call tl.activate() here.
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
@@ -1312,7 +1322,7 @@ impl Tenant {
|
||||
"Cannot create timelines on inactive tenant"
|
||||
);
|
||||
|
||||
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
|
||||
if let Ok(existing) = self.get_timeline(new_timeline_id, false).await {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
if let Some(remote_client) = existing.remote_client.as_ref() {
|
||||
@@ -1337,6 +1347,7 @@ impl Tenant {
|
||||
Some(ancestor_timeline_id) => {
|
||||
let ancestor_timeline = self
|
||||
.get_timeline(ancestor_timeline_id, false)
|
||||
.await
|
||||
.context("Cannot branch off the timeline that's not present in pageserver")?;
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
@@ -1432,7 +1443,7 @@ impl Tenant {
|
||||
// compactions. We don't want to block everything else while the
|
||||
// compaction runs.
|
||||
let timelines_to_compact = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
let timelines_to_compact = timelines
|
||||
.iter()
|
||||
.map(|(timeline_id, timeline)| (*timeline_id, timeline.clone()))
|
||||
@@ -1491,7 +1502,7 @@ impl Tenant {
|
||||
};
|
||||
|
||||
{
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
timelines
|
||||
.iter()
|
||||
.map(|(id, tl)| (*id, Arc::clone(tl)))
|
||||
@@ -1529,7 +1540,7 @@ impl Tenant {
|
||||
let timeline;
|
||||
let mut delete_lock_guard;
|
||||
{
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
|
||||
// Ensure that there are no child timelines **attached to that pageserver**,
|
||||
// because detach removes files, which will break child branches
|
||||
@@ -1698,7 +1709,7 @@ impl Tenant {
|
||||
|
||||
// Remove the timeline from the map.
|
||||
{
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
|
||||
let children_exist = timelines
|
||||
.iter()
|
||||
@@ -1734,7 +1745,7 @@ impl Tenant {
|
||||
///
|
||||
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
|
||||
/// to delay background jobs. Background jobs can be started right away when None is given.
|
||||
fn activate(
|
||||
async fn activate(
|
||||
self: &Arc<Self>,
|
||||
broker_client: BrokerClientChannel,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
@@ -1763,7 +1774,7 @@ impl Tenant {
|
||||
});
|
||||
|
||||
if activating {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let timelines_accessor = self.timelines.lock().await;
|
||||
let not_broken_timelines = timelines_accessor
|
||||
.values()
|
||||
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
|
||||
@@ -1922,7 +1933,7 @@ impl Tenant {
|
||||
),
|
||||
}
|
||||
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let timelines_accessor = self.timelines.lock().await;
|
||||
let not_broken_timelines = timelines_accessor
|
||||
.values()
|
||||
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
|
||||
@@ -2149,12 +2160,12 @@ impl Tenant {
|
||||
.or(self.conf.default_tenant_conf.min_resident_size_override)
|
||||
}
|
||||
|
||||
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
pub async fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
|
||||
*self.tenant_conf.write().unwrap() = new_tenant_conf;
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
let timelines = self.list_timelines().await;
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated();
|
||||
}
|
||||
@@ -2238,7 +2249,7 @@ impl Tenant {
|
||||
// activation times.
|
||||
loading_started_at: Instant::now(),
|
||||
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
timelines: tokio::sync::Mutex::new(HashMap::new()),
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
walredo_mgr,
|
||||
remote_storage,
|
||||
@@ -2467,7 +2478,7 @@ impl Tenant {
|
||||
// Scan all timelines. For each timeline, remember the timeline ID and
|
||||
// the branch point where it was created.
|
||||
let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
let mut all_branchpoints = BTreeSet::new();
|
||||
let timeline_ids = {
|
||||
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
|
||||
@@ -2514,6 +2525,7 @@ impl Tenant {
|
||||
// Timeline is known to be local and loaded.
|
||||
let timeline = self
|
||||
.get_timeline(timeline_id, false)
|
||||
.await
|
||||
.with_context(|| format!("Timeline {timeline_id} was not found"))?;
|
||||
|
||||
// If target_timeline is specified, ignore all other timelines
|
||||
@@ -2599,7 +2611,7 @@ impl Tenant {
|
||||
// Create a placeholder for the new branch. This will error
|
||||
// out if the new timeline ID is already in use.
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
self.create_timeline_uninit_mark(dst_id, &timelines)?
|
||||
};
|
||||
|
||||
@@ -2666,15 +2678,17 @@ impl Tenant {
|
||||
);
|
||||
|
||||
let new_timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
self.prepare_timeline(
|
||||
dst_id,
|
||||
&metadata,
|
||||
timeline_uninit_mark,
|
||||
false,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
)?
|
||||
.initialize_with_lock(ctx, &mut timelines, true)?
|
||||
)
|
||||
.await?
|
||||
.initialize_with_lock(ctx, &mut timelines, true)
|
||||
.await?
|
||||
};
|
||||
|
||||
// Root timeline gets its layers during creation and uploads them along with the metadata.
|
||||
@@ -2704,7 +2718,7 @@ impl Tenant {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines = self.timelines.lock().await;
|
||||
self.create_timeline_uninit_mark(timeline_id, &timelines)?
|
||||
};
|
||||
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
|
||||
@@ -2751,8 +2765,9 @@ impl Tenant {
|
||||
pgdata_lsn,
|
||||
pg_version,
|
||||
);
|
||||
let raw_timeline =
|
||||
self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?;
|
||||
let raw_timeline = self
|
||||
.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)
|
||||
.await?;
|
||||
|
||||
let tenant_id = raw_timeline.owning_tenant.tenant_id;
|
||||
let unfinished_timeline = raw_timeline.raw_timeline()?;
|
||||
@@ -2790,8 +2805,10 @@ impl Tenant {
|
||||
// Initialize the timeline without loading the layer map, because we already updated the layer
|
||||
// map above, when we imported the datadir.
|
||||
let timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
raw_timeline.initialize_with_lock(ctx, &mut timelines, false)?
|
||||
let mut timelines = self.timelines.lock().await;
|
||||
raw_timeline
|
||||
.initialize_with_lock(ctx, &mut timelines, false)
|
||||
.await?
|
||||
};
|
||||
|
||||
info!(
|
||||
@@ -2805,7 +2822,7 @@ impl Tenant {
|
||||
|
||||
/// Creates intermediate timeline structure and its files, without loading it into memory.
|
||||
/// It's up to the caller to import the necesary data and import the timeline into memory.
|
||||
fn prepare_timeline(
|
||||
async fn prepare_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
@@ -2837,7 +2854,7 @@ impl Tenant {
|
||||
) {
|
||||
Ok(new_timeline) => {
|
||||
if init_layers {
|
||||
new_timeline.layers.write().unwrap().next_open_layer_at =
|
||||
new_timeline.layers.write().await.next_open_layer_at =
|
||||
Some(new_timeline.initdb_lsn);
|
||||
}
|
||||
debug!(
|
||||
@@ -2893,7 +2910,7 @@ impl Tenant {
|
||||
fn create_timeline_uninit_mark(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
timelines: &tokio::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
) -> anyhow::Result<TimelineUninitMark> {
|
||||
let tenant_id = self.tenant_id;
|
||||
|
||||
@@ -3438,7 +3455,7 @@ pub mod harness {
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
tenant.state.send_replace(TenantState::Active);
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
for timeline in tenant.timelines.lock().await.values() {
|
||||
timeline.set_state(TimelineState::Active);
|
||||
}
|
||||
Ok(tenant)
|
||||
@@ -3498,15 +3515,21 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_basic() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x10));
|
||||
drop(writer);
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
drop(writer);
|
||||
|
||||
@@ -3531,9 +3554,14 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
|
||||
.load()
|
||||
.await;
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
|
||||
match tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
Err(e) => assert_eq!(
|
||||
e.to_string(),
|
||||
@@ -3562,8 +3590,10 @@ mod tests {
|
||||
use std::str::from_utf8;
|
||||
|
||||
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let writer = tline.writer();
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let writer = tline.writer().await;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
|
||||
@@ -3571,13 +3601,21 @@ mod tests {
|
||||
let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
|
||||
|
||||
// Insert a value on the timeline
|
||||
writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?;
|
||||
writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?;
|
||||
writer
|
||||
.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
|
||||
.await?;
|
||||
writer
|
||||
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
|
||||
writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?;
|
||||
writer
|
||||
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x30));
|
||||
writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?;
|
||||
writer
|
||||
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x40));
|
||||
|
||||
//assert_current_logical_size(&tline, Lsn(0x40));
|
||||
@@ -3588,9 +3626,12 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
let new_writer = newtline.writer();
|
||||
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
|
||||
let new_writer = newtline.writer().await;
|
||||
new_writer
|
||||
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
|
||||
.await?;
|
||||
new_writer.finish_write(Lsn(0x40));
|
||||
|
||||
// Check page contents on both branches
|
||||
@@ -3616,38 +3657,46 @@ mod tests {
|
||||
let mut lsn = start_lsn;
|
||||
#[allow(non_snake_case)]
|
||||
{
|
||||
let writer = tline.writer();
|
||||
let writer = tline.writer().await;
|
||||
// Create a relation on the timeline
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
}
|
||||
tline.freeze_and_flush().await?;
|
||||
{
|
||||
let writer = tline.writer();
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
}
|
||||
tline.freeze_and_flush().await
|
||||
@@ -3659,7 +3708,9 @@ mod tests {
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
@@ -3696,8 +3747,9 @@ mod tests {
|
||||
.load()
|
||||
.await;
|
||||
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
|
||||
match tenant
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
|
||||
@@ -3726,7 +3778,7 @@ mod tests {
|
||||
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
|
||||
.load();
|
||||
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION).await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
|
||||
@@ -3746,7 +3798,9 @@ mod tests {
|
||||
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3754,6 +3808,7 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
|
||||
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
|
||||
@@ -3794,7 +3849,9 @@ mod tests {
|
||||
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3802,6 +3859,7 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
tenant
|
||||
@@ -3817,7 +3875,9 @@ mod tests {
|
||||
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
@@ -3825,6 +3885,7 @@ mod tests {
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
|
||||
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
|
||||
@@ -3849,14 +3910,16 @@ mod tests {
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
|
||||
}
|
||||
|
||||
let (tenant, _ctx) = harness.load().await;
|
||||
tenant
|
||||
.get_timeline(TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("cannot load timeline");
|
||||
|
||||
Ok(())
|
||||
@@ -3869,8 +3932,9 @@ mod tests {
|
||||
// create two timelines
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
@@ -3881,6 +3945,7 @@ mod tests {
|
||||
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
|
||||
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
|
||||
@@ -3892,10 +3957,12 @@ mod tests {
|
||||
// check that both, child and ancestor are loaded
|
||||
let _child_tline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("cannot get child timeline loaded");
|
||||
|
||||
let _ancestor_tline = tenant
|
||||
.get_timeline(TIMELINE_ID, true)
|
||||
.await
|
||||
.expect("cannot get ancestor timeline loaded");
|
||||
|
||||
Ok(())
|
||||
@@ -3907,7 +3974,9 @@ mod tests {
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
drop(tline);
|
||||
drop(tenant);
|
||||
|
||||
@@ -3945,34 +4014,44 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_images() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x10));
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x30));
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x40));
|
||||
drop(writer);
|
||||
|
||||
@@ -4010,7 +4089,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let mut lsn = Lsn(0x10);
|
||||
|
||||
@@ -4021,12 +4102,14 @@ mod tests {
|
||||
for _ in 0..50 {
|
||||
for _ in 0..10000 {
|
||||
test_key.field6 = blknum;
|
||||
let writer = tline.writer();
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
@@ -4052,7 +4135,9 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_random_updates() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -4069,12 +4154,14 @@ mod tests {
|
||||
for blknum in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer();
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
updated[blknum] = lsn;
|
||||
drop(writer);
|
||||
@@ -4087,12 +4174,14 @@ mod tests {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer();
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
updated[blknum] = lsn;
|
||||
@@ -4125,8 +4214,9 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
|
||||
.load()
|
||||
.await;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -4143,12 +4233,14 @@ mod tests {
|
||||
for blknum in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer();
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
updated[blknum] = lsn;
|
||||
drop(writer);
|
||||
@@ -4163,18 +4255,21 @@ mod tests {
|
||||
.await?;
|
||||
tline = tenant
|
||||
.get_timeline(new_tline_id, true)
|
||||
.await
|
||||
.expect("Should have the branched timeline");
|
||||
|
||||
for _ in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer();
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
println!("updating {} at {}", blknum, lsn);
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
@@ -4208,8 +4303,9 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
|
||||
.load()
|
||||
.await;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let mut tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
const NUM_KEYS: usize = 100;
|
||||
const NUM_TLINES: usize = 50;
|
||||
@@ -4228,18 +4324,21 @@ mod tests {
|
||||
.await?;
|
||||
tline = tenant
|
||||
.get_timeline(new_tline_id, true)
|
||||
.await
|
||||
.expect("Should have the branched timeline");
|
||||
|
||||
for _ in 0..NUM_KEYS {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer();
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
|
||||
)?;
|
||||
let writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
println!("updating [{}][{}] at {}", idx, blknum, lsn);
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
@@ -352,7 +352,7 @@ pub async fn set_new_tenant_config(
|
||||
false,
|
||||
)
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
tenant.set_new_tenant_config(new_tenant_conf);
|
||||
tenant.set_new_tenant_config(new_tenant_conf).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -726,6 +726,7 @@ pub async fn immediate_compact(
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.await
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
|
||||
@@ -1264,7 +1264,12 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = runtime.block_on(tenant.create_test_timeline(
|
||||
TIMELINE_ID,
|
||||
Lsn(0),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
))?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -136,7 +136,7 @@ pub(super) async fn gather_inputs(
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
|
||||
// Collect information about all the timelines
|
||||
let mut timelines = tenant.list_timelines();
|
||||
let mut timelines = tenant.list_timelines().await;
|
||||
|
||||
if timelines.is_empty() {
|
||||
// perhaps the tenant has just been created, and as such doesn't have any data yet
|
||||
|
||||
@@ -304,7 +304,7 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
// TODO: Currently, we just leak the storage for any deleted keys
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -28,7 +28,7 @@ use std::ops::{Deref, Range};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
@@ -120,7 +120,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -180,7 +180,7 @@ pub struct Timeline {
|
||||
/// Locked automatically by [`TimelineWriter`] and checkpointer.
|
||||
/// Must always be acquired before the layer map/individual layer lock
|
||||
/// to avoid deadlock.
|
||||
write_lock: Mutex<()>,
|
||||
write_lock: tokio::sync::Mutex<()>,
|
||||
|
||||
/// Used to avoid multiple `flush_loop` tasks running
|
||||
flush_loop_state: Mutex<FlushLoopState>,
|
||||
@@ -251,6 +251,8 @@ pub struct Timeline {
|
||||
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
|
||||
}
|
||||
|
||||
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
/// Calculation consists of two stages:
|
||||
@@ -592,8 +594,8 @@ impl Timeline {
|
||||
/// The sum of the file size of all historic layers in the layer map.
|
||||
/// This method makes no distinction between local and remote layers.
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size();
|
||||
@@ -684,7 +686,7 @@ impl Timeline {
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
|
||||
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
self.freeze_inmem_layer(false);
|
||||
self.freeze_inmem_layer(false).await;
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
|
||||
@@ -864,10 +866,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
pub fn writer(&self) -> TimelineWriter<'_> {
|
||||
pub async fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
tl: self,
|
||||
_write_guard: self.write_lock.lock().unwrap(),
|
||||
_write_guard: self.write_lock.lock().await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -901,9 +903,9 @@ impl Timeline {
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
@@ -925,7 +927,7 @@ impl Timeline {
|
||||
last_freeze_ts.elapsed()
|
||||
);
|
||||
|
||||
self.freeze_inmem_layer(true);
|
||||
self.freeze_inmem_layer(true).await;
|
||||
self.last_freeze_at.store(last_lsn);
|
||||
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
|
||||
|
||||
@@ -1012,8 +1014,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
in_memory_layers.push(open_layer.info());
|
||||
@@ -1035,7 +1037,7 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
||||
if self.remote_client.is_none() {
|
||||
return Ok(Some(false));
|
||||
@@ -1048,7 +1050,7 @@ impl Timeline {
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
@@ -1133,7 +1135,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// start the batch update
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
|
||||
let mut results = Vec::with_capacity(layers_to_evict.len());
|
||||
@@ -1386,7 +1388,7 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: RwLock::new(LayerMap::default()),
|
||||
layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
@@ -1421,7 +1423,7 @@ impl Timeline {
|
||||
layer_flush_start_tx,
|
||||
layer_flush_done_tx,
|
||||
|
||||
write_lock: Mutex::new(()),
|
||||
write_lock: tokio::sync::Mutex::new(()),
|
||||
layer_removal_cs: Default::default(),
|
||||
|
||||
gc_info: std::sync::RwLock::new(GcInfo {
|
||||
@@ -1563,8 +1565,8 @@ impl Timeline {
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -1693,7 +1695,7 @@ impl Timeline {
|
||||
|
||||
// We're holding a layer map lock for a while but this
|
||||
// method is only called during init so it's fine.
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut updates = layer_map.batch_update();
|
||||
for remote_layer_name in &index_part.timeline_layers {
|
||||
let local_layer = local_only_layers.remove(remote_layer_name);
|
||||
@@ -1846,7 +1848,7 @@ impl Timeline {
|
||||
let local_layers = self
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.await
|
||||
.iter_historic_layers()
|
||||
.map(|l| (l.filename(), l))
|
||||
.collect::<HashMap<_, _>>();
|
||||
@@ -2219,8 +2221,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().await.iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
return Some(historic_layer);
|
||||
@@ -2430,7 +2432,7 @@ impl Timeline {
|
||||
#[allow(clippy::never_loop)] // see comment at bottom of this loop
|
||||
'layer_map_search: loop {
|
||||
let remote_layer = {
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
let layers = timeline.layers.read().await;
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
@@ -2612,9 +2614,16 @@ impl Timeline {
|
||||
///
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().await;
|
||||
self.get_layer_for_write_locked(lsn, &mut layers)
|
||||
}
|
||||
|
||||
fn get_layer_for_write_locked(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
layers: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -2657,16 +2666,29 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_tombstone(key_range, lsn)?;
|
||||
fn put_value_locked(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &Value,
|
||||
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write_locked(lsn, pre_locked_layer_map)?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_tombstone(key_range, lsn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2678,15 +2700,15 @@ impl Timeline {
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
|
||||
// Freeze the current open in-memory layer. It will be written to disk on next
|
||||
// iteration.
|
||||
let _write_guard = if write_lock_held {
|
||||
None
|
||||
} else {
|
||||
Some(self.write_lock.lock().unwrap())
|
||||
Some(self.write_lock.lock().await)
|
||||
};
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
@@ -2724,7 +2746,7 @@ impl Timeline {
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
let layer_to_flush = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers.frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
@@ -2816,16 +2838,7 @@ impl Timeline {
|
||||
.await?
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let this = self.clone();
|
||||
let frozen_layer = frozen_layer.clone();
|
||||
let span = tracing::info_span!("blocking");
|
||||
let (delta_path, metadata) = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
this.create_delta_layer(&frozen_layer)
|
||||
})
|
||||
.await
|
||||
.context("create_delta_layer spawn_blocking")
|
||||
.and_then(|res| res)?;
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
|
||||
HashMap::from([(delta_path, metadata)])
|
||||
};
|
||||
|
||||
@@ -2834,7 +2847,7 @@ impl Timeline {
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
@@ -2928,34 +2941,46 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Write out the given frozen in-memory layer as a new L0 delta file
|
||||
fn create_delta_layer(
|
||||
async fn create_delta_layer(
|
||||
self: &Arc<Self>,
|
||||
frozen_layer: &InMemoryLayer,
|
||||
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
|
||||
// Write it out
|
||||
let new_delta = frozen_layer.write_to_disk()?;
|
||||
let new_delta_path = new_delta.path();
|
||||
let new_delta_filename = new_delta.filename();
|
||||
// TODO figure out how to use spawn_blocking. Can't use it because frozen_layer is not 'static
|
||||
let (new_delta, sz): (DeltaLayer, _) = tokio::task::block_in_place({
|
||||
let self_clone = Arc::clone(self);
|
||||
move || {
|
||||
// Write it out
|
||||
let new_delta = frozen_layer.write_to_disk()?;
|
||||
let new_delta_path = new_delta.path();
|
||||
|
||||
// Sync it to disk.
|
||||
//
|
||||
// We must also fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
//
|
||||
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
|
||||
// files to flush, it might be better to first write them all, and then fsync
|
||||
// them all in parallel.
|
||||
// Sync it to disk.
|
||||
//
|
||||
// We must also fsync the timeline dir to ensure the directory entries for
|
||||
// new layer files are durable
|
||||
//
|
||||
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
|
||||
// files to flush, it might be better to first write them all, and then fsync
|
||||
// them all in parallel.
|
||||
|
||||
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
|
||||
// this with a single fsync in future refactors.
|
||||
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
|
||||
// Then sync the parent directory.
|
||||
par_fsync::par_fsync(&[self.conf.timeline_path(&self.timeline_id, &self.tenant_id)])
|
||||
.context("fsync of timeline dir")?;
|
||||
// First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace
|
||||
// this with a single fsync in future refactors.
|
||||
par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?;
|
||||
// Then sync the parent directory.
|
||||
par_fsync::par_fsync(&[self_clone
|
||||
.conf
|
||||
.timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)])
|
||||
.context("fsync of timeline dir")?;
|
||||
|
||||
let sz = new_delta_path.metadata()?.len();
|
||||
|
||||
anyhow::Ok((new_delta, sz))
|
||||
}
|
||||
})?;
|
||||
let new_delta_name = new_delta.filename();
|
||||
|
||||
// Add it to the layer map
|
||||
let l = Arc::new(new_delta);
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut batch_updates = layers.batch_update();
|
||||
l.access_stats().record_residence_event(
|
||||
&batch_updates,
|
||||
@@ -2966,14 +2991,12 @@ impl Timeline {
|
||||
batch_updates.flush();
|
||||
|
||||
// update the timeline's physical size
|
||||
let sz = new_delta_path.metadata()?.len();
|
||||
|
||||
self.metrics.resident_physical_size_gauge.add(sz);
|
||||
// update metrics
|
||||
self.metrics.num_persistent_files_created.inc_by(1);
|
||||
self.metrics.persistent_bytes_written.inc_by(sz);
|
||||
|
||||
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
|
||||
Ok((new_delta_name, LayerFileMetadata::new(sz)))
|
||||
}
|
||||
|
||||
async fn repartition(
|
||||
@@ -3007,10 +3030,14 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
async fn time_for_new_image_layer(
|
||||
&self,
|
||||
partition: &KeySpace,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<bool> {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_deltas = 0;
|
||||
{
|
||||
@@ -3105,7 +3132,7 @@ impl Timeline {
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
start = img_range.end;
|
||||
if force || self.time_for_new_image_layer(partition, lsn)? {
|
||||
if force || self.time_for_new_image_layer(partition, lsn).await? {
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -3188,7 +3215,7 @@ impl Timeline {
|
||||
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
|
||||
|
||||
@@ -3253,12 +3280,11 @@ impl Timeline {
|
||||
fn compact_level0_phase1(
|
||||
&self,
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layers: tokio::sync::OwnedRwLockReadGuard<LayerMap<dyn PersistentLayer>>,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
|
||||
// Only compact if enough layers have accumulated.
|
||||
let threshold = self.get_compaction_threshold();
|
||||
@@ -3379,7 +3405,6 @@ impl Timeline {
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
@@ -3596,13 +3621,14 @@ impl Timeline {
|
||||
let this = self.clone();
|
||||
let ctx_inner = ctx.clone();
|
||||
let layer_removal_cs_inner = layer_removal_cs.clone();
|
||||
let layers = Arc::clone(&self.layers).read_owned().await;
|
||||
let span = tracing::info_span!("blocking");
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
} = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
this.compact_level0_phase1(layer_removal_cs_inner, target_file_size, &ctx_inner)
|
||||
this.compact_level0_phase1(layer_removal_cs_inner, layers, target_file_size, &ctx_inner)
|
||||
})
|
||||
.await
|
||||
.context("compact_level0_phase1 spawn_blocking")
|
||||
@@ -3625,7 +3651,7 @@ impl Timeline {
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
for l in new_layers {
|
||||
@@ -3886,7 +3912,7 @@ impl Timeline {
|
||||
// 4. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
// TODO holding a write lock is too agressive and avoidable
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
@@ -4186,7 +4212,7 @@ impl Timeline {
|
||||
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut layers = self_clone.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
@@ -4344,7 +4370,7 @@ impl Timeline {
|
||||
) {
|
||||
let mut downloads = Vec::new();
|
||||
{
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
@@ -4446,8 +4472,8 @@ impl LocalLayerInfoForDiskUsageEviction {
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().unwrap();
|
||||
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_layer_size: Option<u64> = None;
|
||||
let mut resident_layers = Vec::new();
|
||||
@@ -4519,7 +4545,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
|
||||
// but will cause large code changes.
|
||||
pub struct TimelineWriter<'a> {
|
||||
tl: &'a Timeline,
|
||||
_write_guard: MutexGuard<'a, ()>,
|
||||
_write_guard: tokio::sync::MutexGuard<'a, ()>,
|
||||
}
|
||||
|
||||
impl Deref for TimelineWriter<'_> {
|
||||
@@ -4535,12 +4561,23 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value).await
|
||||
}
|
||||
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn)
|
||||
pub fn put_locked(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
value: &Value,
|
||||
pre_locked_layer_map: &mut LayerMapWriteLockGuard,
|
||||
) -> anyhow::Result<()> {
|
||||
self.tl
|
||||
.put_value_locked(key, lsn, value, pre_locked_layer_map)
|
||||
}
|
||||
|
||||
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn).await
|
||||
}
|
||||
|
||||
/// Track the end of the latest digested WAL record.
|
||||
|
||||
@@ -197,7 +197,7 @@ impl Timeline {
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if hist_layer.is_remote_layer() {
|
||||
|
||||
@@ -1325,6 +1325,7 @@ mod tests {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.await
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
|
||||
ConnectionManagerState {
|
||||
|
||||
@@ -304,12 +304,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
}
|
||||
|
||||
timeline.check_checkpoint_distance().with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
timeline
|
||||
.check_checkpoint_distance()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(last_lsn) = status_update {
|
||||
let timeline_remote_consistent_lsn =
|
||||
|
||||
@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
modification.commit()?;
|
||||
modification.commit().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1200,7 +1200,7 @@ mod tests {
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
|
||||
|
||||
Ok(walingest)
|
||||
@@ -1209,7 +1209,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_relsize() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
@@ -1217,22 +1217,22 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
let mut m = tline.begin_modification(Lsn(0x50));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(0x50));
|
||||
|
||||
@@ -1318,7 +1318,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_current_logical_size(&tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
@@ -1360,7 +1360,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
|
||||
@@ -1373,7 +1373,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
|
||||
@@ -1398,7 +1398,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
|
||||
@@ -1428,14 +1428,14 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_drop_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1454,7 +1454,7 @@ mod tests {
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(
|
||||
@@ -1472,7 +1472,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
@@ -1497,7 +1497,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_truncate_extend() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
@@ -1509,7 +1509,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
@@ -1554,7 +1554,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
@@ -1603,7 +1603,7 @@ mod tests {
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
|
||||
.await?;
|
||||
}
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
@@ -1637,7 +1637,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_large_rel() -> Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?;
|
||||
let mut walingest = init_walingest_test(&tline, &ctx).await?;
|
||||
|
||||
let mut lsn = 0x10;
|
||||
@@ -1648,7 +1648,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
}
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
@@ -1664,7 +1664,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE
|
||||
@@ -1677,7 +1677,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
RELSEG_SIZE - 1
|
||||
@@ -1693,7 +1693,7 @@ mod tests {
|
||||
walingest
|
||||
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
|
||||
.await?;
|
||||
m.commit()?;
|
||||
m.commit().await?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
|
||||
size as BlockNumber
|
||||
|
||||
Reference in New Issue
Block a user