mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-21 20:32:56 +00:00
Compare commits
5 Commits
diko/baseb
...
problame/f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cabf452fa7 | ||
|
|
7c9f4c270e | ||
|
|
2404106586 | ||
|
|
b45c1b5965 | ||
|
|
82e97e0c59 |
@@ -1881,7 +1881,7 @@ impl Tenant {
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(), timeline::CompactionError> {
|
||||
) -> Result<(), timeline::CompactionError> {
|
||||
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
|
||||
if !self.is_active() {
|
||||
return Ok(());
|
||||
|
||||
@@ -619,8 +619,8 @@ impl LayerMap {
|
||||
}
|
||||
|
||||
/// Return all L0 delta layers
|
||||
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
|
||||
Ok(self.l0_delta_layers.to_vec())
|
||||
pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
|
||||
self.l0_delta_layers.to_vec()
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer map
|
||||
|
||||
@@ -237,7 +237,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
use self::index::IndexPart;
|
||||
|
||||
use super::storage_layer::{Layer, LayerFileName, ResidentLayer};
|
||||
use super::upload_queue::SetDeletedFlagProgress;
|
||||
use super::upload_queue::{self, SetDeletedFlagProgress};
|
||||
use super::Generation;
|
||||
|
||||
pub(crate) use download::{is_temp_download_file, list_remote_timelines};
|
||||
@@ -621,7 +621,9 @@ impl RemoteTimelineClient {
|
||||
///
|
||||
/// Like schedule_index_upload_for_metadata_update(), this merely adds
|
||||
/// the upload to the upload queue and returns quickly.
|
||||
pub fn schedule_index_upload_for_file_changes(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
pub(crate) fn schedule_index_upload_for_file_changes(
|
||||
self: &Arc<Self>,
|
||||
) -> Result<(), upload_queue::NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
@@ -666,7 +668,7 @@ impl RemoteTimelineClient {
|
||||
pub(crate) fn schedule_layer_file_upload(
|
||||
self: &Arc<Self>,
|
||||
layer: ResidentLayer,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), upload_queue::NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
@@ -875,7 +877,7 @@ impl RemoteTimelineClient {
|
||||
self: &Arc<Self>,
|
||||
compacted_from: &[Layer],
|
||||
compacted_to: &[ResidentLayer],
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), upload_queue::NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
|
||||
@@ -290,7 +290,7 @@ impl Layer {
|
||||
}
|
||||
|
||||
/// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
|
||||
pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
|
||||
pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
|
||||
let downloaded = self.0.get_or_maybe_download(true, None).await?;
|
||||
|
||||
Ok(ResidentLayer {
|
||||
@@ -1174,7 +1174,7 @@ pub(crate) enum EvictionError {
|
||||
|
||||
/// Error internal to the [`LayerInner::get_or_maybe_download`]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum DownloadError {
|
||||
pub(crate) enum DownloadError {
|
||||
#[error("timeline has already shutdown")]
|
||||
TimelineShutdown,
|
||||
#[error("no remote storage configured")]
|
||||
@@ -1197,6 +1197,15 @@ enum DownloadError {
|
||||
PostStatFailed(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
impl DownloadError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
match self {
|
||||
Self::TimelineShutdown | Self::DownloadCancelled => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub(crate) enum NeedsDownload {
|
||||
NotFound,
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -181,8 +182,11 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
error!(
|
||||
"Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
wait_duration
|
||||
} else {
|
||||
@@ -210,6 +214,38 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
}
|
||||
|
||||
fn log_compaction_error(
|
||||
e: &CompactionError,
|
||||
error_run_count: u32,
|
||||
sleep_duration: &std::time::Duration,
|
||||
task_cancelled: bool,
|
||||
) {
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use CompactionError::*;
|
||||
|
||||
enum LooksLike {
|
||||
Info,
|
||||
Error,
|
||||
}
|
||||
|
||||
let decision = match e {
|
||||
ShuttingDown => None,
|
||||
_ if task_cancelled => Some(LooksLike::Info),
|
||||
Other(e) => Some(LooksLike::Error),
|
||||
};
|
||||
|
||||
match decision {
|
||||
Some(LooksLike::Info) => info!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
|
||||
),
|
||||
Some(LooksLike::Error) => error!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
|
||||
),
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
|
||||
@@ -103,11 +103,14 @@ use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
|
||||
use super::{config::TenantConf, upload_queue::NotInitialized};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
use super::{
|
||||
remote_timeline_client::index::{IndexLayerMetadata, IndexPart},
|
||||
storage_layer::layer,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
pub(super) enum FlushLoopState {
|
||||
@@ -391,8 +394,7 @@ pub(crate) enum PageReconstructError {
|
||||
#[error("Ancestor LSN wait error: {0}")]
|
||||
AncestorLsnTimeout(#[from] WaitLsnError),
|
||||
|
||||
/// The operation was cancelled
|
||||
#[error("Cancelled")]
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
/// The ancestor of this is being stopped
|
||||
@@ -404,6 +406,19 @@ pub(crate) enum PageReconstructError {
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
impl PageReconstructError {
|
||||
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use PageReconstructError::*;
|
||||
match self {
|
||||
Other(_) => false,
|
||||
AncestorLsnTimeout(_) => false,
|
||||
Cancelled | AncestorStopping(_) => true,
|
||||
WalRedo(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum FlushLayerError {
|
||||
/// Timeline cancellation token was cancelled
|
||||
@@ -832,8 +847,7 @@ impl Timeline {
|
||||
// "enough".
|
||||
let layers = self
|
||||
.create_image_layers(&partitioning, lsn, false, &image_ctx)
|
||||
.await
|
||||
.map_err(anyhow::Error::from)?;
|
||||
.await?;
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for layer in layers {
|
||||
remote_client.schedule_layer_file_upload(layer)?;
|
||||
@@ -3200,7 +3214,46 @@ pub(crate) enum CompactionError {
|
||||
ShuttingDown,
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
fn other<E>(err: E) -> Self
|
||||
where
|
||||
E: std::error::Error + Send + Sync + 'static,
|
||||
{
|
||||
CompactionError::Other(anyhow::Error::new(err))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CompactionError {
    /// Maps a page reconstruction failure onto a compaction failure.
    ///
    /// When `value.is_stopping()` is `true` the result is `ShuttingDown`; otherwise
    /// the value is wrapped via `CompactionError::other`.
    fn from(value: PageReconstructError) -> Self {
        if !value.is_stopping() {
            return Self::other(value);
        }
        Self::ShuttingDown
    }
}
|
||||
|
||||
impl From<NotInitialized> for CompactionError {
    /// Maps an upload-queue `NotInitialized` error onto a compaction failure.
    ///
    /// When `value.is_stopping()` is `true` the result is `ShuttingDown`; otherwise
    /// the value is wrapped via `CompactionError::other`.
    fn from(value: NotInitialized) -> Self {
        if !value.is_stopping() {
            return Self::other(value);
        }
        Self::ShuttingDown
    }
}
|
||||
|
||||
impl From<layer::DownloadError> for CompactionError {
    /// Maps a layer download failure onto a compaction failure.
    ///
    /// When `value.is_cancelled()` is `true` the result is `ShuttingDown`; otherwise
    /// the value is wrapped via `CompactionError::other`.
    fn from(value: layer::DownloadError) -> Self {
        if !value.is_cancelled() {
            return Self::other(value);
        }
        Self::ShuttingDown
    }
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -3333,7 +3386,7 @@ impl Timeline {
|
||||
stats.read_lock_held_spawn_blocking_startup_micros =
|
||||
stats.read_lock_acquisition_micros.till_now(); // set by caller
|
||||
let layers = guard.layer_map();
|
||||
let level0_deltas = layers.get_level0_deltas()?;
|
||||
let level0_deltas = layers.get_level0_deltas();
|
||||
let mut level0_deltas = level0_deltas
|
||||
.into_iter()
|
||||
.map(|x| guard.get_from_desc(&x))
|
||||
@@ -3380,7 +3433,8 @@ impl Timeline {
|
||||
delta
|
||||
.download_and_keep_resident()
|
||||
.await
|
||||
.context("download layer for failpoint")?,
|
||||
.context("download layer for failpoint")
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
|
||||
@@ -3464,7 +3518,7 @@ impl Timeline {
|
||||
let mut all_keys = Vec::new();
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
all_keys.extend(l.load_keys(ctx).await?);
|
||||
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
|
||||
}
|
||||
|
||||
// FIXME: should spawn_blocking the rest of this function
|
||||
@@ -3484,7 +3538,10 @@ impl Timeline {
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
|
||||
let coverage_size = layers
|
||||
.image_coverage(&key_range, last_record_lsn)
|
||||
.map_err(CompactionError::Other)?
|
||||
.len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
@@ -3583,7 +3640,7 @@ impl Timeline {
|
||||
key, lsn, ref val, ..
|
||||
} in all_values_iter
|
||||
{
|
||||
let value = val.load(ctx).await?;
|
||||
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
|
||||
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
|
||||
// We need to check key boundaries once we reach next key or end of layer with the same key
|
||||
if !same_key || lsn == dup_end_lsn {
|
||||
@@ -3640,7 +3697,8 @@ impl Timeline {
|
||||
.take()
|
||||
.unwrap()
|
||||
.finish(prev_key.unwrap().next(), self)
|
||||
.await?,
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
writer = None;
|
||||
|
||||
@@ -3670,7 +3728,8 @@ impl Timeline {
|
||||
lsn_range.clone()
|
||||
},
|
||||
)
|
||||
.await?,
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -3681,7 +3740,12 @@ impl Timeline {
|
||||
});
|
||||
|
||||
if !self.shard_identity.is_key_disposable(&key) {
|
||||
writer.as_mut().unwrap().put_value(key, lsn, value).await?;
|
||||
writer
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.put_value(key, lsn, value)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
} else {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
@@ -3697,7 +3761,12 @@ impl Timeline {
|
||||
prev_key = Some(key);
|
||||
}
|
||||
if let Some(writer) = writer {
|
||||
new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
|
||||
new_layers.push(
|
||||
writer
|
||||
.finish(prev_key.unwrap().next(), self)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?,
|
||||
);
|
||||
}
|
||||
|
||||
// Sync layers
|
||||
@@ -3726,7 +3795,8 @@ impl Timeline {
|
||||
// minimize latency.
|
||||
par_fsync::par_fsync_async(&layer_paths)
|
||||
.await
|
||||
.context("fsync all new layers")?;
|
||||
.context("fsync all new layers")
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
let timeline_dir = self
|
||||
.conf
|
||||
@@ -3734,7 +3804,8 @@ impl Timeline {
|
||||
|
||||
par_fsync::par_fsync_async(&[timeline_dir])
|
||||
.await
|
||||
.context("fsync of timeline dir")?;
|
||||
.context("fsync of timeline dir")
|
||||
.map_err(CompactionError::Other)?;
|
||||
}
|
||||
|
||||
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
|
||||
|
||||
@@ -126,6 +126,27 @@ pub(super) struct UploadQueueStopped {
|
||||
pub(super) deleted_at: SetDeletedFlagProgress,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum NotInitialized {
|
||||
#[error("queue is in state Uninitialized")]
|
||||
Uninitialized,
|
||||
#[error("queue is in state Stopping")]
|
||||
Stopped,
|
||||
#[error("queue is shutting down")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
impl NotInitialized {
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use NotInitialized::*;
|
||||
match self {
|
||||
Uninitialized => false,
|
||||
Stopped => true,
|
||||
ShuttingDown => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UploadQueue {
|
||||
pub(crate) fn initialize_empty_remote(
|
||||
&mut self,
|
||||
@@ -213,18 +234,20 @@ impl UploadQueue {
|
||||
Ok(self.initialized_mut().expect("we just set it"))
|
||||
}
|
||||
|
||||
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
|
||||
pub(crate) fn initialized_mut(
|
||||
&mut self,
|
||||
) -> Result<&mut UploadQueueInitialized, NotInitialized> {
|
||||
use UploadQueue::*;
|
||||
match self {
|
||||
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
|
||||
anyhow::bail!("queue is in state {}", self.as_str())
|
||||
}
|
||||
UploadQueue::Initialized(x) => {
|
||||
if !x.shutting_down {
|
||||
Ok(x)
|
||||
Uninitialized => Err(NotInitialized::Uninitialized.into()),
|
||||
Initialized(x) => {
|
||||
if x.shutting_down {
|
||||
Err(NotInitialized::ShuttingDown.into())
|
||||
} else {
|
||||
anyhow::bail!("queue is shutting down")
|
||||
Ok(x)
|
||||
}
|
||||
}
|
||||
Stopped(_) => Err(NotInitialized::Stopped.into()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user