diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 68a26b8098..937a6144b6 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -31,8 +31,6 @@ use utils::{ // Imports only used for testing APIs #[cfg(feature = "testing")] use super::models::{ConfigureFailpointsRequest, TimelineGcRequest}; -#[cfg(feature = "testing")] -use crate::CheckpointConfig; struct State { conf: &'static PageServerConf, @@ -777,7 +775,11 @@ async fn timeline_checkpoint_handler(request: Request) -> Result>>, // This mutex prevents creation of new timelines during GC. // Adding yet another mutex (in addition to `timelines`) is needed because holding - // `timelines` mutex during all GC iteration (especially with enforced checkpoint) + // `timelines` mutex during all GC iteration // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn // timeout... @@ -249,7 +249,7 @@ impl UninitializedTimeline<'_> { .context("Failed to import basebackup") })?; - // Flush loop needs to be spawned in order for checkpoint to be able to flush. + // Flush loop needs to be spawned in order to be able to flush. // We want to run proper checkpoint before we mark timeline as available to outside world // Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock raw_timeline.maybe_spawn_flush_loop(); @@ -259,9 +259,9 @@ impl UninitializedTimeline<'_> { }); raw_timeline - .checkpoint(CheckpointConfig::Flush) + .freeze_and_flush() .await - .context("Failed to checkpoint after basebackup import")?; + .context("Failed to flush after basebackup import")?; let timeline = self.initialize()?; @@ -371,7 +371,7 @@ impl Drop for TimelineUninitMark { // We should not blindly overwrite local metadata with remote one. 
// For example, consider the following case: -// Checkpoint comes, we update local metadata and start upload task but after that +// In-memory layer is flushed to disk as a new delta layer, we update local metadata and start upload task but after that // pageserver crashes. During startup we'll load new metadata, and then reset it // to the state of remote one. But current layermap will have layers from the old // metadata which is inconsistent. @@ -1225,24 +1225,21 @@ impl Tenant { /// /// Used at graceful shutdown. /// - pub async fn checkpoint(&self) -> anyhow::Result<()> { + pub async fn freeze_and_flush(&self) -> anyhow::Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the - // checkpoints. We don't want to block everything else while the - // checkpoint runs. - let timelines_to_checkpoint = { + // flushing. We don't want to block everything else while the + // flushing is performed. + let timelines_to_flush = { let timelines = self.timelines.lock().unwrap(); timelines .iter() - .map(|(id, timeline)| (*id, Arc::clone(timeline))) + .map(|(_id, timeline)| Arc::clone(timeline)) .collect::<Vec<_>>() }; - for (id, timeline) in &timelines_to_checkpoint { - timeline - .checkpoint(CheckpointConfig::Flush) - .instrument(info_span!("checkpoint", timeline = %id, tenant = %self.tenant_id)) - .await?; + for timeline in &timelines_to_flush { + timeline.freeze_and_flush().await?; } Ok(()) @@ -2095,8 +2092,13 @@ impl Tenant { }); unfinished_timeline - .checkpoint(CheckpointConfig::Flush).await - .with_context(|| format!("Failed to checkpoint after pgdatadir import for timeline {tenant_id}/{timeline_id}"))?; + .freeze_and_flush() + .await + .with_context(|| { + format!( + "Failed to flush after pgdatadir import for timeline {tenant_id}/{timeline_id}" + ) + })?; let timeline = { let mut timelines = self.timelines.lock().unwrap(); @@ -2831,7 +2833,7 @@ mod tests { writer.finish_write(lsn); 
lsn += 0x10; } - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; { let writer = tline.writer(); writer.put( @@ -2848,7 +2850,7 @@ mod tests { )?; writer.finish_write(lsn); } - tline.checkpoint(CheckpointConfig::Forced).await + tline.freeze_and_flush().await } #[tokio::test] @@ -2863,7 +2865,7 @@ mod tests { make_some_layers(tline.as_ref(), Lsn(0x20)).await?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 - // FIXME: this doesn't actually remove any layer currently, given how the checkpointing + // FIXME: this doesn't actually remove any layer currently, given how the flushing // and compaction works. But it does set the 'cutoff' point so that the cross check // below should fail. tenant @@ -3098,7 +3100,7 @@ mod tests { writer.finish_write(Lsn(0x10)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; tline.compact().await?; let writer = tline.writer(); @@ -3106,7 +3108,7 @@ mod tests { writer.finish_write(Lsn(0x20)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; tline.compact().await?; let writer = tline.writer(); @@ -3114,7 +3116,7 @@ mod tests { writer.finish_write(Lsn(0x30)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; tline.compact().await?; let writer = tline.writer(); @@ -3122,7 +3124,7 @@ mod tests { writer.finish_write(Lsn(0x40)); drop(writer); - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; tline.compact().await?; assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10")); @@ -3135,8 +3137,8 @@ mod tests { } // - // Insert 1000 key-value pairs with increasing keys, checkpoint, - // repeat 50 times. + // Insert 1000 key-value pairs with increasing keys, flush, compact, GC. + // Repeat 50 times. 
// #[tokio::test] async fn test_bulk_insert() -> anyhow::Result<()> { @@ -3172,7 +3174,7 @@ mod tests { let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; tline.compact().await?; tline.gc().await?; } @@ -3240,11 +3242,10 @@ mod tests { ); } - // Perform a cycle of checkpoint, compaction, and GC - println!("checkpointing {}", lsn); + // Perform a cycle of flush, compact, and GC let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; tline.compact().await?; tline.gc().await?; } @@ -3323,11 +3324,10 @@ mod tests { ); } - // Perform a cycle of checkpoint, compaction, and GC - println!("checkpointing {}", lsn); + // Perform a cycle of flush, compact, and GC let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; - tline.checkpoint(CheckpointConfig::Forced).await?; + tline.freeze_and_flush().await?; tline.compact().await?; tline.gc().await?; } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b1f580c32f..0697ec4bd6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -57,7 +57,6 @@ use crate::repository::{Key, Value}; use crate::task_mgr::TaskKind; use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task}; use crate::walredo::WalRedoManager; -use crate::CheckpointConfig; use crate::METADATA_FILE_NAME; use crate::ZERO_PAGE; use crate::{is_temporary, task_mgr}; @@ -499,22 +498,10 @@ impl Timeline { } /// Flush to disk all data that was written with the put_* functions - /// - /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't - /// know anything about them here in the repository. 
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))] - pub async fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> { - match cconf { - CheckpointConfig::Flush => { - self.freeze_inmem_layer(false); - self.flush_frozen_layers_and_wait().await - } - CheckpointConfig::Forced => { - self.freeze_inmem_layer(false); - self.flush_frozen_layers_and_wait().await?; - self.compact().await - } - } + pub async fn freeze_and_flush(&self) -> anyhow::Result<()> { + self.freeze_inmem_layer(false); + self.flush_frozen_layers_and_wait().await } pub async fn compact(&self) -> anyhow::Result<()> { diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 615dcce4a1..85be420cb8 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -196,7 +196,7 @@ pub async fn shutdown_all_tenants() { let tenant_id = tenant.tenant_id(); debug!("shutdown tenant {tenant_id}"); - if let Err(err) = tenant.checkpoint().await { + if let Err(err) = tenant.freeze_and_flush().await { error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}"); } }