fix: remote timeline client shutdown trips circuit breaker (#8495)

Before this PR

1. The circuit breaker would trip on `CompactionError::ShuttingDown`. That's
wrong; we want to ignore those cases.
2. Remote timeline client shutdown would not be mapped to
`CompactionError::ShuttingDown` in all circumstances.

We observed this in staging; see
https://neondb.slack.com/archives/C033RQ5SPDH/p1721829745384449

This PR fixes (1) with a simple `match` statement, and (2) by switching
a bunch of `anyhow` usage over to distinguished error types that ultimately
get mapped to `CompactionError::ShuttingDown`.
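
For illustration, here is a self-contained sketch of the idea behind fix (1). The types here are simplified stand-ins, not the actual pageserver code (where the breaker sits behind a `Mutex` on the `Tenant` and the check happens in `inspect_err`, as the diff below shows):

    // Hypothetical, simplified sketch: shutdown is an expected outcome and
    // must not count as a circuit-breaker failure; only genuine errors do.
    #[derive(Debug)]
    enum CompactionError {
        ShuttingDown,
        Other(String), // stand-in for anyhow::Error
    }

    #[derive(Default)]
    struct CircuitBreaker {
        consecutive_failures: u32,
    }

    impl CircuitBreaker {
        fn fail(&mut self, reason: &str) {
            self.consecutive_failures += 1;
            eprintln!("breaker failure #{}: {reason}", self.consecutive_failures);
        }
    }

    fn main() {
        let mut breaker = CircuitBreaker::default();
        let result: Result<(), CompactionError> = Err(CompactionError::ShuttingDown);
        if let Err(e) = &result {
            match e {
                CompactionError::ShuttingDown => (), // benign: do not trip the breaker
                CompactionError::Other(reason) => breaker.fail(reason),
            }
        }
    }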

I removed the implicit `#[from]` conversion from `anyhow::Error` to
`CompactionError::Other` to discover all the places that were mapping
remote timeline client shutdown to `anyhow::Error`.

In my opinion `#[from]` is an antipattern and we should avoid it,
especially for `anyhow::Error`. If some callee is going to return
anyhow, the very least the caller should do is acknowledge, through a
`map_err(MyError::Other)`, that they're conflating different failure
reasons.
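
To make that concrete, a minimal sketch of the pattern this PR moves toward; `MyError` and `do_anyhow_thing` are illustrative names, not pageserver identifiers:

    // Hypothetical example; requires the anyhow and thiserror crates.
    use anyhow::anyhow;

    #[derive(Debug, thiserror::Error)]
    enum MyError {
        #[error("shutting down")]
        ShuttingDown,
        // Deliberately no #[from]: an anyhow::Error only becomes
        // MyError::Other through an explicit map_err at the call site.
        #[error(transparent)]
        Other(anyhow::Error),
    }

    fn do_anyhow_thing() -> anyhow::Result<()> {
        Err(anyhow!("could be shutdown, could be anything"))
    }

    fn caller() -> Result<(), MyError> {
        // With #[from], this would silently be `do_anyhow_thing()?`,
        // conflating shutdown with genuine failures. Without it, the
        // conflation is spelled out at the call site:
        do_anyhow_thing().map_err(MyError::Other)?;
        Ok(())
    }

    fn main() {
        if let Err(e) = caller() {
            eprintln!("{e}");
        }
    }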
Christian Schwarz, 2024-07-25 10:44:31 +02:00 (committed by GitHub)
commit a1256b2a67, parent d57412aaab
9 changed files with 158 additions and 58 deletions

View File

@@ -1650,7 +1650,9 @@ async fn timeline_compact_handler(
         .await
         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     if wait_until_uploaded {
-        timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
+        timeline.remote_client.wait_completion().await
+            // XXX map to correct ApiError for the cases where it's due to shutdown
+            .context("wait completion").map_err(ApiError::InternalServerError)?;
     }
     json_response(StatusCode::OK, ())
 }
@@ -1709,7 +1711,9 @@ async fn timeline_checkpoint_handler(
     }
     if wait_until_uploaded {
-        timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
+        timeline.remote_client.wait_completion().await
+            // XXX map to correct ApiError for the cases where it's due to shutdown
+            .context("wait completion").map_err(ApiError::InternalServerError)?;
     }
     json_response(StatusCode::OK, ())
 }

View File

@@ -1620,7 +1620,7 @@ impl Tenant {
         &self,
         cancel: &CancellationToken,
         ctx: &RequestContext,
-    ) -> anyhow::Result<(), timeline::CompactionError> {
+    ) -> Result<(), timeline::CompactionError> {
         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
         if !self.is_active() {
             return Ok(());
@@ -1665,12 +1665,14 @@
                 .compact(cancel, EnumSet::empty(), ctx)
                 .instrument(info_span!("compact_timeline", %timeline_id))
                 .await
-                .map_err(|e| {
-                    self.compaction_circuit_breaker
-                        .lock()
-                        .unwrap()
-                        .fail(&CIRCUIT_BREAKERS_BROKEN, &e);
-                    e
+                .inspect_err(|e| match e {
+                    timeline::CompactionError::ShuttingDown => (),
+                    timeline::CompactionError::Other(e) => {
+                        self.compaction_circuit_breaker
+                            .lock()
+                            .unwrap()
+                            .fail(&CIRCUIT_BREAKERS_BROKEN, e);
+                    }
                 })?;
         }
@@ -4568,7 +4570,7 @@ mod tests {
         let layer_map = tline.layers.read().await;
         let level0_deltas = layer_map
             .layer_map()
-            .get_level0_deltas()?
+            .get_level0_deltas()
             .into_iter()
             .map(|desc| layer_map.get_from_desc(&desc))
             .collect::<Vec<_>>();
@@ -5787,7 +5789,7 @@
             .read()
             .await
             .layer_map()
-            .get_level0_deltas()?
+            .get_level0_deltas()
             .len();

         tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
@@ -5797,7 +5799,7 @@
             .read()
             .await
             .layer_map()
-            .get_level0_deltas()?
+            .get_level0_deltas()
             .len();

         assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");

View File

@@ -845,8 +845,8 @@ impl LayerMap {
     }

     /// Return all L0 delta layers
-    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
-        Ok(self.l0_delta_layers.to_vec())
+    pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
+        self.l0_delta_layers.to_vec()
     }

     /// debugging function to print out the contents of the layer map

View File

@@ -287,6 +287,14 @@ pub enum PersistIndexPartWithDeletedFlagError {
     Other(#[from] anyhow::Error),
 }

+#[derive(Debug, thiserror::Error)]
+pub enum WaitCompletionError {
+    #[error(transparent)]
+    NotInitialized(NotInitialized),
+    #[error("wait_completion aborted because upload queue was stopped")]
+    UploadQueueShutDownOrStopped,
+}
+
 /// A client for accessing a timeline's data in remote storage.
 ///
 /// This takes care of managing the number of connections, and balancing them
@@ -630,7 +638,7 @@ impl RemoteTimelineClient {
     ///
     /// Like schedule_index_upload_for_metadata_update(), this merely adds
     /// the upload to the upload queue and returns quickly.
-    pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
+    pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> Result<(), NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

@@ -645,7 +653,7 @@
     fn schedule_index_upload(
         self: &Arc<Self>,
         upload_queue: &mut UploadQueueInitialized,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), NotInitialized> {
         let disk_consistent_lsn = upload_queue.dirty.metadata.disk_consistent_lsn();
         // fix up the duplicated field
         upload_queue.dirty.disk_consistent_lsn = disk_consistent_lsn;
@@ -653,7 +661,7 @@
         // make sure it serializes before doing it in perform_upload_task so that it doesn't
         // look like a retryable error
         let void = std::io::sink();
-        serde_json::to_writer(void, &upload_queue.dirty).context("serialize index_part.json")?;
+        serde_json::to_writer(void, &upload_queue.dirty).expect("serialize index_part.json");

         let index_part = &upload_queue.dirty;

@@ -699,7 +707,9 @@
             self.schedule_barrier0(upload_queue)
         };

-        Self::wait_completion0(receiver).await
+        Self::wait_completion0(receiver)
+            .await
+            .context("wait completion")
     }

     /// Schedules uploading a new version of `index_part.json` with the given layers added,
@@ -732,7 +742,9 @@
             barrier
         };

-        Self::wait_completion0(barrier).await
+        Self::wait_completion0(barrier)
+            .await
+            .context("wait completion")
     }

     /// Launch an upload operation in the background; the file is added to be included in next
@@ -740,7 +752,7 @@
     pub(crate) fn schedule_layer_file_upload(
         self: &Arc<Self>,
         layer: ResidentLayer,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

@@ -826,7 +838,7 @@
         self: &Arc<Self>,
         upload_queue: &mut UploadQueueInitialized,
         names: I,
-    ) -> anyhow::Result<Vec<(LayerName, LayerFileMetadata)>>
+    ) -> Result<Vec<(LayerName, LayerFileMetadata)>, NotInitialized>
     where
         I: IntoIterator<Item = LayerName>,
     {
@@ -952,7 +964,7 @@
         self: &Arc<Self>,
         compacted_from: &[Layer],
         compacted_to: &[ResidentLayer],
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

@@ -969,10 +981,12 @@
     }

     /// Wait for all previously scheduled uploads/deletions to complete
-    pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
+    pub(crate) async fn wait_completion(self: &Arc<Self>) -> Result<(), WaitCompletionError> {
         let receiver = {
             let mut guard = self.upload_queue.lock().unwrap();
-            let upload_queue = guard.initialized_mut()?;
+            let upload_queue = guard
+                .initialized_mut()
+                .map_err(WaitCompletionError::NotInitialized)?;
             self.schedule_barrier0(upload_queue)
         };

@@ -981,9 +995,9 @@
     async fn wait_completion0(
         mut receiver: tokio::sync::watch::Receiver<()>,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), WaitCompletionError> {
         if receiver.changed().await.is_err() {
-            anyhow::bail!("wait_completion aborted because upload queue was stopped");
+            return Err(WaitCompletionError::UploadQueueShutDownOrStopped);
         }

         Ok(())
     }

View File

@@ -17,7 +17,7 @@ use crate::context::{DownloadBehavior, RequestContext};
 use crate::repository::Key;
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::task_mgr::TaskKind;
-use crate::tenant::timeline::GetVectoredError;
+use crate::tenant::timeline::{CompactionError, GetVectoredError};
 use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};

 use super::delta_layer::{self, DeltaEntry};
@@ -426,7 +426,7 @@ impl Layer {
     }

     /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
-    pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
+    pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
         let downloaded = self.0.get_or_maybe_download(true, None).await?;

         Ok(ResidentLayer {
@@ -1862,12 +1862,24 @@ impl ResidentLayer {
         shard_identity: &ShardIdentity,
         writer: &mut ImageLayerWriter,
         ctx: &RequestContext,
-    ) -> anyhow::Result<usize> {
+    ) -> Result<usize, CompactionError> {
         use LayerKind::*;

-        match self.downloaded.get(&self.owner.0, ctx).await? {
-            Delta(_) => anyhow::bail!(format!("cannot filter() on a delta layer {self}")),
-            Image(i) => i.filter(shard_identity, writer, ctx).await,
+        match self
+            .downloaded
+            .get(&self.owner.0, ctx)
+            .await
+            .map_err(CompactionError::Other)?
+        {
+            Delta(_) => {
+                return Err(CompactionError::Other(anyhow::anyhow!(format!(
+                    "cannot filter() on a delta layer {self}"
+                ))));
+            }
+            Image(i) => i
+                .filter(shard_identity, writer, ctx)
+                .await
+                .map_err(CompactionError::Other),
         }
     }

View File

@@ -4786,7 +4786,7 @@ pub(crate) enum CompactionError {
     ShuttingDown,
     /// Compaction cannot be done right now; page reconstruction and so on.
     #[error(transparent)]
-    Other(#[from] anyhow::Error),
+    Other(anyhow::Error),
 }

 impl From<CollectKeySpaceError> for CompactionError {
@@ -4801,6 +4801,38 @@ impl From<CollectKeySpaceError> for CompactionError {
     }
 }

+impl From<super::upload_queue::NotInitialized> for CompactionError {
+    fn from(value: super::upload_queue::NotInitialized) -> Self {
+        match value {
+            super::upload_queue::NotInitialized::Uninitialized
+            | super::upload_queue::NotInitialized::Stopped => {
+                CompactionError::Other(anyhow::anyhow!(value))
+            }
+            super::upload_queue::NotInitialized::ShuttingDown => CompactionError::ShuttingDown,
+        }
+    }
+}
+
+impl CompactionError {
+    /// We cannot do compaction because we could not download a layer that is input to the compaction.
+    pub(crate) fn input_layer_download_failed(
+        e: super::storage_layer::layer::DownloadError,
+    ) -> Self {
+        match e {
+            super::storage_layer::layer::DownloadError::TimelineShutdown |
+            /* TODO DownloadCancelled correct here? */
+            super::storage_layer::layer::DownloadError::DownloadCancelled => CompactionError::ShuttingDown,
+            super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads |
+            super::storage_layer::layer::DownloadError::DownloadRequired |
+            super::storage_layer::layer::DownloadError::NotFile(_) |
+            super::storage_layer::layer::DownloadError::DownloadFailed |
+            super::storage_layer::layer::DownloadError::PreStatFailed(_) => CompactionError::Other(anyhow::anyhow!(e)),
+            #[cfg(test)]
+            super::storage_layer::layer::DownloadError::Failpoint(_) => CompactionError::Other(anyhow::anyhow!(e)),
+        }
+    }
+}
+
 #[serde_as]
 #[derive(serde::Serialize)]
 struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
@@ -4874,7 +4906,7 @@ impl Timeline {
         new_deltas: &[ResidentLayer],
         new_images: &[ResidentLayer],
         layers_to_remove: &[Layer],
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), CompactionError> {
         let mut guard = self.layers.write().await;

         let mut duplicated_layers = HashSet::new();
@@ -4892,7 +4924,7 @@ impl Timeline {
                 // because we have not implemented L0 => L0 compaction.
                 duplicated_layers.insert(l.layer_desc().key());
             } else if LayerMap::is_l0(&l.layer_desc().key_range) {
-                bail!("compaction generates a L0 layer file as output, which will cause infinite compaction.");
+                return Err(CompactionError::Other(anyhow::anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
             } else {
                 insert_layers.push(l.clone());
             }
@@ -4924,7 +4956,7 @@ impl Timeline {
         self: &Arc<Self>,
         mut replace_layers: Vec<(Layer, ResidentLayer)>,
         mut drop_layers: Vec<Layer>,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), super::upload_queue::NotInitialized> {
         let mut guard = self.layers.write().await;

         // Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
@@ -4946,7 +4978,7 @@ impl Timeline {
     fn upload_new_image_layers(
         self: &Arc<Self>,
         new_images: impl IntoIterator<Item = ResidentLayer>,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), super::upload_queue::NotInitialized> {
         for layer in new_images {
             self.remote_client.schedule_layer_file_upload(layer)?;
         }

View File

@@ -27,6 +27,7 @@ use utils::id::TimelineId;
 use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
 use crate::page_cache;
 use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
+use crate::tenant::remote_timeline_client::WaitCompletionError;
 use crate::tenant::storage_layer::merge_iterator::MergeIterator;
 use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState};
 use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
@@ -108,7 +109,10 @@ impl Timeline {
         ctx: &RequestContext,
     ) -> Result<(), CompactionError> {
         if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
-            return self.compact_with_gc(cancel, ctx).await;
+            return self
+                .compact_with_gc(cancel, ctx)
+                .await
+                .map_err(CompactionError::Other);
         }

         // High level strategy for compaction / image creation:
@@ -236,7 +240,7 @@ impl Timeline {
         self: &Arc<Self>,
         rewrite_max: usize,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), CompactionError> {
         let mut drop_layers = Vec::new();
         let mut layers_to_rewrite: Vec<Layer> = Vec::new();

@@ -357,7 +361,8 @@ impl Timeline {
                 layer.layer_desc().image_layer_lsn(),
                 ctx,
             )
-            .await?;
+            .await
+            .map_err(CompactionError::Other)?;

             // Safety of layer rewrites:
             // - We are writing to a different local file path than we are reading from, so the old Layer
@@ -372,14 +377,20 @@ impl Timeline {
             // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
             //    - GC, which at worst witnesses us "undelete" a layer that they just deleted.
             //    - ingestion, which only inserts layers, therefore cannot collide with us.
-            let resident = layer.download_and_keep_resident().await?;
+            let resident = layer
+                .download_and_keep_resident()
+                .await
+                .map_err(CompactionError::input_layer_download_failed)?;

             let keys_written = resident
                 .filter(&self.shard_identity, &mut image_layer_writer, ctx)
                 .await?;

             if keys_written > 0 {
-                let new_layer = image_layer_writer.finish(self, ctx).await?;
+                let new_layer = image_layer_writer
+                    .finish(self, ctx)
+                    .await
+                    .map_err(CompactionError::Other)?;
                 tracing::info!(layer=%new_layer, "Rewrote layer, {} -> {} bytes",
                     layer.metadata().file_size,
                     new_layer.metadata().file_size);
@@ -407,7 +418,13 @@ impl Timeline {
         // necessary for correctness, but it simplifies testing, and avoids proceeding with another
        // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
         // load.
-        self.remote_client.wait_completion().await?;
+        match self.remote_client.wait_completion().await {
+            Ok(()) => (),
+            Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
+            Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
+                return Err(CompactionError::ShuttingDown)
+            }
+        }

         fail::fail_point!("compact-shard-ancestors-persistent");
@@ -465,7 +482,7 @@ impl Timeline {
         stats.read_lock_held_spawn_blocking_startup_micros =
             stats.read_lock_acquisition_micros.till_now(); // set by caller
         let layers = guard.layer_map();
-        let level0_deltas = layers.get_level0_deltas()?;
+        let level0_deltas = layers.get_level0_deltas();
         let mut level0_deltas = level0_deltas
             .into_iter()
             .map(|x| guard.get_from_desc(&x))
@@ -518,14 +535,23 @@ impl Timeline {
             ) as u64
                 * std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);

-        deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
+        deltas_to_compact.push(
+            first_level0_delta
+                .download_and_keep_resident()
+                .await
+                .map_err(CompactionError::input_layer_download_failed)?,
+        );
         for l in level0_deltas_iter {
             let lsn_range = &l.layer_desc().lsn_range;

             if lsn_range.start != prev_lsn_end {
                 break;
             }
-            deltas_to_compact.push(l.download_and_keep_resident().await?);
+            deltas_to_compact.push(
+                l.download_and_keep_resident()
+                    .await
+                    .map_err(CompactionError::input_layer_download_failed)?,
+            );
             deltas_to_compact_bytes += l.metadata().file_size;
             prev_lsn_end = lsn_range.end;
@@ -584,7 +610,7 @@ impl Timeline {
         let mut all_keys = Vec::new();

         for l in deltas_to_compact.iter() {
-            all_keys.extend(l.load_keys(ctx).await?);
+            all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
         }

         // FIXME: should spawn_blocking the rest of this function
@@ -706,7 +732,7 @@ impl Timeline {
             key, lsn, ref val, ..
         } in all_values_iter
         {
-            let value = val.load(ctx).await?;
+            let value = val.load(ctx).await.map_err(CompactionError::Other)?;
             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
             // We need to check key boundaries once we reach next key or end of layer with the same key
             if !same_key || lsn == dup_end_lsn {
@@ -763,7 +789,8 @@ impl Timeline {
                             .take()
                             .unwrap()
                             .finish(prev_key.unwrap().next(), self, ctx)
-                            .await?,
+                            .await
+                            .map_err(CompactionError::Other)?,
                     );
                     writer = None;

@@ -801,7 +828,8 @@ impl Timeline {
                         },
                         ctx,
                     )
-                    .await?,
+                    .await
+                    .map_err(CompactionError::Other)?,
                 );
             }

@@ -809,7 +837,8 @@ impl Timeline {
                     .as_mut()
                     .unwrap()
                     .put_value(key, lsn, value, ctx)
-                    .await?;
+                    .await
+                    .map_err(CompactionError::Other)?;
             } else {
                 debug!(
                     "Dropping key {} during compaction (it belongs on shard {:?})",
@@ -825,7 +854,12 @@ impl Timeline {
             prev_key = Some(key);
         }

         if let Some(writer) = writer {
-            new_layers.push(writer.finish(prev_key.unwrap().next(), self, ctx).await?);
+            new_layers.push(
+                writer
+                    .finish(prev_key.unwrap().next(), self, ctx)
+                    .await
+                    .map_err(CompactionError::Other)?,
+            );
         }

         // Sync layers
@@ -1007,7 +1041,7 @@ impl Timeline {
         let guard = self.layers.read().await;
         let layers = guard.layer_map();
-        let l0_deltas = layers.get_level0_deltas()?;
+        let l0_deltas = layers.get_level0_deltas();
         drop(guard);

         // As an optimization, if we find that there are too few L0 layers,
@@ -1037,7 +1071,9 @@ impl Timeline {
                 fanout,
                 ctx,
             )
-            .await?;
+            .await
+            // TODO: compact_tiered needs to return CompactionError
+            .map_err(CompactionError::Other)?;

         adaptor.flush_updates().await?;
         Ok(())
@@ -1235,7 +1271,7 @@ impl Timeline {
         self: &Arc<Self>,
         _cancel: &CancellationToken,
         ctx: &RequestContext,
-    ) -> Result<(), CompactionError> {
+    ) -> anyhow::Result<()> {
         use std::collections::BTreeSet;

         info!("running enhanced gc bottom-most compaction");
@@ -1504,7 +1540,7 @@ impl TimelineAdaptor {
         }
     }

-    pub async fn flush_updates(&mut self) -> anyhow::Result<()> {
+    pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
         let layers_to_delete = {
             let guard = self.timeline.layers.read().await;
             self.layers_to_delete

View File

@@ -26,7 +26,7 @@ pub(crate) enum Error {
     #[error("flushing failed")]
     FlushAncestor(#[source] FlushLayerError),
     #[error("layer download failed")]
-    RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
+    RewrittenDeltaDownloadFailed(#[source] crate::tenant::storage_layer::layer::DownloadError),
     #[error("copying LSN prefix locally failed")]
     CopyDeltaPrefix(#[source] anyhow::Error),
     #[error("upload rewritten layer")]

View File

@@ -130,7 +130,7 @@ pub(super) enum UploadQueueStopped {
 }

 #[derive(thiserror::Error, Debug)]
-pub(crate) enum NotInitialized {
+pub enum NotInitialized {
     #[error("queue is in state Uninitialized")]
     Uninitialized,
     #[error("queue is in state Stopped")]