diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 11c0bbdfe5..b6a349a209 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -192,7 +192,12 @@ pub enum ImageLayerCreationMode {

 #[derive(Clone, Debug, Default)]
 pub enum LastImageLayerCreationStatus {
-    Incomplete, // TODO: record the last key being processed
+    Incomplete {
+        /// The last key (exclusive) of the partition that was processed in the last
+        /// image layer creation attempt. We will continue from this key in the next
+        /// attempt.
+        last_key: Key,
+    },
     Complete,
     #[default]
     Initial,
@@ -4346,7 +4351,7 @@ impl Timeline {
         Ok(result)
     }

-    // Is it time to create a new image layer for the given partition?
+    // Is it time to create a new image layer for the given partition? Returns true if we should generate one.
    async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
        let threshold = self.get_image_creation_threshold();
@@ -4658,6 +4663,11 @@ impl Timeline {
     ) -> Result<(Vec<ResidentLayer>, LastImageLayerCreationStatus), CreateImageLayersError> {
         let timer = self.metrics.create_images_time_histo.start_timer();

+        if partitioning.parts.is_empty() {
+            warn!("no partitions to create image layers for");
+            return Ok((vec![], LastImageLayerCreationStatus::Complete));
+        }
+
         // We need to avoid holes between generated image layers.
         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
         // image layer with hole between them. In this case such layer can not be utilized by GC.
@@ -4669,28 +4679,65 @@
         // image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
         let mut start = Key::MIN;

-        let check_for_image_layers = if let LastImageLayerCreationStatus::Incomplete = last_status {
-            info!(
-                "resuming image layer creation: last_status={:?}",
-                last_status
-            );
-            true
-        } else {
-            self.should_check_if_image_layers_required(lsn)
-        };
+        let check_for_image_layers =
+            if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
+                info!(
+                    "resuming image layer creation: last_status=incomplete, continue from {}",
+                    last_key
+                );
+                true
+            } else {
+                self.should_check_if_image_layers_required(lsn)
+            };

         let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;

         let mut all_generated = true;

         let mut partition_processed = 0;
-        let total_partitions = partitioning.parts.len();
+        let mut total_partitions = partitioning.parts.len();
+        let mut last_partition_processed = None;
+        let mut partition_parts = partitioning.parts.clone();

-        for partition in partitioning.parts.iter() {
+        if let LastImageLayerCreationStatus::Incomplete { last_key } = last_status {
+            // We need to skip the partitions that have already been processed.
+            let mut found = false;
+            for (i, partition) in partition_parts.iter().enumerate() {
+                if last_key <= partition.end().unwrap() {
+                    // ```plain
+                    // |------|--------|----------|------|
+                    //                      ^last_key
+                    //                            ^start from this partition
+                    // ```
+                    // Why `i+1` instead of `i`?
+                    // It is possible that the user did some writes after the previous image layer creation attempt so that
+                    // a relation grows in size, and the last_key is now in the middle of the partition. In this case, we
+                    // still want to skip this partition, so that we can make progress and avoid generating image layers over
+                    // the same partition. We bounds-check `i + 1` below to ensure we don't end up with an empty vec.
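+                    // For example, with partitions `|0..10|10..20|20..30|` and `last_key = 15`,
+                    // `10..20` (index 1) is the first partition with `end() >= last_key`, so we
+                    // resume from index `2`, i.e. from partition `20..30`.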
+                    if i + 1 >= total_partitions {
+                        // In general, this case should not happen -- if last_key is in the last partition, the previous
+                        // iteration of image layer creation should have returned a complete status.
+                        break; // with found=false
+                    }
+                    partition_parts = partition_parts.split_off(i + 1); // Remove the first i + 1 elements
+                    total_partitions = partition_parts.len();
+                    // Update the start key to the partition start.
+                    start = partition_parts[0].start().unwrap();
+                    found = true;
+                    break;
+                }
+            }
+            if !found {
+                // Last key is within the last partition, or larger than all partitions.
+                return Ok((vec![], LastImageLayerCreationStatus::Complete));
+            }
+        }
+
+        for partition in partition_parts.iter() {
             if self.cancel.is_cancelled() {
                 return Err(CreateImageLayersError::Cancelled);
             }
-
+            partition_processed += 1;
             let img_range = start..partition.ranges.last().unwrap().end;
             let compact_metadata = partition.overlaps(&Key::metadata_key_range());
             if compact_metadata {
@@ -4725,6 +4772,8 @@ impl Timeline {
                 lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
                 is_delta: false,
             }) {
+                // TODO: this can be processed with BatchLayerWriter::finish_with_discard
+                // in the future.
                 tracing::info!(
                     "Skipping image layer at {lsn} {}..{}, already exists",
                     img_range.start,
@@ -4805,8 +4854,6 @@ impl Timeline {
                 }
             }

-            partition_processed += 1;
-
             if let ImageLayerCreationMode::Try = mode {
                 // We have at least made some progress
                 if batch_image_writer.pending_layer_num() >= 1 {
@@ -4822,8 +4869,10 @@ impl Timeline {
                         * self.get_compaction_threshold();
                     if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold {
                         tracing::info!(
-                            "preempt image layer generation at {start} at {lsn}: too many L0 layers {num_of_l0_layers}",
+                            "preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}",
+                            partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers
                         );
+                        last_partition_processed = Some(partition.clone());
                         all_generated = false;
                         break;
                     }
@@ -4868,7 +4917,14 @@ impl Timeline {
             if all_generated {
                 LastImageLayerCreationStatus::Complete
             } else {
-                LastImageLayerCreationStatus::Incomplete
+                LastImageLayerCreationStatus::Incomplete {
+                    last_key: if let Some(last_partition_processed) = last_partition_processed {
+                        last_partition_processed.end().unwrap_or(Key::MIN)
+                    } else {
+                        // This branch should be unreachable; if it is reached, resume from the start of the keyspace.
+                        Key::MIN
+                    },
+                }
             },
         ))
     }
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 7dd37d7232..466ebea783 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -748,7 +748,7 @@ impl Timeline {
             .store(Arc::new(outcome.clone()));
         self.upload_new_image_layers(image_layers)?;

-        if let LastImageLayerCreationStatus::Incomplete = outcome {
+        if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
             // Yield and do not do any other kind of compaction.
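+            // The `Incomplete { last_key }` outcome was stored above, so the next
+            // compaction pass can resume image layer creation from `last_key`.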
info!("skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."); return Ok(CompactionOutcome::Pending); diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index c031d66dfb..f3347b594e 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -29,6 +29,21 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = { # "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later } +PREEMPT_COMPACTION_TENANT_CONF = { + "gc_period": "5s", + "compaction_period": "5s", + # Small checkpoint distance to create many layers + "checkpoint_distance": 1024**2, + # Compact small layers + "compaction_target_size": 1024**2, + "image_creation_threshold": 1, + "image_creation_preempt_threshold": 1, + # compact more frequently + "compaction_threshold": 3, + "compaction_upper_limit": 6, + "lsn_lease_length": "0s", +} + @skip_in_debug_build("only run with release build") @pytest.mark.parametrize( @@ -36,7 +51,8 @@ AGGRESSIVE_COMPACTION_TENANT_CONF = { [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], ) def test_pageserver_compaction_smoke( - neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol + neon_env_builder: NeonEnvBuilder, + wal_receiver_protocol: PageserverWalReceiverProtocol, ): """ This is a smoke test that compaction kicks in. The workload repeatedly churns @@ -54,7 +70,8 @@ def test_pageserver_compaction_smoke( page_cache_size=10 """ - env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF) + conf = AGGRESSIVE_COMPACTION_TENANT_CONF.copy() + env = neon_env_builder.init_start(initial_tenant_conf=conf) tenant_id = env.initial_tenant timeline_id = env.initial_timeline @@ -113,6 +130,41 @@ page_cache_size=10 assert vectored_average < 8 +@skip_in_debug_build("only run with release build") +def test_pageserver_compaction_preempt( + neon_env_builder: NeonEnvBuilder, +): + # Ideally we should be able to do unit tests for this, but we need real Postgres + # WALs in order to do unit testing... + + conf = PREEMPT_COMPACTION_TENANT_CONF.copy() + env = neon_env_builder.init_start(initial_tenant_conf=conf) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + row_count = 200000 + churn_rounds = 10 + + ps_http = env.pageserver.http_client() + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageserver.id) + + log.info("Writing initial data ...") + workload.write_rows(row_count, env.pageserver.id) + + for i in range(1, churn_rounds + 1): + log.info(f"Running churn round {i}/{churn_rounds} ...") + workload.churn_rows(row_count, env.pageserver.id, upload=False) + workload.validate(env.pageserver.id) + ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True) + log.info("Validating at workload end ...") + workload.validate(env.pageserver.id) + # ensure image layer creation gets preempted and then resumed + env.pageserver.assert_log_contains("resuming image layer creation") + + @skip_in_debug_build("only run with release build") @pytest.mark.parametrize( "with_branches",