Refactor layer flushing task

Extracted from https://github.com/neondatabase/neon/pull/2595
Heikki Linnakangas
2022-10-15 02:08:18 +03:00
committed by Dmitry Rodionov
parent bc40a5595f
commit 7b7f84f1b4
7 changed files with 317 additions and 225 deletions
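The heart of this refactor is replacing the per-call layer_flush_lock with a dedicated flush-loop task driven by two tokio watch channels: bumping a counter on layer_flush_start_tx requests a flush cycle, and layer_flush_done_tx echoes back the counter of the last completed cycle together with its result. The following is a minimal, self-contained sketch of that request/acknowledge pattern; the names and types are illustrative, not the pageserver's actual API.

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Request channel: a counter, bumped once per requested flush cycle.
    let (start_tx, mut start_rx) = watch::channel(0u64);
    // Completion channel: (counter of last finished cycle, its result).
    let (done_tx, mut done_rx) = watch::channel((0u64, Ok::<(), String>(())));

    // The flush loop: wait for the request counter to change, do the work,
    // then publish the counter it observed together with the result.
    let flusher = tokio::spawn(async move {
        while start_rx.changed().await.is_ok() {
            let observed = *start_rx.borrow();
            // ... flush frozen layers here ...
            let _ = done_tx.send((observed, Ok(())));
        }
    });

    // A caller (e.g. checkpoint): bump the counter, remember the value we
    // asked for, and wait until the done channel reports a counter >= ours.
    let mut my_flush_request = 0;
    start_tx.send_modify(|counter| {
        my_flush_request = *counter + 1;
        *counter = my_flush_request;
    });
    loop {
        {
            let (last_done, result) = &*done_rx.borrow();
            if *last_done >= my_flush_request {
                result.clone().expect("flush failed");
                break;
            }
        }
        done_rx.changed().await.expect("flush loop exited");
    }

    drop(start_tx); // closing the request channel lets the flush loop exit
    flusher.await.unwrap();
}
```

Using a monotonically increasing counter rather than a plain flag lets a waiter tell "the flush I requested has finished" apart from "some earlier flush finished", which is exactly what flush_frozen_layers_and_wait relies on below.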

View File

@@ -825,14 +825,14 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body
let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
let result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)
.instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
.await
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
.map_err(ApiError::InternalServerError)?;
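For context on the span change above (from an `.entered()` guard to `.instrument(...)`): holding the guard returned by entering a span across an `.await` point produces incorrect traces (and the guard is not Send), so async calls are wrapped with the Instrument combinator instead. A minimal sketch, assuming the tracing and tracing-subscriber crates:

```rust
use tracing::{info, info_span, Instrument};

async fn manual_gc(tenant: u32, timeline: u32) {
    info!("running gc for tenant {} timeline {}", tenant, timeline);
    tokio::task::yield_now().await; // stand-in for real async work
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Avoid in async code: holding the entered-span guard across .await
    // attaches records to the wrong span when the task is resumed:
    //     let _g = info_span!("manual_gc", tenant = 1, timeline = 2).entered();
    //     manual_gc(1, 2).await;

    // Instead, instrument the future so the span is entered on every poll:
    manual_gc(1, 2)
        .instrument(info_span!("manual_gc", tenant = 1, timeline = 2))
        .await;
}
```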
@@ -868,6 +868,7 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
.map_err(ApiError::NotFound)?;
timeline
.checkpoint(CheckpointConfig::Forced)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())

View File

@@ -25,6 +25,7 @@ use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use tokio::pin;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
use tracing::*;
@@ -367,14 +368,12 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
// import_basebackup_from_tar() is not async, mainly because the Tar crate
// it uses is not async. So we need to jump through some hoops:
// - convert the input from client connection to a synchronous Read
// - use block_in_place()
let mut copyin_stream = Box::pin(copyin_stream(pgb));
let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
tokio::task::block_in_place(|| timeline.import_basebackup_from_tar(reader, base_lsn))?;
timeline.initialize()?;
let copyin_stream = copyin_stream(pgb);
pin!(copyin_stream);
timeline
.import_basebackup_from_tar(&mut copyin_stream, base_lsn)
.await?;
// Drain the rest of the Copy data
let mut bytes_after_tar = 0;
@@ -439,7 +438,7 @@ impl PageServerHandler {
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
timeline.checkpoint(CheckpointConfig::Flush)?;
timeline.checkpoint(CheckpointConfig::Flush).await?;
info!("done");
Ok(())

View File

@@ -12,8 +12,12 @@
//!
use anyhow::{bail, Context};
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TimelineState;
use tokio::sync::watch;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -29,6 +33,7 @@ use std::io::Write;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Command;
use std::process::Stdio;
use std::sync::Arc;
@@ -137,7 +142,7 @@ pub struct Tenant {
pub struct UninitializedTimeline<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Timeline, TimelineUninitMark)>,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
}
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
@@ -169,7 +174,6 @@ impl UninitializedTimeline<'_> {
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
let new_timeline = Arc::new(new_timeline);
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
// TODO it would be good to ensure that, but apparently a lot of our testing is dependent on that, at least
@@ -197,6 +201,9 @@ impl UninitializedTimeline<'_> {
})?;
new_timeline.set_state(TimelineState::Active);
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
new_timeline.launch_wal_receiver();
}
}
@@ -205,20 +212,28 @@ impl UninitializedTimeline<'_> {
}
/// Prepares timeline data by loading it from the basebackup archive.
pub fn import_basebackup_from_tar(
&self,
reader: impl std::io::Read,
pub async fn import_basebackup_from_tar(
self,
mut copyin_stream: &mut Pin<&mut impl Stream<Item = io::Result<Bytes>>>,
base_lsn: Lsn,
) -> anyhow::Result<()> {
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn).with_context(
|| {
format!(
"Failed to import basebackup for timeline {}/{}",
self.owning_tenant.tenant_id, self.timeline_id
)
},
)?;
// import_basebackup_from_tar() is not async, mainly because the Tar crate
// it uses is not async. So we need to jump through some hoops:
// - convert the input from client connection to a synchronous Read
// - use block_in_place()
let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
tokio::task::block_in_place(|| {
import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn)
.context("Failed to import basebackup")
})?;
// The flush loop needs to be spawned for checkpoint to be able to flush.
// We want to run a proper checkpoint before we make the timeline available to the outside world,
// so we spawn the flush loop manually here and skip the flush_loop setup in initialize_with_lock.
raw_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
bail!("failpoint before-checkpoint-new-timeline");
@@ -226,16 +241,15 @@ impl UninitializedTimeline<'_> {
raw_timeline
.checkpoint(CheckpointConfig::Flush)
.with_context(|| {
format!(
"Failed to checkpoint after basebackup import for timeline {}/{}",
self.owning_tenant.tenant_id, self.timeline_id
)
})?;
Ok(())
.await
.context("Failed to checkpoint after basebackup import")?;
let timeline = self.initialize()?;
Ok(timeline)
}
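The comment above explains the hoops: the tar-reading code is synchronous, so the async COPY stream is bridged to a blocking std::io::Read and driven inside block_in_place. Below is a small sketch of that bridging pattern, assuming tokio, tokio-util (with its io helpers), futures, and bytes; it is not pageserver code. Note that block_in_place requires the multi-threaded tokio runtime.

```rust
use bytes::Bytes;
use futures::stream;
use std::io::Read;
use tokio_util::io::{StreamReader, SyncIoBridge};

#[tokio::main(flavor = "multi_thread")] // block_in_place panics on a current-thread runtime
async fn main() -> std::io::Result<()> {
    // Stand-in for the COPY-in stream coming from the client connection.
    let chunks = stream::iter(vec![
        Ok::<Bytes, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"world")),
    ]);

    // StreamReader: Stream<Item = io::Result<Bytes>> -> AsyncRead
    // SyncIoBridge: AsyncRead -> blocking std::io::Read
    let reader = SyncIoBridge::new(StreamReader::new(chunks));

    // The blocking Read must not run on the async poll path directly,
    // hence block_in_place (the real code hands it to the tar reader here).
    let contents = tokio::task::block_in_place(move || {
        let mut reader = reader;
        let mut buf = String::new();
        reader.read_to_string(&mut buf)?;
        Ok::<_, std::io::Error>(buf)
    })?;

    assert_eq!(contents, "hello world");
    Ok(())
}
```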
fn raw_timeline(&self) -> anyhow::Result<&Timeline> {
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
Ok(&self
.raw_timeline
.as_ref()
@@ -470,7 +484,7 @@ impl Tenant {
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
}
None => self.bootstrap_timeline(new_timeline_id, pg_version)?,
None => self.bootstrap_timeline(new_timeline_id, pg_version).await?,
};
// Have added new timeline into the tenant, now its background tasks are needed.
@@ -488,7 +502,7 @@ impl Tenant {
/// `checkpoint_before_gc` parameter is used to force compaction of storage before GC
/// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
pub fn gc_iteration(
pub async fn gc_iteration(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
@@ -504,11 +518,13 @@ impl Tenant {
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
STORAGE_TIME
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
})
{
let _timer = STORAGE_TIME
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
.start_timer();
self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
.await
}
}
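The metric call changes here because observe_closure_duration takes a synchronous closure and so cannot wrap an `.await`; a start_timer() guard records the duration on drop, which works across await points. A small sketch of the two styles, assuming tokio and the prometheus crate's Histogram API (which the calls above resemble):

```rust
use prometheus::{Histogram, HistogramOpts};

async fn timed_gc(histo: &Histogram) {
    // The guard observes the elapsed time when dropped, so it can span .await points.
    let _timer = histo.start_timer();
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

#[tokio::main]
async fn main() {
    let histo =
        Histogram::with_opts(HistogramOpts::new("gc_seconds", "GC duration")).unwrap();

    // Synchronous code can keep using the closure form:
    histo.observe_closure_duration(|| { /* blocking work */ });

    // Async code uses the timer guard instead:
    timed_gc(&histo).await;

    println!("samples recorded: {}", histo.get_sample_count());
}
```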
/// Perform one compaction iteration.
@@ -544,23 +560,24 @@ impl Tenant {
///
/// Used at graceful shutdown.
///
pub fn checkpoint(&self) -> anyhow::Result<()> {
pub async fn checkpoint(&self) -> anyhow::Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines = self.timelines.lock().unwrap();
let timelines_to_checkpoint = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline)))
.collect::<Vec<_>>();
drop(timelines);
let timelines_to_checkpoint = {
let timelines = self.timelines.lock().unwrap();
timelines
.iter()
.map(|(id, timeline)| (*id, Arc::clone(timeline)))
.collect::<Vec<_>>()
};
for (timeline_id, timeline) in &timelines_to_checkpoint {
let _entered =
info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id)
.entered();
timeline.checkpoint(CheckpointConfig::Flush)?;
for (id, timeline) in &timelines_to_checkpoint {
timeline
.checkpoint(CheckpointConfig::Flush)
.instrument(info_span!("checkpoint", timeline = %id, tenant = %self.tenant_id))
.await?;
}
Ok(())
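The lock scope is narrowed into a block here because a std::sync::MutexGuard is not Send; holding it across the new `.await` would make the future non-Send and unusable with tokio::spawn. A minimal illustration of the pattern, with hypothetical names rather than pageserver code:

```rust
use std::sync::{Arc, Mutex};

async fn checkpoint_all(timelines: Arc<Mutex<Vec<String>>>) {
    // Clone out what we need inside a block so the guard is dropped here...
    let to_flush = {
        let guard = timelines.lock().unwrap();
        (*guard).clone()
    };
    // ...and only await afterwards, with no std MutexGuard held.
    for name in to_flush {
        tokio::task::yield_now().await; // stand-in for timeline.checkpoint(...).await
        println!("flushed {name}");
    }
}

#[tokio::main]
async fn main() {
    let timelines = Arc::new(Mutex::new(vec!["main".into(), "branch".into()]));
    // tokio::spawn requires the future to be Send; the narrowed lock scope keeps it so.
    tokio::spawn(checkpoint_all(timelines)).await.unwrap();
}
```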
@@ -974,7 +991,7 @@ impl Tenant {
// - if a relation has a non-incremental persistent layer on a child branch, then we
// don't need to keep that in the parent anymore. But currently
// we do.
fn gc_iteration_internal(
async fn gc_iteration_internal(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
@@ -1007,7 +1024,7 @@ impl Tenant {
// so that they too can be garbage collected. That's
// used in tests, so we want results that are as deterministic as possible.
if checkpoint_before_gc {
timeline.checkpoint(CheckpointConfig::Forced)?;
timeline.checkpoint(CheckpointConfig::Forced).await?;
info!(
"timeline {} checkpoint_before_gc done",
timeline.timeline_id
@@ -1117,7 +1134,6 @@ impl Tenant {
}
}
drop(gc_cs);
Ok(gc_timelines)
}
@@ -1222,14 +1238,15 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization complete, remove the temp dir.
fn bootstrap_timeline(
async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,
) -> anyhow::Result<Arc<Timeline>> {
let timelines = self.timelines.lock().unwrap();
let timeline_uninit_mark = self.create_timeline_uninit_mark(timeline_id, &timelines)?;
drop(timelines);
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(timeline_id, &timelines)?
};
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
let initdb_path = path_with_suffix_extension(
@@ -1279,25 +1296,35 @@ impl Tenant {
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
pgdata_path,
pgdata_lsn,
)
tokio::task::block_in_place(|| {
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
pgdata_path,
pgdata_lsn,
)
})
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
})?;
// The flush loop needs to be spawned for checkpoint to be able to flush.
// We want to run a proper checkpoint before we make the timeline available to the outside world,
// so we spawn the flush loop manually here and skip the flush_loop setup in initialize_with_lock.
unfinished_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
unfinished_timeline
.checkpoint(CheckpointConfig::Forced)
.checkpoint(CheckpointConfig::Forced).await
.with_context(|| format!("Failed to checkpoint after pgdatadir import for timeline {tenant_id}/{timeline_id}"))?;
let mut timelines = self.timelines.lock().unwrap();
let timeline = raw_timeline.initialize_with_lock(&mut timelines, false)?;
drop(timelines);
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(&mut timelines, false)?
};
info!(
"created root timeline {} timeline.lsn {}",
@@ -1337,7 +1364,7 @@ impl Tenant {
Ok(UninitializedTimeline {
owning_tenant: self,
timeline_id: new_timeline_id,
raw_timeline: Some((new_timeline, uninit_mark)),
raw_timeline: Some((Arc::new(new_timeline), uninit_mark)),
})
}
Err(e) => {
@@ -1456,7 +1483,7 @@ impl Tenant {
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
raw_timeline: Some((Arc::new(dummy_timeline), TimelineUninitMark::dummy())),
};
match timeline.initialize_with_lock(&mut timelines_accessor, true) {
Ok(initialized_timeline) => {
@@ -1910,7 +1937,7 @@ mod tests {
Ok(())
}
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
@@ -1931,7 +1958,7 @@ mod tests {
writer.finish_write(lsn);
lsn += 0x10;
}
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
{
let writer = tline.writer();
writer.put(
@@ -1948,24 +1975,26 @@ mod tests {
)?;
writer.finish_write(lsn);
}
tline.checkpoint(CheckpointConfig::Forced)
tline.checkpoint(CheckpointConfig::Forced).await
}
#[test]
fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
@@ -2010,14 +2039,14 @@ mod tests {
/*
// FIXME: This currently fails to error out. Calling GC doesn't currently
// remove the old value, we'd need to work a little harder
#[test]
fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
#[tokio::test]
async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
let repo =
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
@@ -2030,43 +2059,47 @@ mod tests {
}
*/
#[test]
fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
#[tokio::test]
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(())
}
#[test]
fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
#[tokio::test]
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
// run gc on parent
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
// Check that the data is still accessible on the branch.
assert_eq!(
@@ -2077,8 +2110,8 @@ mod tests {
Ok(())
}
#[test]
fn timeline_load() -> anyhow::Result<()> {
#[tokio::test]
async fn timeline_load() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
{
@@ -2086,8 +2119,8 @@ mod tests {
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x8000))?;
tline.checkpoint(CheckpointConfig::Forced)?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
tline.checkpoint(CheckpointConfig::Forced).await?;
}
let tenant = harness.load();
@@ -2098,8 +2131,8 @@ mod tests {
Ok(())
}
#[test]
fn timeline_load_with_ancestor() -> anyhow::Result<()> {
#[tokio::test]
async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
// create two timelines
@@ -2109,8 +2142,8 @@ mod tests {
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tline.checkpoint(CheckpointConfig::Forced)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -2118,8 +2151,8 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
tline.checkpoint(CheckpointConfig::Forced)?;
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
tline.checkpoint(CheckpointConfig::Forced).await?;
}
// check that both of them are initially unloaded
@@ -2179,8 +2212,8 @@ mod tests {
Ok(())
}
#[test]
fn test_images() -> anyhow::Result<()> {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2191,7 +2224,7 @@ mod tests {
writer.finish_write(Lsn(0x10));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
let writer = tline.writer();
@@ -2199,7 +2232,7 @@ mod tests {
writer.finish_write(Lsn(0x20));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
let writer = tline.writer();
@@ -2207,7 +2240,7 @@ mod tests {
writer.finish_write(Lsn(0x30));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
let writer = tline.writer();
@@ -2215,7 +2248,7 @@ mod tests {
writer.finish_write(Lsn(0x40));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10"));
@@ -2231,8 +2264,8 @@ mod tests {
// Insert 1000 key-value pairs with increasing keys, checkpoint,
// repeat 50 times.
//
#[test]
fn test_bulk_insert() -> anyhow::Result<()> {
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2265,7 +2298,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
tline.gc()?;
}
@@ -2273,8 +2306,8 @@ mod tests {
Ok(())
}
#[test]
fn test_random_updates() -> anyhow::Result<()> {
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2337,7 +2370,7 @@ mod tests {
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
tline.gc()?;
}
@@ -2345,8 +2378,8 @@ mod tests {
Ok(())
}
#[test]
fn test_traverse_branches() -> anyhow::Result<()> {
#[tokio::test]
async fn test_traverse_branches() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2418,7 +2451,7 @@ mod tests {
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.compact()?;
tline.gc()?;
}

View File

@@ -16,7 +16,7 @@ use std::fs;
use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant, SystemTime};
use crate::tenant::{
@@ -121,8 +121,16 @@ pub struct Timeline {
/// to avoid deadlock.
write_lock: Mutex<()>,
/// Used to ensure that there is only task performing flushing at a time
layer_flush_lock: Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
flush_loop_started: Mutex<bool>,
/// layer_flush_start_tx can be used to wake up the layer-flushing task.
/// The value is a counter, incremented every time a new flush cycle is requested.
/// The flush cycle counter is sent back on the layer_flush_done channel when
/// the flush finishes. You can use that to wait for the flush to finish.
layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
/// To be notified when layer flushing has finished, subscribe to the layer_flush_done channel.
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
@@ -466,15 +474,16 @@ impl Timeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
pub fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
match cconf {
CheckpointConfig::Flush => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)
self.flush_frozen_layers_and_wait().await
}
CheckpointConfig::Forced => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers(true)?;
self.flush_frozen_layers_and_wait().await?;
self.compact()
}
}
@@ -591,62 +600,6 @@ impl Timeline {
Ok(size)
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen the
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
}
pub fn set_state(&self, new_state: TimelineState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
@@ -732,6 +685,9 @@ impl Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Suspended);
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
let mut result = Timeline {
conf,
tenant_conf,
@@ -759,8 +715,12 @@ impl Timeline {
upload_layers: AtomicBool::new(upload_layers),
flush_loop_started: Mutex::new(false),
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
layer_flush_lock: Mutex::new(()),
layer_removal_cs: Mutex::new(()),
gc_info: RwLock::new(GcInfo {
@@ -793,6 +753,33 @@ impl Timeline {
result
}
pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
let mut flush_loop_started = self.flush_loop_started.lock().unwrap();
if *flush_loop_started {
info!(
"skipping attempt to start flush_loop twice {}/{}",
self.tenant_id, self.timeline_id
);
return;
}
let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
let self_clone = Arc::clone(self);
info!("spawning flush loop");
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_loop(layer_flush_start_rx).await; Ok(()) }
.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
);
*flush_loop_started = true;
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
if !is_etcd_client_initialized() {
if cfg!(test) {
@@ -1289,53 +1276,128 @@ impl Timeline {
drop(layers);
}
/// Flush all frozen layers to disk.
///
/// Only one task at a time can be doing layer-flushing for a
/// given timeline. If 'wait' is true, and another task is
/// currently doing the flushing, this function will wait for it
/// to finish. If 'wait' is false, this function will return
/// immediately instead.
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
let flush_lock_guard = if wait {
self.layer_flush_lock.lock().unwrap()
} else {
match self.layer_flush_lock.try_lock() {
Ok(guard) => guard,
Err(TryLockError::WouldBlock) => return Ok(()),
Err(TryLockError::Poisoned(err)) => panic!("{:?}", err),
}
};
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
/// the in-memory layer, and initiate flushing it if so.
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
///
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
let distance = last_lsn.widening_sub(last_freeze_at);
// Checkpointing the open layer can be triggered by layer size or LSN range.
// S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
// we want to stay below that with a big margin. The LSN distance determines how
// much WAL the safekeepers need to store.
if distance >= self.get_checkpoint_distance().into()
|| open_layer_size > self.get_checkpoint_distance()
|| (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
{
info!(
"check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
distance,
open_layer_size,
last_freeze_ts.elapsed()
);
let timer = self.metrics.flush_time_histo.start_timer();
self.freeze_inmem_layer(true);
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
loop {
let layers = self.layers.read().unwrap();
if let Some(frozen_layer) = layers.frozen_layers.front() {
let frozen_layer = Arc::clone(frozen_layer);
drop(layers); // to allow concurrent reads and writes
self.flush_frozen_layer(frozen_layer)?;
} else {
// Drop the 'layer_flush_lock' *before* 'layers'. That
// way, if you freeze a layer, and then call
// flush_frozen_layers(false), it is guaranteed that
// if another thread was busy flushing layers and the
// call therefore returns immediately, the other
// thread will have seen the newly-frozen layer and
// will flush that too (assuming no errors).
drop(flush_lock_guard);
drop(layers);
break;
// Wake up the layer flusher
self.flush_frozen_layers();
}
}
timer.stop_and_record();
Ok(())
}
/// Layer flusher task's main loop.
async fn flush_loop(&self, mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>) {
info!("started flush loop");
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("shutting down layer flush task");
break;
},
_ = layer_flush_start_rx.changed() => {}
}
trace!("waking up");
let timer = self.metrics.flush_time_histo.start_timer();
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
if let Some(layer_to_flush) = layer_to_flush {
if let Err(err) = self.flush_frozen_layer(layer_to_flush).await {
error!("could not flush frozen layer: {err:?}");
break Err(err);
}
continue;
} else {
break Ok(());
}
};
// Notify any listeners that we're done
let _ = self
.layer_flush_done_tx
.send_replace((flush_counter, result));
timer.stop_and_record();
}
}
async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
let mut rx = self.layer_flush_done_tx.subscribe();
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
// finished, instead of some other flush that was started earlier.
let mut my_flush_request = 0;
self.layer_flush_start_tx.send_modify(|counter| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
});
loop {
{
let (last_result_counter, last_result) = &*rx.borrow();
if *last_result_counter >= my_flush_request {
if let Err(_err) = last_result {
// We already logged the original error in
// flush_loop. We cannot propagate it to the caller
// here, because it might not be Cloneable
bail!("could not flush frozen layer");
} else {
return Ok(());
}
}
}
trace!("waiting for flush to complete");
rx.changed().await?;
trace!("done")
}
}
fn flush_frozen_layers(&self) {
self.layer_flush_start_tx.send_modify(|val| *val += 1);
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
#[instrument(skip(self, frozen_layer), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.filename().display()))]
async fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -2265,13 +2327,10 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0;
let img = self.walredo_mgr.request_redo(
key,
request_lsn,
base_img,
data.records,
self.pg_version,
)?;
let img = self
.walredo_mgr
.request_redo(key, request_lsn, base_img, data.records, self.pg_version)
.context("Failed to reconstruct a page image:")?;
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();

View File

@@ -241,7 +241,7 @@ pub async fn shutdown_all_tenants() {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
if let Err(err) = tenant.checkpoint() {
if let Err(err) = tenant.checkpoint().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}

View File

@@ -119,7 +119,7 @@ async fn gc_loop(tenant_id: TenantId) {
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
if gc_horizon > 0 {
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:#}", sleep_duration);

View File

@@ -120,7 +120,7 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
/// An error happened in WAL redo
#[derive(Debug, thiserror::Error)]
pub enum WalRedoError {
#[error(transparent)]
#[error("encountered io error: {0}")]
IoError(#[from] std::io::Error),
#[error("cannot perform WAL redo now")]