From 7b7f84f1b4643f0417b6ccc1793cc0c38aeb55db Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 15 Oct 2022 02:08:18 +0300 Subject: [PATCH] Refactor layer flushing task Extracted from https://github.com/neondatabase/neon/pull/2595 --- pageserver/src/http/routes.rs | 5 +- pageserver/src/page_service.rs | 17 +- pageserver/src/tenant.rs | 239 +++++++++++++++----------- pageserver/src/tenant/timeline.rs | 275 ++++++++++++++++++------------ pageserver/src/tenant_mgr.rs | 2 +- pageserver/src/tenant_tasks.rs | 2 +- pageserver/src/walredo.rs | 2 +- 7 files changed, 317 insertions(+), 225 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7087c68dbd..14ea054577 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -825,14 +825,14 @@ async fn timeline_gc_handler(mut request: Request) -> Result) -> Result { owning_tenant: &'t Tenant, timeline_id: TimelineId, - raw_timeline: Option<(Timeline, TimelineUninitMark)>, + raw_timeline: Option<(Arc, TimelineUninitMark)>, } /// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory, @@ -169,7 +174,6 @@ impl UninitializedTimeline<'_> { let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| { format!("No timeline for initalization found for {tenant_id}/{timeline_id}") })?; - let new_timeline = Arc::new(new_timeline); let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn(); // TODO it would be good to ensure that, but apparently a lot of our testing is dependend on that at least @@ -197,6 +201,9 @@ impl UninitializedTimeline<'_> { })?; new_timeline.set_state(TimelineState::Active); v.insert(Arc::clone(&new_timeline)); + + new_timeline.maybe_spawn_flush_loop(); + new_timeline.launch_wal_receiver(); } } @@ -205,20 +212,28 @@ impl UninitializedTimeline<'_> { } /// Prepares timeline data by loading it from the basebackup archive. - pub fn import_basebackup_from_tar( - &self, - reader: impl std::io::Read, + pub async fn import_basebackup_from_tar( + self, + mut copyin_stream: &mut Pin<&mut impl Stream>>, base_lsn: Lsn, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { let raw_timeline = self.raw_timeline()?; - import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn).with_context( - || { - format!( - "Failed to import basebackup for timeline {}/{}", - self.owning_tenant.tenant_id, self.timeline_id - ) - }, - )?; + + // import_basebackup_from_tar() is not async, mainly because the Tar crate + // it uses is not async. So we need to jump through some hoops: + // - convert the input from client connection to a synchronous Read + // - use block_in_place() + let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream)); + + tokio::task::block_in_place(|| { + import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn) + .context("Failed to import basebackup") + })?; + + // Flush loop needs to be spawned in order for checkpoint to be able to flush. 
+ // We want to run proper checkpoint before we mark timeline as available to outside world + // Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock + raw_timeline.maybe_spawn_flush_loop(); fail::fail_point!("before-checkpoint-new-timeline", |_| { bail!("failpoint before-checkpoint-new-timeline"); @@ -226,16 +241,15 @@ impl UninitializedTimeline<'_> { raw_timeline .checkpoint(CheckpointConfig::Flush) - .with_context(|| { - format!( - "Failed to checkpoint after basebackup import for timeline {}/{}", - self.owning_tenant.tenant_id, self.timeline_id - ) - })?; - Ok(()) + .await + .context("Failed to checkpoint after basebackup import")?; + + let timeline = self.initialize()?; + + Ok(timeline) } - fn raw_timeline(&self) -> anyhow::Result<&Timeline> { + fn raw_timeline(&self) -> anyhow::Result<&Arc> { Ok(&self .raw_timeline .as_ref() @@ -470,7 +484,7 @@ impl Tenant { self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)? } - None => self.bootstrap_timeline(new_timeline_id, pg_version)?, + None => self.bootstrap_timeline(new_timeline_id, pg_version).await?, }; // Have added new timeline into the tenant, now its background tasks are needed. @@ -488,7 +502,7 @@ impl Tenant { /// `checkpoint_before_gc` parameter is used to force compaction of storage before GC /// to make tests more deterministic. /// TODO Do we still need it or we can call checkpoint explicitly in tests where needed? - pub fn gc_iteration( + pub async fn gc_iteration( &self, target_timeline_id: Option, horizon: u64, @@ -504,11 +518,13 @@ impl Tenant { .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); - STORAGE_TIME - .with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str]) - .observe_closure_duration(|| { - self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc) - }) + { + let _timer = STORAGE_TIME + .with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str]) + .start_timer(); + self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc) + .await + } } /// Perform one compaction iteration. @@ -544,23 +560,24 @@ impl Tenant { /// /// Used at graceful shutdown. /// - pub fn checkpoint(&self) -> anyhow::Result<()> { + pub async fn checkpoint(&self) -> anyhow::Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // checkpoints. We don't want to block everything else while the // checkpoint runs. - let timelines = self.timelines.lock().unwrap(); - let timelines_to_checkpoint = timelines - .iter() - .map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline))) - .collect::>(); - drop(timelines); + let timelines_to_checkpoint = { + let timelines = self.timelines.lock().unwrap(); + timelines + .iter() + .map(|(id, timeline)| (*id, Arc::clone(timeline))) + .collect::>() + }; - for (timeline_id, timeline) in &timelines_to_checkpoint { - let _entered = - info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id) - .entered(); - timeline.checkpoint(CheckpointConfig::Flush)?; + for (id, timeline) in &timelines_to_checkpoint { + timeline + .checkpoint(CheckpointConfig::Flush) + .instrument(info_span!("checkpoint", timeline = %id, tenant = %self.tenant_id)) + .await?; } Ok(()) @@ -974,7 +991,7 @@ impl Tenant { // - if a relation has a non-incremental persistent layer on a child branch, then we // don't need to keep that in the parent anymore. But currently // we do. 
- fn gc_iteration_internal( + async fn gc_iteration_internal( &self, target_timeline_id: Option, horizon: u64, @@ -1007,7 +1024,7 @@ impl Tenant { // so that they too can be garbage collected. That's // used in tests, so we want as deterministic results as possible. if checkpoint_before_gc { - timeline.checkpoint(CheckpointConfig::Forced)?; + timeline.checkpoint(CheckpointConfig::Forced).await?; info!( "timeline {} checkpoint_before_gc done", timeline.timeline_id @@ -1117,7 +1134,6 @@ impl Tenant { } } drop(gc_cs); - Ok(gc_timelines) } @@ -1222,14 +1238,15 @@ impl Tenant { /// - run initdb to init temporary instance and get bootstrap data /// - after initialization complete, remove the temp dir. - fn bootstrap_timeline( + async fn bootstrap_timeline( &self, timeline_id: TimelineId, pg_version: u32, ) -> anyhow::Result> { - let timelines = self.timelines.lock().unwrap(); - let timeline_uninit_mark = self.create_timeline_uninit_mark(timeline_id, &timelines)?; - drop(timelines); + let timeline_uninit_mark = { + let timelines = self.timelines.lock().unwrap(); + self.create_timeline_uninit_mark(timeline_id, &timelines)? + }; // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` // temporary directory for basebackup files for the given timeline. let initdb_path = path_with_suffix_extension( @@ -1279,25 +1296,35 @@ impl Tenant { let tenant_id = raw_timeline.owning_tenant.tenant_id; let unfinished_timeline = raw_timeline.raw_timeline()?; - import_datadir::import_timeline_from_postgres_datadir( - unfinished_timeline, - pgdata_path, - pgdata_lsn, - ) + + tokio::task::block_in_place(|| { + import_datadir::import_timeline_from_postgres_datadir( + unfinished_timeline, + pgdata_path, + pgdata_lsn, + ) + }) .with_context(|| { format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}") })?; + // Flush loop needs to be spawned in order for checkpoint to be able to flush. + // We want to run proper checkpoint before we mark timeline as available to outside world + // Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock + unfinished_timeline.maybe_spawn_flush_loop(); + fail::fail_point!("before-checkpoint-new-timeline", |_| { anyhow::bail!("failpoint before-checkpoint-new-timeline"); }); + unfinished_timeline - .checkpoint(CheckpointConfig::Forced) + .checkpoint(CheckpointConfig::Forced).await .with_context(|| format!("Failed to checkpoint after pgdatadir import for timeline {tenant_id}/{timeline_id}"))?; - let mut timelines = self.timelines.lock().unwrap(); - let timeline = raw_timeline.initialize_with_lock(&mut timelines, false)?; - drop(timelines); + let timeline = { + let mut timelines = self.timelines.lock().unwrap(); + raw_timeline.initialize_with_lock(&mut timelines, false)? 
+ }; info!( "created root timeline {} timeline.lsn {}", @@ -1337,7 +1364,7 @@ impl Tenant { Ok(UninitializedTimeline { owning_tenant: self, timeline_id: new_timeline_id, - raw_timeline: Some((new_timeline, uninit_mark)), + raw_timeline: Some((Arc::new(new_timeline), uninit_mark)), }) } Err(e) => { @@ -1456,7 +1483,7 @@ impl Tenant { let timeline = UninitializedTimeline { owning_tenant: self, timeline_id, - raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())), + raw_timeline: Some((Arc::new(dummy_timeline), TimelineUninitMark::dummy())), }; match timeline.initialize_with_lock(&mut timelines_accessor, true) { Ok(initialized_timeline) => { @@ -1910,7 +1937,7 @@ mod tests { Ok(()) } - fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> { + async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> { let mut lsn = start_lsn; #[allow(non_snake_case)] { @@ -1931,7 +1958,7 @@ mod tests { writer.finish_write(lsn); lsn += 0x10; } - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; { let writer = tline.writer(); writer.put( @@ -1948,24 +1975,26 @@ mod tests { )?; writer.finish_write(lsn); } - tline.checkpoint(CheckpointConfig::Forced) + tline.checkpoint(CheckpointConfig::Forced).await } - #[test] - fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> { + #[tokio::test] + async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? .load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; - make_some_layers(tline.as_ref(), Lsn(0x20))?; + make_some_layers(tline.as_ref(), Lsn(0x20)).await?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 // FIXME: this doesn't actually remove any layer currently, given how the checkpointing // and compaction works. But it does set the 'cutoff' point so that the cross check // below should fail. - tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; + tenant + .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false) + .await?; // try to branch at lsn 25, should fail because we already garbage collected the data match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) { @@ -2010,14 +2039,14 @@ mod tests { /* // FIXME: This currently fails to error out. Calling GC doesn't currently // remove the old value, we'd need to work a little harder - #[test] - fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> { + #[tokio::test] + async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> { let repo = RepoHarness::create("test_prohibit_get_for_garbage_collected_data")? 
.load(); let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; - make_some_layers(tline.as_ref(), Lsn(0x20))?; + make_some_layers(tline.as_ref(), Lsn(0x20)).await?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn(); @@ -2030,43 +2059,47 @@ mod tests { } */ - #[test] - fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> { + #[tokio::test] + async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; - make_some_layers(tline.as_ref(), Lsn(0x20))?; + make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 - tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; + tenant + .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false) + .await?; assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok()); Ok(()) } - #[test] - fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> { + #[tokio::test] + async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? .initialize()?; - make_some_layers(tline.as_ref(), Lsn(0x20))?; + make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); - make_some_layers(newtline.as_ref(), Lsn(0x60))?; + make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; // run gc on parent - tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; + tenant + .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false) + .await?; // Check that the data is still accessible on the branch. assert_eq!( @@ -2077,8 +2110,8 @@ mod tests { Ok(()) } - #[test] - fn timeline_load() -> anyhow::Result<()> { + #[tokio::test] + async fn timeline_load() -> anyhow::Result<()> { const TEST_NAME: &str = "timeline_load"; let harness = TenantHarness::create(TEST_NAME)?; { @@ -2086,8 +2119,8 @@ mod tests { let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)? .initialize()?; - make_some_layers(tline.as_ref(), Lsn(0x8000))?; - tline.checkpoint(CheckpointConfig::Forced)?; + make_some_layers(tline.as_ref(), Lsn(0x8000)).await?; + tline.checkpoint(CheckpointConfig::Forced).await?; } let tenant = harness.load(); @@ -2098,8 +2131,8 @@ mod tests { Ok(()) } - #[test] - fn timeline_load_with_ancestor() -> anyhow::Result<()> { + #[tokio::test] + async fn timeline_load_with_ancestor() -> anyhow::Result<()> { const TEST_NAME: &str = "timeline_load_with_ancestor"; let harness = TenantHarness::create(TEST_NAME)?; // create two timelines @@ -2109,8 +2142,8 @@ mod tests { .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
.initialize()?; - make_some_layers(tline.as_ref(), Lsn(0x20))?; - tline.checkpoint(CheckpointConfig::Forced)?; + make_some_layers(tline.as_ref(), Lsn(0x20)).await?; + tline.checkpoint(CheckpointConfig::Forced).await?; tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; @@ -2118,8 +2151,8 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); - make_some_layers(newtline.as_ref(), Lsn(0x60))?; - tline.checkpoint(CheckpointConfig::Forced)?; + make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; + tline.checkpoint(CheckpointConfig::Forced).await?; } // check that both of them are initially unloaded @@ -2179,8 +2212,8 @@ mod tests { Ok(()) } - #[test] - fn test_images() -> anyhow::Result<()> { + #[tokio::test] + async fn test_images() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_images")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -2191,7 +2224,7 @@ mod tests { writer.finish_write(Lsn(0x10)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; tline.compact()?; let writer = tline.writer(); @@ -2199,7 +2232,7 @@ mod tests { writer.finish_write(Lsn(0x20)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; tline.compact()?; let writer = tline.writer(); @@ -2207,7 +2240,7 @@ mod tests { writer.finish_write(Lsn(0x30)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; tline.compact()?; let writer = tline.writer(); @@ -2215,7 +2248,7 @@ mod tests { writer.finish_write(Lsn(0x40)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; tline.compact()?; assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10")); @@ -2231,8 +2264,8 @@ mod tests { // Insert 1000 key-value pairs with increasing keys, checkpoint, // repeat 50 times. // - #[test] - fn test_bulk_insert() -> anyhow::Result<()> { + #[tokio::test] + async fn test_bulk_insert() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_bulk_insert")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -2265,7 +2298,7 @@ mod tests { let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; tline.compact()?; tline.gc()?; } @@ -2273,8 +2306,8 @@ mod tests { Ok(()) } - #[test] - fn test_random_updates() -> anyhow::Result<()> { + #[tokio::test] + async fn test_random_updates() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_random_updates")?.load(); let tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? 
@@ -2337,7 +2370,7 @@ mod tests { println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; tline.compact()?; tline.gc()?; } @@ -2345,8 +2378,8 @@ mod tests { Ok(()) } - #[test] - fn test_traverse_branches() -> anyhow::Result<()> { + #[tokio::test] + async fn test_traverse_branches() -> anyhow::Result<()> { let tenant = TenantHarness::create("test_traverse_branches")?.load(); let mut tline = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)? @@ -2418,7 +2451,7 @@ mod tests { println!("checkpointing {}", lsn); let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; - tline.checkpoint(CheckpointConfig::Forced)?; + tline.checkpoint(CheckpointConfig::Forced).await?; tline.compact()?; tline.gc()?; } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 279da70128..1f23fedcc1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -16,7 +16,7 @@ use std::fs; use std::ops::{Deref, Range}; use std::path::PathBuf; use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError}; +use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::time::{Duration, Instant, SystemTime}; use crate::tenant::{ @@ -121,8 +121,16 @@ pub struct Timeline { /// to avoid deadlock. write_lock: Mutex<()>, - /// Used to ensure that there is only task performing flushing at a time - layer_flush_lock: Mutex<()>, + /// Used to avoid multiple `flush_loop` tasks running + flush_loop_started: Mutex, + + /// layer_flush_start_tx can be used to wake up the layer-flushing task. + /// The value is a counter, incremented every time a new flush cycle is requested. + /// The flush cycle counter is sent back on the layer_flush_done channel when + /// the flush finishes. You can use that to wait for the flush to finish. + layer_flush_start_tx: tokio::sync::watch::Sender, + /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel + layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>, /// Layer removal lock. /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. @@ -466,15 +474,16 @@ impl Timeline { /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository. - pub fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> { + #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))] + pub async fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> { match cconf { CheckpointConfig::Flush => { self.freeze_inmem_layer(false); - self.flush_frozen_layers(true) + self.flush_frozen_layers_and_wait().await } CheckpointConfig::Forced => { self.freeze_inmem_layer(false); - self.flush_frozen_layers(true)?; + self.flush_frozen_layers_and_wait().await?; self.compact() } } @@ -591,62 +600,6 @@ impl Timeline { Ok(size) } - /// Check if more than 'checkpoint_distance' of WAL has been accumulated in - /// the in-memory layer, and initiate flushing it if so. - /// - /// Also flush after a period of time without new data -- it helps - /// safekeepers to regard pageserver as caught up and suspend activity. 
- pub fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { - let last_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); - if let Some(open_layer) = &layers.open_layer { - let open_layer_size = open_layer.size()?; - drop(layers); - let last_freeze_at = self.last_freeze_at.load(); - let last_freeze_ts = *(self.last_freeze_ts.read().unwrap()); - let distance = last_lsn.widening_sub(last_freeze_at); - // Checkpointing the open layer can be triggered by layer size or LSN range. - // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and - // we want to stay below that with a big margin. The LSN distance determines how - // much WAL the safekeepers need to store. - if distance >= self.get_checkpoint_distance().into() - || open_layer_size > self.get_checkpoint_distance() - || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()) - { - info!( - "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}", - distance, - open_layer_size, - last_freeze_ts.elapsed() - ); - - self.freeze_inmem_layer(true); - self.last_freeze_at.store(last_lsn); - *(self.last_freeze_ts.write().unwrap()) = Instant::now(); - - // Launch a task to flush the frozen layer to disk, unless - // a task was already running. (If the task was running - // at the time that we froze the layer, it must've seen the - // the layer we just froze before it exited; see comments - // in flush_frozen_layers()) - if let Ok(guard) = self.layer_flush_lock.try_lock() { - drop(guard); - let self_clone = Arc::clone(self); - task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), - task_mgr::TaskKind::LayerFlushTask, - Some(self.tenant_id), - Some(self.timeline_id), - "layer flush task", - false, - async move { self_clone.flush_frozen_layers(false) }, - ); - } - } - } - Ok(()) - } - pub fn set_state(&self, new_state: TimelineState) { match (self.current_state(), new_state) { (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { @@ -732,6 +685,9 @@ impl Timeline { let disk_consistent_lsn = metadata.disk_consistent_lsn(); let (state, _) = watch::channel(TimelineState::Suspended); + let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0); + let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(()))); + let mut result = Timeline { conf, tenant_conf, @@ -759,8 +715,12 @@ impl Timeline { upload_layers: AtomicBool::new(upload_layers), + flush_loop_started: Mutex::new(false), + + layer_flush_start_tx, + layer_flush_done_tx, + write_lock: Mutex::new(()), - layer_flush_lock: Mutex::new(()), layer_removal_cs: Mutex::new(()), gc_info: RwLock::new(GcInfo { @@ -793,6 +753,33 @@ impl Timeline { result } + pub(super) fn maybe_spawn_flush_loop(self: &Arc) { + let mut flush_loop_started = self.flush_loop_started.lock().unwrap(); + if *flush_loop_started { + info!( + "skipping attempt to start flush_loop twice {}/{}", + self.tenant_id, self.timeline_id + ); + return; + } + + let layer_flush_start_rx = self.layer_flush_start_tx.subscribe(); + let self_clone = Arc::clone(self); + info!("spawning flush loop"); + task_mgr::spawn( + task_mgr::BACKGROUND_RUNTIME.handle(), + task_mgr::TaskKind::LayerFlushTask, + Some(self.tenant_id), + Some(self.timeline_id), + "layer flush task", + false, + async move { self_clone.flush_loop(layer_flush_start_rx).await; Ok(()) } + .instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id)) + ); + + *flush_loop_started = true; + } + pub(super) fn 
launch_wal_receiver(self: &Arc) { if !is_etcd_client_initialized() { if cfg!(test) { @@ -1289,53 +1276,128 @@ impl Timeline { drop(layers); } - /// Flush all frozen layers to disk. /// - /// Only one task at a time can be doing layer-flushing for a - /// given timeline. If 'wait' is true, and another task is - /// currently doing the flushing, this function will wait for it - /// to finish. If 'wait' is false, this function will return - /// immediately instead. - fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> { - let flush_lock_guard = if wait { - self.layer_flush_lock.lock().unwrap() - } else { - match self.layer_flush_lock.try_lock() { - Ok(guard) => guard, - Err(TryLockError::WouldBlock) => return Ok(()), - Err(TryLockError::Poisoned(err)) => panic!("{:?}", err), - } - }; + /// Check if more than 'checkpoint_distance' of WAL has been accumulated in + /// the in-memory layer, and initiate flushing it if so. + /// + /// Also flush after a period of time without new data -- it helps + /// safekeepers to regard pageserver as caught up and suspend activity. + /// + pub fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { + let last_lsn = self.get_last_record_lsn(); + let layers = self.layers.read().unwrap(); + if let Some(open_layer) = &layers.open_layer { + let open_layer_size = open_layer.size()?; + drop(layers); + let last_freeze_at = self.last_freeze_at.load(); + let last_freeze_ts = *(self.last_freeze_ts.read().unwrap()); + let distance = last_lsn.widening_sub(last_freeze_at); + // Checkpointing the open layer can be triggered by layer size or LSN range. + // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and + // we want to stay below that with a big margin. The LSN distance determines how + // much WAL the safekeepers need to store. + if distance >= self.get_checkpoint_distance().into() + || open_layer_size > self.get_checkpoint_distance() + || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()) + { + info!( + "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}", + distance, + open_layer_size, + last_freeze_ts.elapsed() + ); - let timer = self.metrics.flush_time_histo.start_timer(); + self.freeze_inmem_layer(true); + self.last_freeze_at.store(last_lsn); + *(self.last_freeze_ts.write().unwrap()) = Instant::now(); - loop { - let layers = self.layers.read().unwrap(); - if let Some(frozen_layer) = layers.frozen_layers.front() { - let frozen_layer = Arc::clone(frozen_layer); - drop(layers); // to allow concurrent reads and writes - self.flush_frozen_layer(frozen_layer)?; - } else { - // Drop the 'layer_flush_lock' *before* 'layers'. That - // way, if you freeze a layer, and then call - // flush_frozen_layers(false), it is guaranteed that - // if another thread was busy flushing layers and the - // call therefore returns immediately, the other - // thread will have seen the newly-frozen layer and - // will flush that too (assuming no errors). - drop(flush_lock_guard); - drop(layers); - break; + // Wake up the layer flusher + self.flush_frozen_layers(); } } - - timer.stop_and_record(); - Ok(()) } + /// Layer flusher task's main loop. + async fn flush_loop(&self, mut layer_flush_start_rx: tokio::sync::watch::Receiver) { + info!("started flush loop"); + loop { + tokio::select! 
{ + _ = task_mgr::shutdown_watcher() => { + info!("shutting down layer flush task"); + break; + }, + _ = layer_flush_start_rx.changed() => {} + } + + trace!("waking up"); + let timer = self.metrics.flush_time_histo.start_timer(); + let flush_counter = *layer_flush_start_rx.borrow(); + let result = loop { + let layer_to_flush = { + let layers = self.layers.read().unwrap(); + layers.frozen_layers.front().cloned() + // drop 'layers' lock to allow concurrent reads and writes + }; + if let Some(layer_to_flush) = layer_to_flush { + if let Err(err) = self.flush_frozen_layer(layer_to_flush).await { + error!("could not flush frozen layer: {err:?}"); + break Err(err); + } + continue; + } else { + break Ok(()); + } + }; + // Notify any listeners that we're done + let _ = self + .layer_flush_done_tx + .send_replace((flush_counter, result)); + + timer.stop_and_record(); + } + } + + async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> { + let mut rx = self.layer_flush_done_tx.subscribe(); + + // Increment the flush cycle counter and wake up the flush task. + // Remember the new value, so that when we listen for the flush + // to finish, we know when the flush that we initiated has + // finished, instead of some other flush that was started earlier. + let mut my_flush_request = 0; + self.layer_flush_start_tx.send_modify(|counter| { + my_flush_request = *counter + 1; + *counter = my_flush_request; + }); + + loop { + { + let (last_result_counter, last_result) = &*rx.borrow(); + if *last_result_counter >= my_flush_request { + if let Err(_err) = last_result { + // We already logged the original error in + // flush_loop. We cannot propagate it to the caller + // here, because it might not be Cloneable + bail!("could not flush frozen layer"); + } else { + return Ok(()); + } + } + } + trace!("waiting for flush to complete"); + rx.changed().await?; + trace!("done") + } + } + + fn flush_frozen_layers(&self) { + self.layer_flush_start_tx.send_modify(|val| *val += 1); + } + /// Flush one frozen in-memory layer to disk, as a new delta layer. - fn flush_frozen_layer(&self, frozen_layer: Arc) -> anyhow::Result<()> { + #[instrument(skip(self, frozen_layer), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.filename().display()))] + async fn flush_frozen_layer(&self, frozen_layer: Arc) -> anyhow::Result<()> { // As a special case, when we have just imported an image into the repository, // instead of writing out a L0 delta layer, we directly write out image layer // files instead. 
This is possible as long as *all* the data imported into the @@ -2265,13 +2327,10 @@ impl Timeline { let last_rec_lsn = data.records.last().unwrap().0; - let img = self.walredo_mgr.request_redo( - key, - request_lsn, - base_img, - data.records, - self.pg_version, - )?; + let img = self + .walredo_mgr + .request_redo(key, request_lsn, base_img, data.records, self.pg_version) + .context("Failed to reconstruct a page image:")?; if img.len() == page_cache::PAGE_SZ { let cache = page_cache::get(); diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index f1db50bf7f..3766bc5cb3 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -241,7 +241,7 @@ pub async fn shutdown_all_tenants() { let tenant_id = tenant.tenant_id(); debug!("shutdown tenant {tenant_id}"); - if let Err(err) = tenant.checkpoint() { + if let Err(err) = tenant.checkpoint().await { error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}"); } } diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 23ce9dc699..a24bdd5812 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -119,7 +119,7 @@ async fn gc_loop(tenant_id: TenantId) { let gc_horizon = tenant.get_gc_horizon(); let mut sleep_duration = gc_period; if gc_horizon > 0 { - if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false) + if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await { sleep_duration = wait_duration; error!("Gc failed, retrying in {:?}: {e:#}", sleep_duration); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e21ec4d742..54d322373b 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -120,7 +120,7 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool { /// An error happened in WAL redo #[derive(Debug, thiserror::Error)] pub enum WalRedoError { - #[error(transparent)] + #[error("encountered io error: {0}")] IoError(#[from] std::io::Error), #[error("cannot perform WAL redo now")]
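
Note on the coordination pattern introduced above: the old `layer_flush_lock` mutex is replaced by a dedicated flush loop driven by two tokio watch channels — `layer_flush_start_tx` carries a monotonically increasing flush-request counter, and `layer_flush_done_tx` carries the highest request counter whose flush has completed, plus its result. The following is a minimal, self-contained sketch of that handshake, not part of the patch: the `FlushHandle` struct, the `do_flush()` placeholder, and the `String` error type are illustrative stand-ins (the real code uses `anyhow::Result`, which is why `flush_frozen_layers_and_wait` cannot clone and re-propagate the original error and instead logs it in the loop). It assumes tokio with the `sync`, `macros`, and `rt-multi-thread` features.

    use tokio::sync::watch;

    struct FlushHandle {
        // Bumped once per flush request; the background loop wakes on every change.
        start_tx: watch::Sender<u64>,
        // Carries the highest request counter whose flush has completed,
        // together with the result of that flush attempt.
        done_rx: watch::Receiver<(u64, Result<(), String>)>,
    }

    impl FlushHandle {
        // Fire-and-forget: wake the flush loop without waiting
        // (cf. flush_frozen_layers() in the patch).
        fn request_flush(&self) {
            self.start_tx.send_modify(|counter| *counter += 1);
        }

        // Request a flush and wait until a flush started at or after our
        // request has finished (cf. flush_frozen_layers_and_wait()).
        async fn flush_and_wait(&mut self) -> Result<(), String> {
            let mut my_request = 0;
            self.start_tx.send_modify(|counter| {
                *counter += 1;
                my_request = *counter;
            });
            loop {
                {
                    // Scope the borrow so it is not held across the await below.
                    let (done_counter, result) = &*self.done_rx.borrow();
                    if *done_counter >= my_request {
                        return result.clone();
                    }
                }
                self.done_rx
                    .changed()
                    .await
                    .map_err(|_| "flush loop exited".to_string())?;
            }
        }
    }

    // Background loop: wait for a request, flush, then publish the request
    // counter it observed together with the outcome (cf. flush_loop()).
    async fn flush_loop(
        mut start_rx: watch::Receiver<u64>,
        done_tx: watch::Sender<(u64, Result<(), String>)>,
    ) {
        while start_rx.changed().await.is_ok() {
            let observed = *start_rx.borrow();
            let result = do_flush().await;
            let _ = done_tx.send_replace((observed, result));
        }
    }

    // Placeholder for flushing all currently frozen layers to disk.
    async fn do_flush() -> Result<(), String> {
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        let (start_tx, start_rx) = watch::channel(0u64);
        let (done_tx, done_rx) = watch::channel((0u64, Ok(())));
        tokio::spawn(flush_loop(start_rx, done_tx));

        let mut handle = FlushHandle { start_tx, done_rx };
        handle.request_flush();                 // analogous to flush_frozen_layers()
        handle.flush_and_wait().await.unwrap(); // analogous to flush_frozen_layers_and_wait()
    }

Using a request counter rather than a bare notification lets a waiter distinguish "the flush I asked for has completed" from "some earlier flush completed", and publishing completions on a second watch channel lets any number of waiters observe the same result without the flusher tracking them individually.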