Compare commits

5 Commits

Author SHA1 Message Date
Christian Schwarz
cabf452fa7 avoid traversing the anyhow Cause chain
Observation: there was only a thin layer of anyhow between the
types for which log_compaction_error chased down the cause chain
and the conversion to CompactionError::Other.

So, remove the implicit #[from] conversion generated by thiserror,
and de-`anyhow`ify / explicitly opt-into-`::Other` all the places
that used it previously.
2024-01-23 20:08:31 +01:00
Joonas Koivunen
7c9f4c270e driveby: align error stringifications 2024-01-23 09:54:42 +00:00
Joonas Koivunen
2404106586 fix: log some errors at info, error or disregard 2024-01-23 09:54:42 +00:00
Joonas Koivunen
b45c1b5965 feat: add similar is_stopping to PageReconstructError 2024-01-23 09:54:42 +00:00
Joonas Koivunen
82e97e0c59 feat: add new root cause for RTC stopping 2024-01-23 09:54:42 +00:00
7 changed files with 178 additions and 37 deletions
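Net effect of the series, before the per-file diffs: CompactionError loses its blanket #[from] anyhow conversion, each source error type classifies itself (is_stopping / is_cancelled), and explicit From impls route shutdown-like causes to CompactionError::ShuttingDown instead of logging them as errors. A minimal, self-contained sketch of that pattern, not the actual pageserver code: stand-in types, assuming the thiserror and anyhow crates (NotInitialized here is a simplified version of the real upload_queue type):

use thiserror::Error;

// Stand-in for pageserver's upload_queue::NotInitialized.
#[derive(Error, Debug)]
enum NotInitialized {
    #[error("queue is in state Uninitialized")]
    Uninitialized,
    #[error("queue is shutting down")]
    ShuttingDown,
}

impl NotInitialized {
    // Shutdown-like states should not be reported as errors.
    fn is_stopping(&self) -> bool {
        matches!(self, NotInitialized::ShuttingDown)
    }
}

#[derive(Error, Debug)]
enum CompactionError {
    #[error("Compaction was cancelled")]
    ShuttingDown,
    // No #[from] here: wrapping into ::Other is an explicit opt-in.
    #[error(transparent)]
    Other(anyhow::Error),
}

// The explicit conversion replaces "wrap everything in anyhow and
// chase the cause chain at the logging site".
impl From<NotInitialized> for CompactionError {
    fn from(value: NotInitialized) -> Self {
        if value.is_stopping() {
            CompactionError::ShuttingDown
        } else {
            CompactionError::Other(anyhow::Error::new(value))
        }
    }
}

fn main() {
    let e: CompactionError = NotInitialized::ShuttingDown.into();
    // Callers match on the typed variant; no downcast_ref needed.
    assert!(matches!(e, CompactionError::ShuttingDown));
}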

View File

@@ -1881,7 +1881,7 @@ impl Tenant {
         &self,
         cancel: &CancellationToken,
         ctx: &RequestContext,
-    ) -> anyhow::Result<(), timeline::CompactionError> {
+    ) -> Result<(), timeline::CompactionError> {
         // Don't start doing work during shutdown, or when broken, we do not need those in the logs
         if !self.is_active() {
             return Ok(());

View File

@@ -619,8 +619,8 @@ impl LayerMap {
     }
 
     /// Return all L0 delta layers
-    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
-        Ok(self.l0_delta_layers.to_vec())
+    pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
+        self.l0_delta_layers.to_vec()
     }
 
     /// debugging function to print out the contents of the layer map

View File

@@ -237,7 +237,7 @@ use utils::id::{TenantId, TimelineId};
 use self::index::IndexPart;
 use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
-use super::upload_queue::SetDeletedFlagProgress;
+use super::upload_queue::{self, SetDeletedFlagProgress};
 use super::Generation;
 
 pub(crate) use download::{is_temp_download_file, list_remote_timelines};
@@ -621,7 +621,9 @@ impl RemoteTimelineClient {
     ///
     /// Like schedule_index_upload_for_metadata_update(), this merely adds
     /// the upload to the upload queue and returns quickly.
-    pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
+    pub(crate) fn schedule_index_upload_for_file_changes(
+        self: &Arc<Self>,
+    ) -> Result<(), upload_queue::NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
@@ -666,7 +668,7 @@ impl RemoteTimelineClient {
     pub(crate) fn schedule_layer_file_upload(
         self: &Arc<Self>,
         layer: ResidentLayer,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), upload_queue::NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
@@ -875,7 +877,7 @@ impl RemoteTimelineClient {
         self: &Arc<Self>,
         compacted_from: &[Layer],
         compacted_to: &[ResidentLayer],
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), upload_queue::NotInitialized> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;

View File

@@ -290,7 +290,7 @@ impl Layer {
     }
 
     /// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
-    pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
+    pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
         let downloaded = self.0.get_or_maybe_download(true, None).await?;
 
         Ok(ResidentLayer {
@@ -1174,7 +1174,7 @@ pub(crate) enum EvictionError {
 
 /// Error internal to the [`LayerInner::get_or_maybe_download`]
 #[derive(Debug, thiserror::Error)]
-enum DownloadError {
+pub(crate) enum DownloadError {
     #[error("timeline has already shutdown")]
     TimelineShutdown,
     #[error("no remote storage configured")]
@@ -1197,6 +1197,15 @@ enum DownloadError {
     PostStatFailed(#[source] std::io::Error),
 }
 
+impl DownloadError {
+    pub fn is_cancelled(&self) -> bool {
+        match self {
+            Self::TimelineShutdown | Self::DownloadCancelled => true,
+            _ => false,
+        }
+    }
+}
+
 #[derive(Debug, PartialEq)]
 pub(crate) enum NeedsDownload {
     NotFound,

View File

@@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr;
 use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
+use crate::tenant::timeline::CompactionError;
 use crate::tenant::{Tenant, TenantState};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -181,8 +182,11 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
                     );
                     error_run_count += 1;
                     let wait_duration = Duration::from_secs_f64(wait_duration);
-                    error!(
-                        "Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
-                    );
+                    log_compaction_error(
+                        &e,
+                        error_run_count,
+                        &wait_duration,
+                        cancel.is_cancelled(),
+                    );
                     wait_duration
                 } else {
@@ -210,6 +214,38 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
 }
 
+fn log_compaction_error(
+    e: &CompactionError,
+    error_run_count: u32,
+    sleep_duration: &std::time::Duration,
+    task_cancelled: bool,
+) {
+    use crate::tenant::upload_queue::NotInitialized;
+    use crate::tenant::PageReconstructError;
+    use CompactionError::*;
+
+    enum LooksLike {
+        Info,
+        Error,
+    }
+
+    let decision = match e {
+        ShuttingDown => None,
+        _ if task_cancelled => Some(LooksLike::Info),
+        Other(e) => Some(LooksLike::Error),
+    };
+
+    match decision {
+        Some(LooksLike::Info) => info!(
+            "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
+        ),
+        Some(LooksLike::Error) => error!(
+            "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
+        ),
+        None => {}
+    }
+}
+
 ///
 /// GC task's main loop
 ///
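Aside: the info-vs-error routing above is small enough to restate standalone. A hedged sketch of the same decision, with eprintln! standing in for tracing's info!/error! macros and a String standing in for anyhow::Error:

#[derive(Debug)]
enum CompactionError {
    ShuttingDown,
    Other(String), // the real variant wraps anyhow::Error
}

fn log_compaction_error(e: &CompactionError, error_run_count: u32, task_cancelled: bool) {
    use CompactionError::*;
    match e {
        // Expected during shutdown: disregard entirely.
        ShuttingDown => {}
        // Failures racing with task cancellation are demoted to info.
        _ if task_cancelled => eprintln!("INFO: compaction failed {error_run_count} times: {e:?}"),
        // Everything else is a genuine error.
        Other(_) => eprintln!("ERROR: compaction failed {error_run_count} times: {e:?}"),
    }
}

fn main() {
    log_compaction_error(&CompactionError::ShuttingDown, 1, false); // silent
    log_compaction_error(&CompactionError::Other("fsync failed".into()), 2, true); // info
    log_compaction_error(&CompactionError::Other("fsync failed".into()), 3, false); // error
}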

View File

@@ -103,11 +103,14 @@ use self::layer_manager::LayerManager;
 use self::logical_size::LogicalSize;
 use self::walreceiver::{WalReceiver, WalReceiverConf};
-use super::config::TenantConf;
-use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
 use super::remote_timeline_client::RemoteTimelineClient;
 use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
+use super::{config::TenantConf, upload_queue::NotInitialized};
 use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
+use super::{
+    remote_timeline_client::index::{IndexLayerMetadata, IndexPart},
+    storage_layer::layer,
+};
 
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
 pub(super) enum FlushLoopState {
@@ -391,8 +394,7 @@ pub(crate) enum PageReconstructError {
     #[error("Ancestor LSN wait error: {0}")]
     AncestorLsnTimeout(#[from] WaitLsnError),
 
-    /// The operation was cancelled
-    #[error("Cancelled")]
+    #[error("timeline shutting down")]
     Cancelled,
 
     /// The ancestor of this is being stopped
@@ -404,6 +406,19 @@ pub(crate) enum PageReconstructError {
     WalRedo(anyhow::Error),
 }
 
+impl PageReconstructError {
+    /// Returns true if this error indicates a tenant/timeline shutdown alike situation
+    pub(crate) fn is_stopping(&self) -> bool {
+        use PageReconstructError::*;
+        match self {
+            Other(_) => false,
+            AncestorLsnTimeout(_) => false,
+            Cancelled | AncestorStopping(_) => true,
+            WalRedo(_) => false,
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 enum FlushLayerError {
     /// Timeline cancellation token was cancelled
@@ -832,8 +847,7 @@ impl Timeline {
             // "enough".
             let layers = self
                 .create_image_layers(&partitioning, lsn, false, &image_ctx)
-                .await
-                .map_err(anyhow::Error::from)?;
+                .await?;
 
             if let Some(remote_client) = &self.remote_client {
                 for layer in layers {
                     remote_client.schedule_layer_file_upload(layer)?;
@@ -3200,7 +3214,46 @@ pub(crate) enum CompactionError {
     ShuttingDown,
     /// Compaction cannot be done right now; page reconstruction and so on.
     #[error(transparent)]
-    Other(#[from] anyhow::Error),
+    Other(anyhow::Error),
 }
+
+impl CompactionError {
+    fn other<E>(err: E) -> Self
+    where
+        E: std::error::Error + Send + Sync + 'static,
+    {
+        CompactionError::Other(anyhow::Error::new(err))
+    }
+}
+
+impl From<PageReconstructError> for CompactionError {
+    fn from(value: PageReconstructError) -> Self {
+        if value.is_stopping() {
+            CompactionError::ShuttingDown
+        } else {
+            CompactionError::other(value)
+        }
+    }
+}
+
+impl From<NotInitialized> for CompactionError {
+    fn from(value: NotInitialized) -> Self {
+        if value.is_stopping() {
+            CompactionError::ShuttingDown
+        } else {
+            CompactionError::other(value)
+        }
+    }
+}
+
+impl From<layer::DownloadError> for CompactionError {
+    fn from(value: layer::DownloadError) -> Self {
+        if value.is_cancelled() {
+            CompactionError::ShuttingDown
+        } else {
+            CompactionError::other(value)
+        }
+    }
+}
 
 #[serde_as]
@@ -3333,7 +3386,7 @@ impl Timeline {
         stats.read_lock_held_spawn_blocking_startup_micros =
             stats.read_lock_acquisition_micros.till_now(); // set by caller
         let layers = guard.layer_map();
-        let level0_deltas = layers.get_level0_deltas()?;
+        let level0_deltas = layers.get_level0_deltas();
         let mut level0_deltas = level0_deltas
             .into_iter()
             .map(|x| guard.get_from_desc(&x))
@@ -3380,7 +3433,8 @@ impl Timeline {
                     delta
                         .download_and_keep_resident()
                         .await
-                        .context("download layer for failpoint")?,
+                        .context("download layer for failpoint")
+                        .map_err(CompactionError::Other)?,
                 );
             }
             tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
@@ -3464,7 +3518,7 @@ impl Timeline {
         let mut all_keys = Vec::new();
 
         for l in deltas_to_compact.iter() {
-            all_keys.extend(l.load_keys(ctx).await?);
+            all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
         }
 
         // FIXME: should spawn_blocking the rest of this function
@@ -3484,7 +3538,10 @@ impl Timeline {
                 // has not so much sense, because largest holes will corresponds field1/field2 changes.
                 // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
                 // That is why it is better to measure size of hole as number of covering image layers.
-                let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
+                let coverage_size = layers
+                    .image_coverage(&key_range, last_record_lsn)
+                    .map_err(CompactionError::Other)?
+                    .len();
                 if coverage_size >= min_hole_coverage_size {
                     heap.push(Hole {
                         key_range,
@@ -3583,7 +3640,7 @@ impl Timeline {
             key, lsn, ref val, ..
         } in all_values_iter
         {
-            let value = val.load(ctx).await?;
+            let value = val.load(ctx).await.map_err(CompactionError::Other)?;
             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
             // We need to check key boundaries once we reach next key or end of layer with the same key
             if !same_key || lsn == dup_end_lsn {
@@ -3640,7 +3697,8 @@ impl Timeline {
                             .take()
                             .unwrap()
                             .finish(prev_key.unwrap().next(), self)
-                            .await?,
+                            .await
+                            .map_err(CompactionError::Other)?,
                     );
                     writer = None;
@@ -3670,7 +3728,8 @@ impl Timeline {
                                 lsn_range.clone()
                             },
                         )
-                        .await?,
+                        .await
+                        .map_err(CompactionError::Other)?,
                 );
             }
@@ -3681,7 +3740,12 @@ impl Timeline {
                 });
 
                 if !self.shard_identity.is_key_disposable(&key) {
-                    writer.as_mut().unwrap().put_value(key, lsn, value).await?;
+                    writer
+                        .as_mut()
+                        .unwrap()
+                        .put_value(key, lsn, value)
+                        .await
+                        .map_err(CompactionError::Other)?;
                 } else {
                     debug!(
                         "Dropping key {} during compaction (it belongs on shard {:?})",
@@ -3697,7 +3761,12 @@ impl Timeline {
             prev_key = Some(key);
         }
         if let Some(writer) = writer {
-            new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
+            new_layers.push(
+                writer
+                    .finish(prev_key.unwrap().next(), self)
+                    .await
+                    .map_err(CompactionError::Other)?,
+            );
         }
 
         // Sync layers
@@ -3726,7 +3795,8 @@ impl Timeline {
             // minimize latency.
             par_fsync::par_fsync_async(&layer_paths)
                 .await
-                .context("fsync all new layers")?;
+                .context("fsync all new layers")
+                .map_err(CompactionError::Other)?;
 
             let timeline_dir = self
                 .conf
@@ -3734,7 +3804,8 @@ impl Timeline {
 
             par_fsync::par_fsync_async(&[timeline_dir])
                 .await
-                .context("fsync of timeline dir")?;
+                .context("fsync of timeline dir")
+                .map_err(CompactionError::Other)?;
         }
 
         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();

View File

@@ -126,6 +126,27 @@ pub(super) struct UploadQueueStopped {
     pub(super) deleted_at: SetDeletedFlagProgress,
 }
 
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum NotInitialized {
+    #[error("queue is in state Uninitialized")]
+    Uninitialized,
+    #[error("queue is in state Stopping")]
+    Stopped,
+    #[error("queue is shutting down")]
+    ShuttingDown,
+}
+
+impl NotInitialized {
+    pub(crate) fn is_stopping(&self) -> bool {
+        use NotInitialized::*;
+        match self {
+            Uninitialized => false,
+            Stopped => true,
+            ShuttingDown => true,
+        }
+    }
+}
+
 impl UploadQueue {
     pub(crate) fn initialize_empty_remote(
         &mut self,
@@ -213,18 +234,20 @@ impl UploadQueue {
         Ok(self.initialized_mut().expect("we just set it"))
     }
 
-    pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
+    pub(crate) fn initialized_mut(
+        &mut self,
+    ) -> Result<&mut UploadQueueInitialized, NotInitialized> {
+        use UploadQueue::*;
         match self {
-            UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
-                anyhow::bail!("queue is in state {}", self.as_str())
-            }
-            UploadQueue::Initialized(x) => {
-                if !x.shutting_down {
-                    Ok(x)
+            Uninitialized => Err(NotInitialized::Uninitialized.into()),
+            Initialized(x) => {
+                if x.shutting_down {
+                    Err(NotInitialized::ShuttingDown.into())
                 } else {
-                    anyhow::bail!("queue is shutting down")
+                    Ok(x)
                 }
             }
+            Stopped(_) => Err(NotInitialized::Stopped.into()),
         }
     }