Replace Timeline::checkpoint with Timeline::freeze_and_flush

The new Timeline::freeze_and_flush function is equivalent to calling
Timeline::checkpoint(CheckpointConfig::Flush). There were only one
non-test caller that used CheckpointConfig::Forced, so replace that
with a call to the new Timeline::freeze_and_flush, followed by an
explicit call to Timeline::compact.

That only caller was to handle the mgmt API's 'checkpoint' endpoint.
Perhaps we should split that into separate 'flush' and 'compact'
endpoints too, but I didn't go that far yet.
This commit is contained in:
Heikki Linnakangas
2022-12-20 01:19:56 +02:00
committed by Heikki Linnakangas
parent 2c11f1fa95
commit eefb1d46f4
6 changed files with 44 additions and 65 deletions

View File

@@ -31,8 +31,6 @@ use utils::{
// Imports only used for testing APIs
#[cfg(feature = "testing")]
use super::models::{ConfigureFailpointsRequest, TimelineGcRequest};
#[cfg(feature = "testing")]
use crate::CheckpointConfig;
struct State {
conf: &'static PageServerConf,
@@ -777,7 +775,11 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
timeline
.checkpoint(CheckpointConfig::Forced)
.freeze_and_flush()
.await
.map_err(ApiError::InternalServerError)?;
timeline
.compact()
.await
.map_err(ApiError::InternalServerError)?;

View File

@@ -47,15 +47,6 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
/// Config for the Repository checkpointer
#[derive(Debug, Clone, Copy)]
pub enum CheckpointConfig {
// Flush all in-memory data
Flush,
// Flush all in-memory data and reconstruct all page images
Forced,
}
pub async fn shutdown_pageserver(exit_code: i32) {
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.

View File

@@ -51,7 +51,6 @@ use crate::task_mgr::TaskKind;
use crate::tenant::{Tenant, Timeline};
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -466,7 +465,7 @@ impl PageServerHandler {
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
timeline.checkpoint(CheckpointConfig::Flush).await?;
timeline.freeze_and_flush().await?;
info!("done");
Ok(())

View File

@@ -62,7 +62,7 @@ use crate::tenant_config::TenantConfOpt;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
use crate::walredo::WalRedoManager;
use crate::{CheckpointConfig, TEMP_FILE_SUFFIX};
use crate::TEMP_FILE_SUFFIX;
pub use pageserver_api::models::TenantState;
use toml_edit;
@@ -125,7 +125,7 @@ pub struct Tenant {
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
// This mutex prevents creation of new timelines during GC.
// Adding yet another mutex (in addition to `timelines`) is needed because holding
// `timelines` mutex during all GC iteration (especially with enforced checkpoint)
// `timelines` mutex during all GC iteration
// may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
@@ -249,7 +249,7 @@ impl UninitializedTimeline<'_> {
.context("Failed to import basebackup")
})?;
// Flush loop needs to be spawned in order for checkpoint to be able to flush.
// Flush loop needs to be spawned in order to be able to flush.
// We want to run proper checkpoint before we mark timeline as available to outside world
// Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
raw_timeline.maybe_spawn_flush_loop();
@@ -259,9 +259,9 @@ impl UninitializedTimeline<'_> {
});
raw_timeline
.checkpoint(CheckpointConfig::Flush)
.freeze_and_flush()
.await
.context("Failed to checkpoint after basebackup import")?;
.context("Failed to flush after basebackup import")?;
let timeline = self.initialize()?;
@@ -371,7 +371,7 @@ impl Drop for TimelineUninitMark {
// We should not blindly overwrite local metadata with remote one.
// For example, consider the following case:
// Checkpoint comes, we update local metadata and start upload task but after that
// Image layer is flushed to disk as a new delta layer, we update local metadata and start upload task but after that
// pageserver crashes. During startup we'll load new metadata, and then reset it
// to the state of remote one. But current layermap will have layers from the old
// metadata which is inconsistent.
@@ -1225,24 +1225,21 @@ impl Tenant {
///
/// Used at graceful shutdown.
///
pub async fn checkpoint(&self) -> anyhow::Result<()> {
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines_to_checkpoint = {
// flushing. We don't want to block everything else while the
// flushing is performed.
let timelines_to_flush = {
let timelines = self.timelines.lock().unwrap();
timelines
.iter()
.map(|(id, timeline)| (*id, Arc::clone(timeline)))
.map(|(_id, timeline)| Arc::clone(timeline))
.collect::<Vec<_>>()
};
for (id, timeline) in &timelines_to_checkpoint {
timeline
.checkpoint(CheckpointConfig::Flush)
.instrument(info_span!("checkpoint", timeline = %id, tenant = %self.tenant_id))
.await?;
for timeline in &timelines_to_flush {
timeline.freeze_and_flush().await?;
}
Ok(())
@@ -2095,8 +2092,13 @@ impl Tenant {
});
unfinished_timeline
.checkpoint(CheckpointConfig::Flush).await
.with_context(|| format!("Failed to checkpoint after pgdatadir import for timeline {tenant_id}/{timeline_id}"))?;
.freeze_and_flush()
.await
.with_context(|| {
format!(
"Failed to flush after pgdatadir import for timeline {tenant_id}/{timeline_id}"
)
})?;
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
@@ -2831,7 +2833,7 @@ mod tests {
writer.finish_write(lsn);
lsn += 0x10;
}
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
{
let writer = tline.writer();
writer.put(
@@ -2848,7 +2850,7 @@ mod tests {
)?;
writer.finish_write(lsn);
}
tline.checkpoint(CheckpointConfig::Forced).await
tline.freeze_and_flush().await
}
#[tokio::test]
@@ -2863,7 +2865,7 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// FIXME: this doesn't actually remove any layer currently, given how the flushing
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
tenant
@@ -3098,7 +3100,7 @@ mod tests {
writer.finish_write(Lsn(0x10));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
let writer = tline.writer();
@@ -3106,7 +3108,7 @@ mod tests {
writer.finish_write(Lsn(0x20));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
let writer = tline.writer();
@@ -3114,7 +3116,7 @@ mod tests {
writer.finish_write(Lsn(0x30));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
let writer = tline.writer();
@@ -3122,7 +3124,7 @@ mod tests {
writer.finish_write(Lsn(0x40));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10"));
@@ -3135,8 +3137,8 @@ mod tests {
}
//
// Insert 1000 key-value pairs with increasing keys, checkpoint,
// repeat 50 times.
// Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
// Repeat 50 times.
//
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
@@ -3172,7 +3174,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
tline.gc().await?;
}
@@ -3240,11 +3242,10 @@ mod tests {
);
}
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
tline.gc().await?;
}
@@ -3323,11 +3324,10 @@ mod tests {
);
}
// Perform a cycle of checkpoint, compaction, and GC
println!("checkpointing {}", lsn);
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.freeze_and_flush().await?;
tline.compact().await?;
tline.gc().await?;
}

View File

@@ -57,7 +57,6 @@ use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::METADATA_FILE_NAME;
use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
@@ -499,22 +498,10 @@ impl Timeline {
}
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
match cconf {
CheckpointConfig::Flush => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers_and_wait().await
}
CheckpointConfig::Forced => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers_and_wait().await?;
self.compact().await
}
}
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false);
self.flush_frozen_layers_and_wait().await
}
pub async fn compact(&self) -> anyhow::Result<()> {

View File

@@ -196,7 +196,7 @@ pub async fn shutdown_all_tenants() {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
if let Err(err) = tenant.checkpoint().await {
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}