mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
feat(pageserver): continue from last incomplete image layer creation (#10660)
## Problem close https://github.com/neondatabase/neon/issues/10651 ## Summary of changes * Image layer creation starts from the next partition of the last processed partition if the previous attempt was not complete. * Add tests. --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -192,7 +192,12 @@ pub enum ImageLayerCreationMode {
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub enum LastImageLayerCreationStatus {
|
||||
Incomplete, // TODO: record the last key being processed
|
||||
Incomplete {
|
||||
/// The last key of the partition (exclusive) that was processed in the last
|
||||
/// image layer creation attempt. We will continue from this key in the next
|
||||
/// attempt.
|
||||
last_key: Key,
|
||||
},
|
||||
Complete,
|
||||
#[default]
|
||||
Initial,
|
||||
@@ -4346,7 +4351,7 @@ impl Timeline {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
// Is it time to create a new image layer for the given partition? True if we want to generate.
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
@@ -4658,6 +4663,11 @@ impl Timeline {
|
||||
) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
|
||||
let timer = self.metrics.create_images_time_histo.start_timer();
|
||||
|
||||
if partitioning.parts.is_empty() {
|
||||
warn!("no partitions to create image layers for");
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
|
||||
// We need to avoid holes between generated image layers.
|
||||
// Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
|
||||
// image layer with hole between them. In this case such layer can not be utilized by GC.
|
||||
@@ -4669,28 +4679,65 @@ impl Timeline {
|
||||
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
||||
let mut start = Key::MIN;
|
||||
|
||||
let check_for_image_layers = if let LastImageLayerCreationStatus::Incomplete = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status={:?}",
|
||||
last_status
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
let check_for_image_layers =
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
info!(
|
||||
"resuming image layer creation: last_status=incomplete, continue from {}",
|
||||
last_key
|
||||
);
|
||||
true
|
||||
} else {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
|
||||
|
||||
let mut all_generated = true;
|
||||
|
||||
let mut partition_processed = 0;
|
||||
let total_partitions = partitioning.parts.len();
|
||||
let mut total_partitions = partitioning.parts.len();
|
||||
let mut last_partition_processed = None;
|
||||
let mut partition_parts = partitioning.parts.clone();
|
||||
|
||||
for partition in partitioning.parts.iter() {
|
||||
if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
|
||||
// We need to skip the partitions that have already been processed.
|
||||
let mut found = false;
|
||||
for (i, partition) in partition_parts.iter().enumerate() {
|
||||
if last_key <= partition.end().unwrap() {
|
||||
// ```plain
|
||||
// |------|--------|----------|------|
|
||||
// ^last_key
|
||||
// ^start from this partition
|
||||
// ```
|
||||
// Why `i+1` instead of `i`?
|
||||
// It is possible that the user did some writes after the previous image layer creation attempt so that
|
||||
// a relation grows in size, and the last_key is now in the middle of the partition. In this case, we
|
||||
// still want to skip this partition, so that we can make progress and avoid generating image layers over
|
||||
// the same partition. Doing a mod to ensure we don't end up with an empty vec.
|
||||
if i + 1 >= total_partitions {
|
||||
// In general, this case should not happen -- if last_key is on the last partition, the previous
|
||||
// iteration of image layer creation should return a complete status.
|
||||
break; // with found=false
|
||||
}
|
||||
partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
|
||||
total_partitions = partition_parts.len();
|
||||
// Update the start key to the partition start.
|
||||
start = partition_parts[0].start().unwrap();
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// Last key is within the last partition, or larger than all partitions.
|
||||
return Ok((vec![], LastImageLayerCreationStatus::Complete));
|
||||
}
|
||||
}
|
||||
|
||||
for partition in partition_parts.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
|
||||
partition_processed += 1;
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
let compact_metadata = partition.overlaps(&Key::metadata_key_range());
|
||||
if compact_metadata {
|
||||
@@ -4725,6 +4772,8 @@ impl Timeline {
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
|
||||
is_delta: false,
|
||||
}) {
|
||||
// TODO: this can be processed with the BatchLayerWriter::finish_with_discard
|
||||
// in the future.
|
||||
tracing::info!(
|
||||
"Skipping image layer at {lsn} {}..{}, already exists",
|
||||
img_range.start,
|
||||
@@ -4805,8 +4854,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
partition_processed += 1;
|
||||
|
||||
if let ImageLayerCreationMode::Try = mode {
|
||||
// We have at least made some progress
|
||||
if batch_image_writer.pending_layer_num() >= 1 {
|
||||
@@ -4822,8 +4869,10 @@ impl Timeline {
|
||||
* self.get_compaction_threshold();
|
||||
if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
|
||||
tracing::info!(
|
||||
"preempt image layer generation at {start} at {lsn}: too many L0 layers {num_of_l0_layers}",
|
||||
"preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
|
||||
partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
|
||||
);
|
||||
last_partition_processed = Some(partition.clone());
|
||||
all_generated = false;
|
||||
break;
|
||||
}
|
||||
@@ -4868,7 +4917,14 @@ impl Timeline {
|
||||
if all_generated {
|
||||
LastImageLayerCreationStatus::Complete
|
||||
} else {
|
||||
LastImageLayerCreationStatus::Incomplete
|
||||
LastImageLayerCreationStatus::Incomplete {
|
||||
last_key: if let Some(last_partition_processed) = last_partition_processed {
|
||||
last_partition_processed.end().unwrap_or(Key::MIN)
|
||||
} else {
|
||||
// This branch should be unreachable, but in case it happens, we can just return the start key.
|
||||
Key::MIN
|
||||
},
|
||||
}
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
@@ -748,7 +748,7 @@ impl Timeline {
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete = outcome {
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction).");
|
||||
return Ok(CompactionOutcome::Pending);
|
||||
|
||||
@@ -29,6 +29,21 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
|
||||
}
|
||||
|
||||
PREEMPT_COMPACTION_TENANT_CONF = {
|
||||
"gc_period": "5s",
|
||||
"compaction_period": "5s",
|
||||
# Small checkpoint distance to create many layers
|
||||
"checkpoint_distance": 1024**2,
|
||||
# Compact small layers
|
||||
"compaction_target_size": 1024**2,
|
||||
"image_creation_threshold": 1,
|
||||
"image_creation_preempt_threshold": 1,
|
||||
# compact more frequently
|
||||
"compaction_threshold": 3,
|
||||
"compaction_upper_limit": 6,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
@@ -36,7 +51,8 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = {
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_pageserver_compaction_smoke(
|
||||
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
wal_receiver_protocol: PageserverWalReceiverProtocol,
|
||||
):
|
||||
"""
|
||||
This is a smoke test that compaction kicks in. The workload repeatedly churns
|
||||
@@ -54,7 +70,8 @@ def test_pageserver_compaction_smoke(
|
||||
page_cache_size=10
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
|
||||
conf = AGGRESSIVE_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -113,6 +130,41 @@ page_cache_size=10
|
||||
assert vectored_average < 8
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_compaction_preempt(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
# Ideally we should be able to do unit tests for this, but we need real Postgres
|
||||
# WALs in order to do unit testing...
|
||||
|
||||
conf = PREEMPT_COMPACTION_TENANT_CONF.copy()
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
row_count = 200000
|
||||
churn_rounds = 10
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageserver.id)
|
||||
|
||||
log.info("Writing initial data ...")
|
||||
workload.write_rows(row_count, env.pageserver.id)
|
||||
|
||||
for i in range(1, churn_rounds + 1):
|
||||
log.info(f"Running churn round {i}/{churn_rounds} ...")
|
||||
workload.churn_rows(row_count, env.pageserver.id, upload=False)
|
||||
workload.validate(env.pageserver.id)
|
||||
ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||
log.info("Validating at workload end ...")
|
||||
workload.validate(env.pageserver.id)
|
||||
# ensure image layer creation gets preempted and then resumed
|
||||
env.pageserver.assert_log_contains("resuming image layer creation")
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
@pytest.mark.parametrize(
|
||||
"with_branches",
|
||||
|
||||
Reference in New Issue
Block a user