diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 7087c68dbd..14ea054577 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -825,14 +825,14 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
- raw_timeline: Option<(Timeline, TimelineUninitMark)>,
+ raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
}
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
@@ -169,7 +174,6 @@ impl UninitializedTimeline<'_> {
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
- let new_timeline = Arc::new(new_timeline);
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
// TODO it would be good to ensure that, but apparently a lot of our testing is dependend on that at least
@@ -197,6 +201,9 @@ impl UninitializedTimeline<'_> {
})?;
new_timeline.set_state(TimelineState::Active);
v.insert(Arc::clone(&new_timeline));
+
+ new_timeline.maybe_spawn_flush_loop();
+
new_timeline.launch_wal_receiver();
}
}
@@ -205,20 +212,28 @@ impl UninitializedTimeline<'_> {
}
/// Prepares timeline data by loading it from the basebackup archive.
- pub fn import_basebackup_from_tar(
- &self,
- reader: impl std::io::Read,
+ pub async fn import_basebackup_from_tar(
+ self,
+ mut copyin_stream: &mut Pin<&mut impl Stream<Item = std::io::Result<Bytes>>>,
base_lsn: Lsn,
- ) -> anyhow::Result<()> {
+ ) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
- import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn).with_context(
- || {
- format!(
- "Failed to import basebackup for timeline {}/{}",
- self.owning_tenant.tenant_id, self.timeline_id
- )
- },
- )?;
+
+ // import_basebackup_from_tar() is not async, mainly because the Tar crate
+ // it uses is not async. So we need to jump through some hoops:
+ // - convert the input from client connection to a synchronous Read
+ // - use block_in_place()
+ let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
+
+ tokio::task::block_in_place(|| {
+ import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn)
+ .context("Failed to import basebackup")
+ })?;
+
+ // The flush loop needs to be spawned in order for checkpoint to be able to flush.
+ // We want to run a proper checkpoint before we make the timeline available to the
+ // outside world, so we spawn the flush loop manually here and skip the flush_loop
+ // setup in initialize_with_lock.
+ raw_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
bail!("failpoint before-checkpoint-new-timeline");
@@ -226,16 +241,15 @@ impl UninitializedTimeline<'_> {
raw_timeline
.checkpoint(CheckpointConfig::Flush)
- .with_context(|| {
- format!(
- "Failed to checkpoint after basebackup import for timeline {}/{}",
- self.owning_tenant.tenant_id, self.timeline_id
- )
- })?;
- Ok(())
+ .await
+ .context("Failed to checkpoint after basebackup import")?;
+
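+ // initialize() makes the timeline visible to the rest of the pageserver: it is
+ // inserted into the tenant's timelines map, marked Active, and its background
+ // tasks are started.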
+ let timeline = self.initialize()?;
+
+ Ok(timeline)
}
- fn raw_timeline(&self) -> anyhow::Result<&Timeline> {
+ fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
Ok(&self
.raw_timeline
.as_ref()
@@ -470,7 +484,7 @@ impl Tenant {
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
}
- None => self.bootstrap_timeline(new_timeline_id, pg_version)?,
+ None => self.bootstrap_timeline(new_timeline_id, pg_version).await?,
};
// Have added new timeline into the tenant, now its background tasks are needed.
@@ -488,7 +502,7 @@ impl Tenant {
/// `checkpoint_before_gc` parameter is used to force compaction of storage before GC
/// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
- pub fn gc_iteration(
+ pub async fn gc_iteration(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
@@ -504,11 +518,13 @@ impl Tenant {
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
- STORAGE_TIME
- .with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
- .observe_closure_duration(|| {
- self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
- })
+ {
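+ // The timer observes the elapsed time into STORAGE_TIME when it is dropped
+ // at the end of this block, i.e. after gc_iteration_internal() completes.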
+ let _timer = STORAGE_TIME
+ .with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
+ .start_timer();
+ self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
+ .await
+ }
}
/// Perform one compaction iteration.
@@ -544,23 +560,24 @@ impl Tenant {
///
/// Used at graceful shutdown.
///
- pub fn checkpoint(&self) -> anyhow::Result<()> {
+ pub async fn checkpoint(&self) -> anyhow::Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
- let timelines = self.timelines.lock().unwrap();
- let timelines_to_checkpoint = timelines
- .iter()
- .map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline)))
- .collect::<Vec<_>>();
- drop(timelines);
+ let timelines_to_checkpoint = {
+ let timelines = self.timelines.lock().unwrap();
+ timelines
+ .iter()
+ .map(|(id, timeline)| (*id, Arc::clone(timeline)))
+ .collect::<Vec<_>>()
+ };
- for (timeline_id, timeline) in &timelines_to_checkpoint {
- let _entered =
- info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id)
- .entered();
- timeline.checkpoint(CheckpointConfig::Flush)?;
+ for (id, timeline) in &timelines_to_checkpoint {
+ timeline
+ .checkpoint(CheckpointConfig::Flush)
+ .instrument(info_span!("checkpoint", timeline = %id, tenant = %self.tenant_id))
+ .await?;
}
Ok(())
@@ -974,7 +991,7 @@ impl Tenant {
// - if a relation has a non-incremental persistent layer on a child branch, then we
// don't need to keep that in the parent anymore. But currently
// we do.
- fn gc_iteration_internal(
+ async fn gc_iteration_internal(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
@@ -1007,7 +1024,7 @@ impl Tenant {
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
- timeline.checkpoint(CheckpointConfig::Forced)?;
+ timeline.checkpoint(CheckpointConfig::Forced).await?;
info!(
"timeline {} checkpoint_before_gc done",
timeline.timeline_id
@@ -1117,7 +1134,6 @@ impl Tenant {
}
}
drop(gc_cs);
-
Ok(gc_timelines)
}
@@ -1222,14 +1238,15 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization complete, remove the temp dir.
- fn bootstrap_timeline(
+ async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,
) -> anyhow::Result<Arc<Timeline>> {
- let timelines = self.timelines.lock().unwrap();
- let timeline_uninit_mark = self.create_timeline_uninit_mark(timeline_id, &timelines)?;
- drop(timelines);
+ let timeline_uninit_mark = {
+ let timelines = self.timelines.lock().unwrap();
+ self.create_timeline_uninit_mark(timeline_id, &timelines)?
+ };
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
let initdb_path = path_with_suffix_extension(
@@ -1279,25 +1296,35 @@ impl Tenant {
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
- import_datadir::import_timeline_from_postgres_datadir(
- unfinished_timeline,
- pgdata_path,
- pgdata_lsn,
- )
+
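+ // import_timeline_from_postgres_datadir() is blocking, so run it inside
+ // block_in_place() to avoid stalling other tasks on this executor thread.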
+ tokio::task::block_in_place(|| {
+ import_datadir::import_timeline_from_postgres_datadir(
+ unfinished_timeline,
+ pgdata_path,
+ pgdata_lsn,
+ )
+ })
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
})?;
+ // The flush loop needs to be spawned in order for checkpoint to be able to flush.
+ // We want to run a proper checkpoint before we make the timeline available to the
+ // outside world, so we spawn the flush loop manually here and skip the flush_loop
+ // setup in initialize_with_lock.
+ unfinished_timeline.maybe_spawn_flush_loop();
+
fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
+
unfinished_timeline
- .checkpoint(CheckpointConfig::Forced)
+ .checkpoint(CheckpointConfig::Forced).await
.with_context(|| format!("Failed to checkpoint after pgdatadir import for timeline {tenant_id}/{timeline_id}"))?;
- let mut timelines = self.timelines.lock().unwrap();
- let timeline = raw_timeline.initialize_with_lock(&mut timelines, false)?;
- drop(timelines);
+ let timeline = {
+ let mut timelines = self.timelines.lock().unwrap();
+ raw_timeline.initialize_with_lock(&mut timelines, false)?
+ };
info!(
"created root timeline {} timeline.lsn {}",
@@ -1337,7 +1364,7 @@ impl Tenant {
Ok(UninitializedTimeline {
owning_tenant: self,
timeline_id: new_timeline_id,
- raw_timeline: Some((new_timeline, uninit_mark)),
+ raw_timeline: Some((Arc::new(new_timeline), uninit_mark)),
})
}
Err(e) => {
@@ -1456,7 +1483,7 @@ impl Tenant {
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
- raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
+ raw_timeline: Some((Arc::new(dummy_timeline), TimelineUninitMark::dummy())),
};
match timeline.initialize_with_lock(&mut timelines_accessor, true) {
Ok(initialized_timeline) => {
@@ -1910,7 +1937,7 @@ mod tests {
Ok(())
}
- fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
+ async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
@@ -1931,7 +1958,7 @@ mod tests {
writer.finish_write(lsn);
lsn += 0x10;
}
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
{
let writer = tline.writer();
writer.put(
@@ -1948,24 +1975,26 @@ mod tests {
)?;
writer.finish_write(lsn);
}
- tline.checkpoint(CheckpointConfig::Forced)
+ tline.checkpoint(CheckpointConfig::Forced).await
}
- #[test]
- fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
- make_some_layers(tline.as_ref(), Lsn(0x20))?;
+ make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
- tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
+ tenant
+ .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
+ .await?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
@@ -2010,14 +2039,14 @@ mod tests {
/*
// FIXME: This currently fails to error out. Calling GC doesn't currently
// remove the old value, we'd need to work a little harder
- #[test]
- fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
let repo =
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
- make_some_layers(tline.as_ref(), Lsn(0x20))?;
+ make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
@@ -2030,43 +2059,47 @@ mod tests {
}
*/
- #[test]
- fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
- make_some_layers(tline.as_ref(), Lsn(0x20))?;
+ make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
- tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
+ tenant
+ .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
+ .await?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(())
}
- #[test]
- fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
- make_some_layers(tline.as_ref(), Lsn(0x20))?;
+ make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
- make_some_layers(newtline.as_ref(), Lsn(0x60))?;
+ make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
// run gc on parent
- tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
+ tenant
+ .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
+ .await?;
// Check that the data is still accessible on the branch.
assert_eq!(
@@ -2077,8 +2110,8 @@ mod tests {
Ok(())
}
- #[test]
- fn timeline_load() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn timeline_load() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
{
@@ -2086,8 +2119,8 @@ mod tests {
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)?
.initialize()?;
- make_some_layers(tline.as_ref(), Lsn(0x8000))?;
- tline.checkpoint(CheckpointConfig::Forced)?;
+ make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
}
let tenant = harness.load();
@@ -2098,8 +2131,8 @@ mod tests {
Ok(())
}
- #[test]
- fn timeline_load_with_ancestor() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
// create two timelines
@@ -2109,8 +2142,8 @@ mod tests {
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
- make_some_layers(tline.as_ref(), Lsn(0x20))?;
- tline.checkpoint(CheckpointConfig::Forced)?;
+ make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -2118,8 +2151,8 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
- make_some_layers(newtline.as_ref(), Lsn(0x60))?;
- tline.checkpoint(CheckpointConfig::Forced)?;
+ make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
}
// check that both of them are initially unloaded
@@ -2179,8 +2212,8 @@ mod tests {
Ok(())
}
- #[test]
- fn test_images() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_images() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2191,7 +2224,7 @@ mod tests {
writer.finish_write(Lsn(0x10));
drop(writer);
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
let writer = tline.writer();
@@ -2199,7 +2232,7 @@ mod tests {
writer.finish_write(Lsn(0x20));
drop(writer);
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
let writer = tline.writer();
@@ -2207,7 +2240,7 @@ mod tests {
writer.finish_write(Lsn(0x30));
drop(writer);
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
let writer = tline.writer();
@@ -2215,7 +2248,7 @@ mod tests {
writer.finish_write(Lsn(0x40));
drop(writer);
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10"));
@@ -2231,8 +2264,8 @@ mod tests {
// Insert 1000 key-value pairs with increasing keys, checkpoint,
// repeat 50 times.
//
- #[test]
- fn test_bulk_insert() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_bulk_insert() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2265,7 +2298,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
tline.gc()?;
}
@@ -2273,8 +2306,8 @@ mod tests {
Ok(())
}
- #[test]
- fn test_random_updates() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_random_updates() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2337,7 +2370,7 @@ mod tests {
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
tline.gc()?;
}
@@ -2345,8 +2378,8 @@ mod tests {
Ok(())
}
- #[test]
- fn test_traverse_branches() -> anyhow::Result<()> {
+ #[tokio::test]
+ async fn test_traverse_branches() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2418,7 +2451,7 @@ mod tests {
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
- tline.checkpoint(CheckpointConfig::Forced)?;
+ tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
tline.gc()?;
}
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 279da70128..1f23fedcc1 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -16,7 +16,7 @@ use std::fs;
use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering};
-use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
+use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant, SystemTime};
use crate::tenant::{
@@ -121,8 +121,16 @@ pub struct Timeline {
/// to avoid deadlock.
write_lock: Mutex<()>,
- /// Used to ensure that there is only task performing flushing at a time
- layer_flush_lock: Mutex<()>,
+ /// Used to avoid multiple `flush_loop` tasks running
+ flush_loop_started: Mutex<bool>,
+
+ /// layer_flush_start_tx can be used to wake up the layer-flushing task.
+ /// The value is a counter, incremented every time a new flush cycle is requested.
+ /// The flush cycle counter is sent back on the layer_flush_done channel when
+ /// the flush finishes. You can use that to wait for the flush to finish.
+ layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
+ /// To be notified when layer flushing has finished, subscribe to the layer_flush_done channel.
+ layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
@@ -466,15 +474,16 @@ impl Timeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
- pub fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
+ #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
+ pub async fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
match cconf {
CheckpointConfig::Flush => {
self.freeze_inmem_layer(false);
- self.flush_frozen_layers(true)
+ self.flush_frozen_layers_and_wait().await
}
CheckpointConfig::Forced => {
self.freeze_inmem_layer(false);
- self.flush_frozen_layers(true)?;
+ self.flush_frozen_layers_and_wait().await?;
self.compact()
}
}
@@ -591,62 +600,6 @@ impl Timeline {
Ok(size)
}
- /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
- /// the in-memory layer, and initiate flushing it if so.
- ///
- /// Also flush after a period of time without new data -- it helps
- /// safekeepers to regard pageserver as caught up and suspend activity.
- pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
- let last_lsn = self.get_last_record_lsn();
- let layers = self.layers.read().unwrap();
- if let Some(open_layer) = &layers.open_layer {
- let open_layer_size = open_layer.size()?;
- drop(layers);
- let last_freeze_at = self.last_freeze_at.load();
- let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
- let distance = last_lsn.widening_sub(last_freeze_at);
- // Checkpointing the open layer can be triggered by layer size or LSN range.
- // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
- // we want to stay below that with a big margin. The LSN distance determines how
- // much WAL the safekeepers need to store.
- if distance >= self.get_checkpoint_distance().into()
- || open_layer_size > self.get_checkpoint_distance()
- || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
- {
- info!(
- "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
- distance,
- open_layer_size,
- last_freeze_ts.elapsed()
- );
-
- self.freeze_inmem_layer(true);
- self.last_freeze_at.store(last_lsn);
- *(self.last_freeze_ts.write().unwrap()) = Instant::now();
-
- // Launch a task to flush the frozen layer to disk, unless
- // a task was already running. (If the task was running
- // at the time that we froze the layer, it must've seen the
- // the layer we just froze before it exited; see comments
- // in flush_frozen_layers())
- if let Ok(guard) = self.layer_flush_lock.try_lock() {
- drop(guard);
- let self_clone = Arc::clone(self);
- task_mgr::spawn(
- task_mgr::BACKGROUND_RUNTIME.handle(),
- task_mgr::TaskKind::LayerFlushTask,
- Some(self.tenant_id),
- Some(self.timeline_id),
- "layer flush task",
- false,
- async move { self_clone.flush_frozen_layers(false) },
- );
- }
- }
- }
- Ok(())
- }
-
pub fn set_state(&self, new_state: TimelineState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
@@ -732,6 +685,9 @@ impl Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Suspended);
+ let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
+ let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
+
let mut result = Timeline {
conf,
tenant_conf,
@@ -759,8 +715,12 @@ impl Timeline {
upload_layers: AtomicBool::new(upload_layers),
+ flush_loop_started: Mutex::new(false),
+
+ layer_flush_start_tx,
+ layer_flush_done_tx,
+
write_lock: Mutex::new(()),
- layer_flush_lock: Mutex::new(()),
layer_removal_cs: Mutex::new(()),
gc_info: RwLock::new(GcInfo {
@@ -793,6 +753,33 @@ impl Timeline {
result
}
+ pub(super) fn maybe_spawn_flush_loop(self: &Arc<Timeline>) {
+ let mut flush_loop_started = self.flush_loop_started.lock().unwrap();
+ if *flush_loop_started {
+ info!(
+ "skipping attempt to start flush_loop twice {}/{}",
+ self.tenant_id, self.timeline_id
+ );
+ return;
+ }
+
+ let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
+ let self_clone = Arc::clone(self);
+ info!("spawning flush loop");
+ task_mgr::spawn(
+ task_mgr::BACKGROUND_RUNTIME.handle(),
+ task_mgr::TaskKind::LayerFlushTask,
+ Some(self.tenant_id),
+ Some(self.timeline_id),
+ "layer flush task",
+ false,
+ async move { self_clone.flush_loop(layer_flush_start_rx).await; Ok(()) }
+ .instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
+ );
+
+ *flush_loop_started = true;
+ }
+
pub(super) fn launch_wal_receiver(self: &Arc<Timeline>) {
if !is_etcd_client_initialized() {
if cfg!(test) {
@@ -1289,53 +1276,128 @@ impl Timeline {
drop(layers);
}
- /// Flush all frozen layers to disk.
///
- /// Only one task at a time can be doing layer-flushing for a
- /// given timeline. If 'wait' is true, and another task is
- /// currently doing the flushing, this function will wait for it
- /// to finish. If 'wait' is false, this function will return
- /// immediately instead.
- fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
- let flush_lock_guard = if wait {
- self.layer_flush_lock.lock().unwrap()
- } else {
- match self.layer_flush_lock.try_lock() {
- Ok(guard) => guard,
- Err(TryLockError::WouldBlock) => return Ok(()),
- Err(TryLockError::Poisoned(err)) => panic!("{:?}", err),
- }
- };
+ /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
+ /// the in-memory layer, and initiate flushing it if so.
+ ///
+ /// Also flush after a period of time without new data -- it helps
+ /// safekeepers to regard pageserver as caught up and suspend activity.
+ ///
+ pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
+ let last_lsn = self.get_last_record_lsn();
+ let layers = self.layers.read().unwrap();
+ if let Some(open_layer) = &layers.open_layer {
+ let open_layer_size = open_layer.size()?;
+ drop(layers);
+ let last_freeze_at = self.last_freeze_at.load();
+ let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
+ let distance = last_lsn.widening_sub(last_freeze_at);
+ // Checkpointing the open layer can be triggered by layer size or LSN range.
+ // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
+ // we want to stay below that with a big margin. The LSN distance determines how
+ // much WAL the safekeepers need to store.
+ if distance >= self.get_checkpoint_distance().into()
+ || open_layer_size > self.get_checkpoint_distance()
+ || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
+ {
+ info!(
+ "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
+ distance,
+ open_layer_size,
+ last_freeze_ts.elapsed()
+ );
- let timer = self.metrics.flush_time_histo.start_timer();
+ self.freeze_inmem_layer(true);
+ self.last_freeze_at.store(last_lsn);
+ *(self.last_freeze_ts.write().unwrap()) = Instant::now();
- loop {
- let layers = self.layers.read().unwrap();
- if let Some(frozen_layer) = layers.frozen_layers.front() {
- let frozen_layer = Arc::clone(frozen_layer);
- drop(layers); // to allow concurrent reads and writes
- self.flush_frozen_layer(frozen_layer)?;
- } else {
- // Drop the 'layer_flush_lock' *before* 'layers'. That
- // way, if you freeze a layer, and then call
- // flush_frozen_layers(false), it is guaranteed that
- // if another thread was busy flushing layers and the
- // call therefore returns immediately, the other
- // thread will have seen the newly-frozen layer and
- // will flush that too (assuming no errors).
- drop(flush_lock_guard);
- drop(layers);
- break;
+ // Wake up the layer flusher
+ self.flush_frozen_layers();
}
}
-
- timer.stop_and_record();
-
Ok(())
}
+ /// Layer flusher task's main loop.
+ async fn flush_loop(&self, mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>) {
+ info!("started flush loop");
+ loop {
+ tokio::select! {
+ _ = task_mgr::shutdown_watcher() => {
+ info!("shutting down layer flush task");
+ break;
+ },
+ _ = layer_flush_start_rx.changed() => {}
+ }
+
+ trace!("waking up");
+ let timer = self.metrics.flush_time_histo.start_timer();
+ let flush_counter = *layer_flush_start_rx.borrow();
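+ // Remember which flush request we are serving; the counter is sent back
+ // on layer_flush_done_tx below so that waiters can match their request.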
+ let result = loop {
+ let layer_to_flush = {
+ let layers = self.layers.read().unwrap();
+ layers.frozen_layers.front().cloned()
+ // drop 'layers' lock to allow concurrent reads and writes
+ };
+ if let Some(layer_to_flush) = layer_to_flush {
+ if let Err(err) = self.flush_frozen_layer(layer_to_flush).await {
+ error!("could not flush frozen layer: {err:?}");
+ break Err(err);
+ }
+ continue;
+ } else {
+ break Ok(());
+ }
+ };
+ // Notify any listeners that we're done
+ let _ = self
+ .layer_flush_done_tx
+ .send_replace((flush_counter, result));
+
+ timer.stop_and_record();
+ }
+ }
+
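+ /// Request a flush of all frozen in-memory layers and wait for it to complete.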
+ async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
+ let mut rx = self.layer_flush_done_tx.subscribe();
+
+ // Increment the flush cycle counter and wake up the flush task.
+ // Remember the new value, so that when we listen for the flush
+ // to finish, we know when the flush that we initiated has
+ // finished, instead of some other flush that was started earlier.
+ let mut my_flush_request = 0;
+ self.layer_flush_start_tx.send_modify(|counter| {
+ my_flush_request = *counter + 1;
+ *counter = my_flush_request;
+ });
+
+ loop {
+ {
+ let (last_result_counter, last_result) = &*rx.borrow();
+ if *last_result_counter >= my_flush_request {
+ if let Err(_err) = last_result {
+ // We already logged the original error in
+ // flush_loop. We cannot propagate it to the caller
+ // here, because it might not be Cloneable
+ bail!("could not flush frozen layer");
+ } else {
+ return Ok(());
+ }
+ }
+ }
+ trace!("waiting for flush to complete");
+ rx.changed().await?;
+ trace!("done")
+ }
+ }
+
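+ /// Wake up the layer flush task to flush any frozen layers, without waiting
+ /// for the flush to complete.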
+ fn flush_frozen_layers(&self) {
+ self.layer_flush_start_tx.send_modify(|val| *val += 1);
+ }
+
/// Flush one frozen in-memory layer to disk, as a new delta layer.
- fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
+ #[instrument(skip(self, frozen_layer), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.filename().display()))]
+ async fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -2265,13 +2327,10 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0;
- let img = self.walredo_mgr.request_redo(
- key,
- request_lsn,
- base_img,
- data.records,
- self.pg_version,
- )?;
+ let img = self
+ .walredo_mgr
+ .request_redo(key, request_lsn, base_img, data.records, self.pg_version)
+ .context("Failed to reconstruct a page image:")?;
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index f1db50bf7f..3766bc5cb3 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -241,7 +241,7 @@ pub async fn shutdown_all_tenants() {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
- if let Err(err) = tenant.checkpoint() {
+ if let Err(err) = tenant.checkpoint().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}
diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs
index 23ce9dc699..a24bdd5812 100644
--- a/pageserver/src/tenant_tasks.rs
+++ b/pageserver/src/tenant_tasks.rs
@@ -119,7 +119,7 @@ async fn gc_loop(tenant_id: TenantId) {
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
if gc_horizon > 0 {
- if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
+ if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:#}", sleep_duration);
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index e21ec4d742..54d322373b 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -120,7 +120,7 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
/// An error happened in WAL redo
#[derive(Debug, thiserror::Error)]
pub enum WalRedoError {
- #[error(transparent)]
+ #[error("encountered io error: {0}")]
IoError(#[from] std::io::Error),
#[error("cannot perform WAL redo now")]