mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 04:22:56 +00:00
pageserver: refined Timeline shutdown (#5833)
## Problem

We have observed the shutdown of a timeline taking a long time when a deletion arrives at a busy time for the system. This suggests that we are not respecting cancellation tokens promptly enough.

## Summary of changes

- Refactor timeline shutdown so that rather than having a shutdown() function that takes a flag for optionally flushing, there are two distinct functions: one for graceful flushing shutdown, and another that does the "normal" shutdown where we're just setting a cancellation token and then tearing down as fast as we can. This makes things a bit easier to reason about, and enables us to remove the hand-written variant of shutdown that was maintained in `delete.rs`.
- Layer flush task checks cancellation token more carefully.
- Logical size calculation's handling of cancellation tokens is simplified: rather than passing one in, it respects the Timeline's cancellation token.

This PR doesn't touch RemoteTimelineClient, which will be a key thing to fix as well, so that a slow remote storage op doesn't hold up shutdown.
This commit is contained in:
@@ -125,6 +125,9 @@ where
|
||||
// Wake everyone with an error.
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
|
||||
// Block any future waiters from starting
|
||||
internal.shutdown = true;
|
||||
|
||||
// This will steal the entire waiters map.
|
||||
// When we drop it all waiters will be woken.
|
||||
mem::take(&mut internal.waiters)
|
||||
|
||||
@@ -303,11 +303,7 @@ async fn build_timeline_info(
|
||||
// we're executing this function, we will outlive the timeline on-disk state.
|
||||
info.current_logical_size_non_incremental = Some(
|
||||
timeline
|
||||
.get_current_logical_size_non_incremental(
|
||||
info.last_record_lsn,
|
||||
CancellationToken::new(),
|
||||
ctx,
|
||||
)
|
||||
.get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -512,7 +512,11 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
if let Err(e) = &response {
|
||||
if timeline.cancel.is_cancelled() {
|
||||
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
|
||||
// because wait_lsn etc will drop out
|
||||
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
|
||||
// is_cancelled(): [`Timeline::shutdown`] has entered
|
||||
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
|
||||
@@ -21,7 +21,6 @@ use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
@@ -578,7 +577,6 @@ impl Timeline {
|
||||
pub async fn get_current_logical_size_non_incremental(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -590,7 +588,7 @@ impl Timeline {
|
||||
let mut total_size: u64 = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
|
||||
if cancel.is_cancelled() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CalculateLogicalSizeError::Cancelled);
|
||||
}
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
|
||||
@@ -1841,7 +1841,13 @@ impl Tenant {
|
||||
timelines.values().for_each(|timeline| {
|
||||
let timeline = Arc::clone(timeline);
|
||||
let span = Span::current();
|
||||
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
|
||||
js.spawn(async move {
|
||||
if freeze_and_flush {
|
||||
timeline.flush_and_shutdown().instrument(span).await
|
||||
} else {
|
||||
timeline.shutdown().instrument(span).await
|
||||
}
|
||||
});
|
||||
})
|
||||
};
|
||||
tracing::info!("Waiting for timelines...");
|
||||
@@ -4727,7 +4733,7 @@ mod tests {
|
||||
// Keeps uninit mark in place
|
||||
let raw_tline = tline.raw_timeline().unwrap();
|
||||
raw_tline
|
||||
.shutdown(false)
|
||||
.shutdown()
|
||||
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
|
||||
.await;
|
||||
std::mem::forget(tline);
|
||||
|
||||
@@ -6,7 +6,6 @@ use std::sync::Arc;
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
@@ -350,10 +349,6 @@ async fn fill_logical_sizes(
|
||||
// our advantage with `?` error handling.
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
// be sure to cancel all spawned tasks if we are dropped
|
||||
let _dg = cancel.clone().drop_guard();
|
||||
|
||||
// For each point that would benefit from having a logical size available,
|
||||
// spawn a Task to fetch it, unless we have it cached already.
|
||||
for seg in segments.iter() {
|
||||
@@ -371,15 +366,8 @@ async fn fill_logical_sizes(
|
||||
let parallel_size_calcs = Arc::clone(limit);
|
||||
let ctx = ctx.attached_child();
|
||||
joinset.spawn(
|
||||
calculate_logical_size(
|
||||
parallel_size_calcs,
|
||||
timeline,
|
||||
lsn,
|
||||
cause,
|
||||
ctx,
|
||||
cancel.child_token(),
|
||||
)
|
||||
.in_current_span(),
|
||||
calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
|
||||
.in_current_span(),
|
||||
);
|
||||
}
|
||||
e.insert(cached_size);
|
||||
@@ -487,14 +475,13 @@ async fn calculate_logical_size(
|
||||
lsn: utils::lsn::Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<TimelineAtLsnSizeResult, RecvError> {
|
||||
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
|
||||
.await
|
||||
.expect("global semaphore should not had been closed");
|
||||
|
||||
let size_res = timeline
|
||||
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
|
||||
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
|
||||
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
|
||||
.await?;
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
|
||||
@@ -36,7 +36,6 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
|
||||
@@ -50,6 +49,7 @@ use crate::tenant::{
|
||||
metadata::{save_metadata, TimelineMetadata},
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
|
||||
@@ -247,7 +247,7 @@ pub struct Timeline {
|
||||
/// the flush finishes. You can use that to wait for the flush to finish.
|
||||
layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
|
||||
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
|
||||
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
@@ -374,6 +374,19 @@ pub enum PageReconstructError {
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum FlushLayerError {
|
||||
/// Timeline cancellation token was cancelled
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
PageReconstructError(#[from] PageReconstructError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
@@ -891,15 +904,16 @@ impl Timeline {
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
}
|
||||
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
|
||||
///
|
||||
/// While we are flushing, we continue to accept read I/O.
|
||||
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
|
||||
pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
|
||||
pub(crate) async fn flush_and_shutdown(&self) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// prevent writes to the InMemoryLayer
|
||||
// Stop ingesting data, so that we are not still writing to an InMemoryLayer while
|
||||
// trying to flush
|
||||
tracing::debug!("Waiting for WalReceiverManager...");
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::WalReceiverManager),
|
||||
@@ -908,40 +922,70 @@ impl Timeline {
|
||||
)
|
||||
.await;
|
||||
|
||||
// Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// now all writers to InMemory layer are gone, do the final flush if requested
|
||||
if freeze_and_flush {
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
return; // TODO: should probably drain remote timeline client anyways?
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(_) => {
|
||||
// drain the upload queue
|
||||
if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
if let Err(e) = client.wait_completion().await {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to flush to remote storage: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// drain the upload queue
|
||||
let res = if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
client.wait_completion().await
|
||||
} else {
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
warn!("failed to await for frozen and flushed uploads: {e:#}");
|
||||
Err(e) => {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
self.shutdown().await;
|
||||
}
|
||||
|
||||
/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
|
||||
/// the graceful [`Timeline::flush_and_shutdown`] function.
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
|
||||
// while doing so.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// Shut down the layer flush task before the remote client, as one depends on the other
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::LayerFlushTask),
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
|
||||
// case our caller wants to use that for a deletion
|
||||
if let Some(remote_client) = self.remote_client.as_ref() {
|
||||
match remote_client.stop() {
|
||||
Ok(()) => {}
|
||||
Err(StopError::QueueUninitialized) => {
|
||||
// Shutting down during initialization is legal
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Waiting for tasks...");
|
||||
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await;
|
||||
|
||||
// Finally wait until any gate-holders are complete
|
||||
@@ -985,7 +1029,12 @@ impl Timeline {
|
||||
reason,
|
||||
backtrace: backtrace_str,
|
||||
};
|
||||
self.set_state(broken_state)
|
||||
self.set_state(broken_state);
|
||||
|
||||
// Although the Broken state is not equivalent to shutdown() (shutdown will be called
|
||||
// later when this tenant is detached or the process shuts down), firing the cancellation token
|
||||
// here avoids the need for other tasks to watch for the Broken state explicitly.
|
||||
self.cancel.cancel();
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> TimelineState {
|
||||
@@ -1741,12 +1790,8 @@ impl Timeline {
|
||||
// delay will be terminated by a timeout regardless.
|
||||
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
|
||||
|
||||
// no extra cancellation here, because nothing really waits for this to complete compared
|
||||
// to spawn_ondemand_logical_size_calculation.
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
@@ -1815,7 +1860,6 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let self_clone = Arc::clone(self);
|
||||
@@ -1836,7 +1880,7 @@ impl Timeline {
|
||||
false,
|
||||
async move {
|
||||
let res = self_clone
|
||||
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
|
||||
.logical_size_calculation_task(lsn, cause, &ctx)
|
||||
.await;
|
||||
let _ = sender.send(res).ok();
|
||||
Ok(()) // Receiver is responsible for handling errors
|
||||
@@ -1852,58 +1896,28 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: &RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let _guard = self.gate.enter();
|
||||
|
||||
let self_calculation = Arc::clone(self);
|
||||
|
||||
let mut calculation = pin!(async {
|
||||
let cancel = cancel.child_token();
|
||||
let ctx = ctx.attached_child();
|
||||
self_calculation
|
||||
.calculate_logical_size(lsn, cause, cancel, &ctx)
|
||||
.calculate_logical_size(lsn, cause, &ctx)
|
||||
.await
|
||||
});
|
||||
let timeline_state_cancellation = async {
|
||||
loop {
|
||||
match timeline_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = timeline_state_updates.borrow().clone();
|
||||
match new_state {
|
||||
// we're running this job for active timelines only
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken { .. }
|
||||
| TimelineState::Stopping
|
||||
| TimelineState::Loading => {
|
||||
break format!("aborted because timeline became inactive (new state: {new_state:?})")
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
// can't happen, the sender is not dropped as long as the Timeline exists
|
||||
break "aborted because state watch was dropped".to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let taskmgr_shutdown_cancellation = async {
|
||||
task_mgr::shutdown_watcher().await;
|
||||
"aborted because task_mgr shutdown requested".to_string()
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
res = &mut calculation => { res }
|
||||
reason = timeline_state_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
_ = self.cancel.cancelled() => {
|
||||
debug!("cancelling logical size calculation for timeline shutdown");
|
||||
calculation.await
|
||||
}
|
||||
reason = taskmgr_shutdown_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
debug!("cancelling logical size calculation for task shutdown");
|
||||
calculation.await
|
||||
}
|
||||
}
|
||||
@@ -1917,7 +1931,6 @@ impl Timeline {
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
info!(
|
||||
@@ -1960,7 +1973,7 @@ impl Timeline {
|
||||
};
|
||||
let timer = storage_time_metrics.start_timer();
|
||||
let logical_size = self
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, ctx)
|
||||
.await?;
|
||||
debug!("calculated logical size: {logical_size}");
|
||||
timer.stop_and_record();
|
||||
@@ -2373,6 +2386,10 @@ impl Timeline {
|
||||
info!("started flush loop");
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
info!("shutting down layer flush task");
|
||||
break;
|
||||
},
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
info!("shutting down layer flush task");
|
||||
break;
|
||||
@@ -2384,6 +2401,14 @@ impl Timeline {
|
||||
let timer = self.metrics.flush_time_histo.start_timer();
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
// Note: we do not bother transmitting into [`layer_flush_done_tx`], because
|
||||
// anyone waiting on that will respect self.cancel as well: they will stop
|
||||
// waiting at the same time as we drop out of this loop.
|
||||
return;
|
||||
}
|
||||
|
||||
let layer_to_flush = {
|
||||
let guard = self.layers.read().await;
|
||||
guard.layer_map().frozen_layers.front().cloned()
|
||||
@@ -2392,9 +2417,18 @@ impl Timeline {
|
||||
let Some(layer_to_flush) = layer_to_flush else {
|
||||
break Ok(());
|
||||
};
|
||||
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break Err(err);
|
||||
match self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
Ok(()) => {}
|
||||
Err(FlushLayerError::Cancelled) => {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
return;
|
||||
}
|
||||
err @ Err(
|
||||
FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
|
||||
) => {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break err;
|
||||
}
|
||||
}
|
||||
};
|
||||
// Notify any listeners that we're done
|
||||
@@ -2443,7 +2477,17 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
trace!("waiting for flush to complete");
|
||||
rx.changed().await?;
|
||||
tokio::select! {
|
||||
rx_e = rx.changed() => {
|
||||
rx_e?;
|
||||
},
|
||||
// Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
|
||||
// the notification from [`flush_loop`] that it completed.
|
||||
_ = self.cancel.cancelled() => {
|
||||
tracing::info!("Cancelled layer flush due on timeline shutdown");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
trace!("done")
|
||||
}
|
||||
}
|
||||
@@ -2458,7 +2502,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
frozen_layer: Arc<InMemoryLayer>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), FlushLayerError> {
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
@@ -2483,6 +2527,11 @@ impl Timeline {
|
||||
let (partitioning, _lsn) = self
|
||||
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
|
||||
.await?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
// For image layers, we add them immediately into the layer map.
|
||||
(
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
||||
@@ -2514,6 +2563,10 @@ impl Timeline {
|
||||
)
|
||||
};
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
@@ -2523,6 +2576,10 @@ impl Timeline {
|
||||
let metadata = {
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
|
||||
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
|
||||
@@ -326,8 +326,7 @@ impl Timeline {
|
||||
match state.last_layer_access_imitation {
|
||||
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
|
||||
_ => {
|
||||
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
|
||||
.await;
|
||||
self.imitate_timeline_cached_layer_accesses(ctx).await;
|
||||
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
|
||||
}
|
||||
}
|
||||
@@ -367,21 +366,12 @@ impl Timeline {
|
||||
|
||||
/// Recompute the values which would cause on-demand downloads during restart.
|
||||
#[instrument(skip_all)]
|
||||
async fn imitate_timeline_cached_layer_accesses(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
|
||||
let lsn = self.get_last_record_lsn();
|
||||
|
||||
// imitate on-restart initial logical size
|
||||
let size = self
|
||||
.calculate_logical_size(
|
||||
lsn,
|
||||
LogicalSizeCalculationCause::EvictionTaskImitation,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
|
||||
.instrument(info_span!("calculate_logical_size"))
|
||||
.await;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user