mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
Merge pull request #10878 from neondatabase/rc/release/2025-02-18
This commit is contained in:
@@ -2341,6 +2341,7 @@ async fn timeline_checkpoint_handler(
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e)
|
||||
}
|
||||
)?;
|
||||
|
||||
@@ -3147,6 +3147,12 @@ impl Tenant {
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::Offload(_) => {}
|
||||
CompactionError::CollectKeySpaceError(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
CompactionError::Other(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
@@ -7839,18 +7845,6 @@ mod tests {
|
||||
}
|
||||
|
||||
tline.freeze_and_flush().await?;
|
||||
// Force layers to L1
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if iter % 5 == 0 {
|
||||
let (_, before_delta_file_accessed) =
|
||||
@@ -7863,7 +7857,6 @@ mod tests {
|
||||
let mut flags = EnumSet::new();
|
||||
flags.insert(CompactFlags::ForceImageLayerCreation);
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
flags.insert(CompactFlags::ForceL0Compaction);
|
||||
flags
|
||||
},
|
||||
&ctx,
|
||||
@@ -8310,8 +8303,6 @@ mod tests {
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
|
||||
tline.force_set_disk_consistent_lsn(Lsn(0x40));
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
@@ -8325,7 +8316,8 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Image layers are created at repartition LSN
|
||||
|
||||
// Image layers are created at last_record_lsn
|
||||
let images = tline
|
||||
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
|
||||
.await
|
||||
|
||||
@@ -287,6 +287,7 @@ fn log_compaction_error(
|
||||
sleep_duration: Duration,
|
||||
task_cancelled: bool,
|
||||
) {
|
||||
use crate::pgdatadir_mapping::CollectKeySpaceError;
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use CompactionError::*;
|
||||
@@ -294,6 +295,8 @@ fn log_compaction_error(
|
||||
let level = match err {
|
||||
ShuttingDown => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
CollectKeySpaceError(CollectKeySpaceError::Cancelled) => Level::INFO,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
|
||||
@@ -1875,7 +1875,7 @@ impl Timeline {
|
||||
// Signal compaction failure to avoid L0 flush stalls when it's broken.
|
||||
match result {
|
||||
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
|
||||
Err(CompactionError::Other(_)) => {
|
||||
Err(CompactionError::Other(_)) | Err(CompactionError::CollectKeySpaceError(_)) => {
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
// Don't change the current value on offload failure or shutdown. We don't want to
|
||||
@@ -4546,7 +4546,10 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
|
||||
let (dense_ks, sparse_ks) = self
|
||||
.collect_keyspace(lsn, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::CollectKeySpaceError)?;
|
||||
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
|
||||
let sparse_partitioning = SparseKeyPartitioning {
|
||||
parts: vec![sparse_ks],
|
||||
@@ -5259,6 +5262,8 @@ pub(crate) enum CompactionError {
|
||||
#[error("Failed to offload timeline: {0}")]
|
||||
Offload(OffloadError),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error("Failed to collect keyspace: {0}")]
|
||||
CollectKeySpaceError(CollectKeySpaceError),
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
@@ -11,7 +11,8 @@ use std::sync::Arc;
|
||||
use super::layer_manager::LayerManager;
|
||||
use super::{
|
||||
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, GetVectoredError,
|
||||
ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration, Timeline,
|
||||
ImageLayerCreationMode, LastImageLayerCreationStatus, PageReconstructError, RecordedDuration,
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
@@ -31,6 +32,7 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::pgdatadir_mapping::CollectKeySpaceError;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::gc_block::GcBlock;
|
||||
@@ -692,21 +694,6 @@ impl Timeline {
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
let l0_l1_boundary_lsn = {
|
||||
// We do the repartition on the L0-L1 boundary. All data below the boundary
|
||||
// are compacted by L0 with low read amplification, thus making the `repartition`
|
||||
// function run fast.
|
||||
let guard = self.layers.read().await;
|
||||
let l0_min_lsn = guard
|
||||
.layer_map()?
|
||||
.level0_deltas()
|
||||
.iter()
|
||||
.map(|l| l.get_lsn_range().start)
|
||||
.min()
|
||||
.unwrap_or(self.get_disk_consistent_lsn());
|
||||
l0_min_lsn.max(self.get_ancestor_lsn())
|
||||
};
|
||||
|
||||
// 1. L0 Compact
|
||||
let l0_outcome = {
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
@@ -733,86 +720,89 @@ impl Timeline {
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
|
||||
if l0_l1_boundary_lsn < self.partitioning.read().1 {
|
||||
// We never go backwards when repartition and create image layers.
|
||||
info!("skipping image layer generation because repartition LSN is greater than L0-L1 boundary LSN.");
|
||||
} else {
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
l0_l1_boundary_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::extend(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
.build();
|
||||
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
let mut partitioning = dense_partitioning;
|
||||
partitioning
|
||||
.parts
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
.as_ref()
|
||||
.clone(),
|
||||
!options.flags.contains(CompactFlags::NoYield),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
}
|
||||
})?;
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
if options
|
||||
.flags
|
||||
.contains(CompactFlags::ForceImageLayerCreation)
|
||||
{
|
||||
ImageLayerCreationMode::Force
|
||||
} else {
|
||||
ImageLayerCreationMode::Try
|
||||
},
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
.load()
|
||||
.as_ref()
|
||||
.clone(),
|
||||
!options.flags.contains(CompactFlags::NoYield),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
if let CreateImageLayersError::GetVectoredError(
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
}
|
||||
})?;
|
||||
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
}
|
||||
Err(err) => {
|
||||
// no partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline
|
||||
// as a simple key-value store, ignoring the datadir layout. Log the
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
if let CompactionError::CollectKeySpaceError(
|
||||
CollectKeySpaceError::Decode(_)
|
||||
| CollectKeySpaceError::PageRead(PageReconstructError::MissingKey(_)),
|
||||
) = err
|
||||
{
|
||||
critical!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
} else {
|
||||
tracing::error!(
|
||||
"could not compact, repartitioning keyspace failed: {err:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let partition_count = self.partitioning.read().0 .0.parts.len();
|
||||
|
||||
|
||||
@@ -20,9 +20,6 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
|
||||
|
||||
@pytest.mark.skip(
|
||||
reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548"
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"attach_mode",
|
||||
["default_generation", "same_generation"],
|
||||
|
||||
Reference in New Issue
Block a user