diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 985b480a76..7049a0bd66 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -571,14 +571,15 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
-    ///
     /// Launch a delete operation in the background.
     ///
+    /// The operation does not modify local state but assumes the local files have already been
+    /// deleted, and is used to mirror those changes to remote storage.
+    ///
     /// Note: This schedules an index file upload before the deletions. The
     /// deletion won't actually be performed, until any previously scheduled
     /// upload operations, and the index file upload, have completed
     /// succesfully.
-    ///
     pub fn schedule_layer_file_deletion(
         self: &Arc<Self>,
         names: &[LayerFileName],
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 7a5a9de2f4..e606cacf92 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -613,7 +613,10 @@ impl Timeline {
         self.flush_frozen_layers_and_wait().await
     }
 
+    /// Outermost timeline compaction operation; downloads needed layers.
     pub async fn compact(&self, ctx: &RequestContext) -> anyhow::Result<()> {
+        const ROUNDS: usize = 2;
+
         let last_record_lsn = self.get_last_record_lsn();
 
         // Last record Lsn could be zero in case the timeline was just created
@@ -622,6 +625,86 @@ impl Timeline {
             return Ok(());
         }
 
+        // Retry up to two times: the first round may find layers which still need to be
+        // downloaded; download them, then retry compaction.
+        for round in 0..ROUNDS {
+            // should we error out with the most specific error?
+            let last_round = round == ROUNDS - 1;
+
+            let res = self.compact_inner(ctx).await;
+
+            // If `create_image_layers' or `compact_level0` scheduled any
+            // uploads or deletions, but didn't update the index file yet,
+            // do it now.
+            //
+            // This isn't necessary for correctness, the remote state is
+            // consistent without the uploads and deletions, and we would
+            // update the index file on next flush iteration too. But it
+            // could take a while until that happens.
+            //
+            // Additionally, only do this on the terminal round before sleeping.
+            if last_round {
+                if let Some(remote_client) = &self.remote_client {
+                    remote_client.schedule_index_upload_for_file_changes()?;
+                }
+            }
+
+            let rls = match res {
+                Ok(()) => return Ok(()),
+                Err(CompactionError::DownloadRequired(rls)) if !last_round => {
+                    // this can happen at most once before exiting: wait for the downloads, then retry
+                    rls
+                }
+                Err(CompactionError::DownloadRequired(rls)) => {
+                    anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
+                }
+                Err(CompactionError::Other(e)) => {
+                    return Err(e);
+                }
+            };
+
+            // this path is only reached before the final round, when the previous attempt found
+            // that some remote layers must first be downloaded
+            let total = rls.len();
+
+            let mut downloads = rls
+                .into_iter()
+                .map(|rl| self.download_remote_layer(rl))
+                .collect::<futures::stream::FuturesUnordered<_>>();
+
+            let mut failed = 0;
+
+            let cancelled = task_mgr::shutdown_watcher();
+            tokio::pin!(cancelled);
+
+            loop {
+                tokio::select! {
+                    _ = &mut cancelled => anyhow::bail!("Cancelled while downloading remote layers"),
+                    res = downloads.next() => {
+                        match res {
+                            Some(Ok(())) => {},
+                            Some(Err(e)) => {
+                                warn!("Downloading remote layer for compaction failed: {e:#}");
+                                failed += 1;
+                            }
+                            None => break,
+                        }
+                    }
+                }
+            }
+
+            if failed != 0 {
+                anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
+            }
+
+            // if everything downloaded fine, let's try again
+        }
+
+        unreachable!("retry loop exits")
+    }
+
+    /// Compaction which might need to be retried after downloading remote layers.
+    async fn compact_inner(&self, ctx: &RequestContext) -> Result<(), CompactionError> {
         //
         // High level strategy for compaction / image creation:
         //
@@ -660,7 +743,7 @@ impl Timeline {
         // Is the timeline being deleted?
         let state = *self.state.borrow();
         if state == TimelineState::Stopping {
-            anyhow::bail!("timeline is Stopping");
+            return Err(anyhow::anyhow!("timeline is Stopping").into());
         }
 
         let target_file_size = self.get_checkpoint_distance();
@@ -680,7 +763,8 @@ impl Timeline {
                 // "enough".
                 let layer_paths_to_upload = self
                     .create_image_layers(&partitioning, lsn, false, ctx)
-                    .await?;
+                    .await
+                    .map_err(anyhow::Error::from)?;
                 if let Some(remote_client) = &self.remote_client {
                     for (path, layer_metadata) in layer_paths_to_upload {
                         remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
@@ -692,18 +776,6 @@ impl Timeline {
                 self.compact_level0(&layer_removal_cs, target_file_size, ctx)
                     .await?;
                 timer.stop_and_record();
-
-                // If `create_image_layers' or `compact_level0` scheduled any
-                // uploads or deletions, but didn't update the index file yet,
-                // do it now.
-                //
-                // This isn't necessary for correctness, the remote state is
-                // consistent without the uploads and deletions, and we would
-                // update the index file on next flush iteration too. But it
-                // could take a while until that happens.
-                if let Some(remote_client) = &self.remote_client {
-                    remote_client.schedule_index_upload_for_file_changes()?;
-                }
             }
             Err(err) => {
                 // no partitioning? This is normal, if the timeline was just created
@@ -2541,10 +2613,13 @@ impl Timeline {
     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
         {
             let partitioning_guard = self.partitioning.lock().unwrap();
-            if partitioning_guard.1 != Lsn(0)
-                && lsn.0 - partitioning_guard.1 .0 <= self.repartition_threshold
-            {
-                // no repartitioning needed
+            let distance = lsn.0 - partitioning_guard.1 .0;
+            if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
+                debug!(
+                    distance,
+                    threshold = self.repartition_threshold,
+                    "no repartitioning needed"
+                );
                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
             }
         }
@@ -2562,8 +2637,12 @@ impl Timeline {
 
     // Is it time to create a new image layer for the given partition?
     fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
+        let threshold = self.get_image_creation_threshold();
+
         let layers = self.layers.read().unwrap();
 
+        let mut max_deltas = 0;
+
         for part_range in &partition.ranges {
             let image_coverage = layers.image_coverage(part_range, lsn)?;
             for (img_range, last_img) in image_coverage {
@@ -2585,21 +2664,25 @@ impl Timeline {
                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
                 if img_lsn < lsn {
-                    let threshold = self.get_image_creation_threshold();
                     let num_deltas =
                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
 
-                    debug!(
-                        "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
-                        img_range.start, img_range.end, num_deltas, img_lsn, lsn
-                    );
+                    max_deltas = max_deltas.max(num_deltas);
                     if num_deltas >= threshold {
+                        debug!(
+                            "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
+                            img_range.start, img_range.end, num_deltas, img_lsn, lsn
+                        );
                         return Ok(true);
                     }
                 }
             }
         }
 
+        debug!(
+            max_deltas,
+            "none of the partitioned ranges had >= {threshold} deltas"
+        );
         Ok(false)
     }
 
@@ -2712,25 +2795,55 @@ impl Timeline {
         Ok(layer_paths_to_upload)
     }
 }
+
 #[derive(Default)]
 struct CompactLevel0Phase1Result {
     new_layers: Vec<DeltaLayer>,
     deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
 }
 
+/// Top-level failure to compact.
+#[derive(Debug)]
+enum CompactionError {
+    /// L0 compaction requires layers to be downloaded.
+    ///
+    /// This should not happen repeatedly, but will be retried once by top-level
+    /// `Timeline::compact`.
+    DownloadRequired(Vec<Arc<RemoteLayer>>),
+    /// Compaction cannot be done right now; page reconstruction and so on.
+    Other(anyhow::Error),
+}
+
+impl From<anyhow::Error> for CompactionError {
+    fn from(value: anyhow::Error) -> Self {
+        CompactionError::Other(value)
+    }
+}
+
 impl Timeline {
+    /// First phase of level0 file compaction, explained in the [`compact_inner`] comment.
+    ///
+    /// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
+    /// returned as an error rather than performed while it is held. If the `layer_removal_cs`
+    /// boundary is changed not to be taken at the start of level0 compaction, the on-demand download should be revisited as well.
     async fn compact_level0_phase1(
         &self,
+        _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
         target_file_size: u64,
         ctx: &RequestContext,
-    ) -> anyhow::Result<CompactLevel0Phase1Result> {
+    ) -> Result<CompactLevel0Phase1Result, CompactionError> {
         let layers = self.layers.read().unwrap();
         let mut level0_deltas = layers.get_level0_deltas()?;
         drop(layers);
 
         // Only compact if enough layers have accumulated.
-        if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() {
-            return Ok(Default::default());
+        let threshold = self.get_compaction_threshold();
+        if level0_deltas.is_empty() || level0_deltas.len() < threshold {
+            debug!(
+                level0_deltas = level0_deltas.len(),
+                threshold, "too few deltas to compact"
+            );
+            return Ok(CompactLevel0Phase1Result::default());
         }
 
         // Gather the files to compact in this iteration.
@@ -2766,6 +2879,24 @@ impl Timeline {
             end: deltas_to_compact.last().unwrap().get_lsn_range().end,
         };
 
+        let remotes = deltas_to_compact
+            .iter()
+            .filter(|l| l.is_remote_layer())
+            .inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
+            .map(|l| {
+                l.clone()
+                    .downcast_remote_layer()
+                    .expect("just checked it is remote layer")
+            })
+            .collect::<Vec<_>>();
+
+        if !remotes.is_empty() {
+            // the caller is holding the layer_removal_cs lock, and we don't want to download while
+            // holding it; in the future download_remote_layer might take it as well. this applies
+            // regardless of image creation earlier downloading on-demand while holding the lock.
+            return Err(CompactionError::DownloadRequired(remotes));
+        }
+
         info!(
             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
             lsn_range.start,
@@ -2773,9 +2904,11 @@ impl Timeline {
             deltas_to_compact.len(),
             level0_deltas.len()
         );
+
         for l in deltas_to_compact.iter() {
             info!("compact includes {}", l.filename().file_name());
         }
+
         // We don't need the original list of layers anymore. Drop it so that
         // we don't accidentally use it later in the function.
         drop(level0_deltas);
@@ -2945,7 +3078,9 @@ impl Timeline {
             }
 
             fail_point!("delta-layer-writer-fail-before-finish", |_| {
-                anyhow::bail!("failpoint delta-layer-writer-fail-before-finish");
+                return Err(
+                    anyhow::anyhow!("failpoint delta-layer-writer-fail-before-finish").into(),
+                );
             });
 
             writer.as_mut().unwrap().put_value(key, lsn, value)?;
@@ -2964,7 +3099,7 @@ impl Timeline {
 
         // Fsync all the layer files and directory using multiple threads to
         // minimize latency.
-        par_fsync::par_fsync(&layer_paths)?;
+        par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
 
         layer_paths.pop().unwrap();
     }
@@ -2986,11 +3121,13 @@ impl Timeline {
         layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
         target_file_size: u64,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), CompactionError> {
         let CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
-        } = self.compact_level0_phase1(target_file_size, ctx).await?;
+        } = self
+            .compact_level0_phase1(layer_removal_cs, target_file_size, ctx)
+            .await?;
 
         if new_layers.is_empty() && deltas_to_compact.is_empty() {
             // nothing to do
@@ -3014,7 +3151,12 @@ impl Timeline {
         for l in new_layers {
             let new_delta_path = l.path();
 
-            let metadata = new_delta_path.metadata()?;
+            let metadata = new_delta_path.metadata().with_context(|| {
+                format!(
+                    "read file metadata for newly created layer {}",
+                    new_delta_path.display()
+                )
+            })?;
 
             if let Some(remote_client) = &self.remote_client {
                 remote_client.schedule_layer_file_upload(
@@ -3248,7 +3390,7 @@ impl Timeline {
 
         let mut layers_to_remove = Vec::new();
 
-        // Scan all on-disk layers in the timeline.
+        // Scan all layers in the timeline (remote or on-disk).
         //
         // Garbage collect the layer if all conditions are satisfied:
         // 1. it is older than cutoff LSN;
diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py
index 3551f27cad..f5f8491ada 100644
--- a/test_runner/regress/test_ondemand_download.py
+++ b/test_runner/regress/test_ondemand_download.py
@@ -1,7 +1,10 @@
 # It's possible to run any regular test with the local fs remote storage via
 # env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
+import time
+from collections import defaultdict
 from pathlib import Path
+from typing import Any, DefaultDict, Dict
 
 import pytest
 from fixtures.log_helper import log
@@ -10,6 +13,7 @@ from fixtures.neon_fixtures import (
     RemoteStorageKind,
     assert_tenant_status,
     available_remote_storages,
+    wait_for_last_flush_lsn,
     wait_for_last_record_lsn,
     wait_for_sk_commit_lsn_to_reach_remote_storage,
     wait_for_upload,
@@ -449,3 +453,167 @@ def test_download_remote_layers_api(
     pg_old = env.postgres.create_start(branch_name="main")
     with pg_old.cursor() as cur:
         assert query_scalar(cur, "select count(*) from testtab") == table_len
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.MOCK_S3])
+def test_compaction_downloads_on_demand_without_image_creation(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+    """
+    Create a few layers, then evict, then make sure compaction runs successfully.
+    """
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_compaction_downloads_on_demand_without_image_creation",
+    )
+
+    env = neon_env_builder.init_start()
+
+    conf = {
+        # Disable background GC & compaction
+        "gc_period": "0s",
+        "compaction_period": "0s",
+        # unused, because a manual checkpoint is issued after each table
+        "checkpoint_distance": 100 * 1024**2,
+        # this will be updated later on to allow manual compaction outside of checkpoints
+        "compaction_threshold": 100,
+        # repartitioning parameter, not required here
+        "image_creation_threshold": 100,
+        # repartitioning parameter, not required here
+        "compaction_target_size": 128 * 1024**2,
+        # pitr_interval and gc_horizon are not interesting because we don't run gc
+    }
+
+    # Override defaults, to create more layers
+    tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
+    env.initial_tenant = tenant_id
+    pageserver_http = env.pageserver.http_client()
+
+    with env.postgres.create_start("main") as pg:
+        # no particular reason to create the layers like this, but we are sure
+        # not to hit the image_creation_threshold here.
+        with pg.cursor() as cur:
+            cur.execute("create table a as select id::bigint from generate_series(1, 204800) s(id)")
+        wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+        with pg.cursor() as cur:
+            cur.execute("update a set id = -id")
+        wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+    layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
+    assert not layers.in_memory_layers, "no inmemory layers expected after post-commit checkpoint"
+    assert len(layers.historic_layers) == 1 + 2, "should have initdb layer and 2 deltas"
+
+    for layer in layers.historic_layers:
+        log.info(f"pre-compact: {layer}")
+        pageserver_http.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
+
+    env.neon_cli.config_tenant(tenant_id, {"compaction_threshold": "3"})
+
+    pageserver_http.timeline_compact(tenant_id, timeline_id)
+    layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
+    for layer in layers.historic_layers:
+        log.info(f"post compact: {layer}")
+    assert len(layers.historic_layers) == 1, "should have compacted to single layer"
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.MOCK_S3])
+def test_compaction_downloads_on_demand_with_image_creation(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+    """
+    Create layers, compact with high image_creation_threshold, then run final compaction with all layers evicted.
+
+    Due to the current implementation, this will make image creation download layers on demand, but we cannot really
+    directly test for it.
+    """
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=remote_storage_kind,
+        test_name="test_compaction_downloads_on_demand",
+    )
+
+    env = neon_env_builder.init_start()
+
+    conf = {
+        # Disable background GC & compaction
+        "gc_period": "0s",
+        "compaction_period": "0s",
+        # repartitioning threshold is this / 10, but it doesn't really seem to matter
+        "checkpoint_distance": 50 * 1024**2,
+        "compaction_threshold": 3,
+        # important: keep this high for the data ingestion
+        "image_creation_threshold": 100,
+        # repartitioning parameter, unused
+        "compaction_target_size": 128 * 1024**2,
+        # pitr_interval and gc_horizon are not interesting because we don't run gc
+    }
+
+    # Override defaults, to create more layers
+    tenant_id, timeline_id = env.neon_cli.create_tenant(conf=stringify(conf))
+    env.initial_tenant = tenant_id
+    pageserver_http = env.pageserver.http_client()
+
+    with env.postgres.create_start("main") as pg:
+        # no particular reason to create the layers like this, but we are sure
+        # not to hit the image_creation_threshold here.
+        with pg.cursor() as cur:
+            cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
+            cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
+        wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+        for _ in range(0, 2):
+            for i in range(0, 3):
+                # create a minimal amount of "delta difficulty" for this table
+                with pg.cursor() as cur:
+                    cur.execute("update a set some_value = -some_value + %s", (i,))
+
+                with pg.cursor() as cur:
+                    # vacuuming should help reuse keys, though it's not really important
+                    # with image_creation_threshold=1 which we will use on the last compaction
+                    cur.execute("vacuum")
+
+                wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+                pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+
+            # images should not yet be created, because threshold is too high,
+            # but these will be reshuffled to L1 layers
+            pageserver_http.timeline_compact(tenant_id, timeline_id)
+
+    for _ in range(0, 20):
+        # loop in case flushing is still in progress
+        layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
+        if not layers.in_memory_layers:
+            break
+        time.sleep(0.2)
+
+    layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
+    assert not layers.in_memory_layers, "no inmemory layers expected after post-commit checkpoint"
+
+    kinds_before: DefaultDict[str, int] = defaultdict(int)
+
+    for layer in layers.historic_layers:
+        kinds_before[layer.kind] += 1
+        pageserver_http.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
+
+    assert dict(kinds_before) == {"Delta": 4}
+
+    # now having evicted all layers, reconfigure to a lower image creation
+    # threshold so that image creation has to download all of the needed
+    # layers -- threshold of 2 would sound more reasonable, but keeping it as 1
+    # to be less flaky
+    env.neon_cli.config_tenant(tenant_id, {"image_creation_threshold": "1"})
+
+    pageserver_http.timeline_compact(tenant_id, timeline_id)
+    layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
+    kinds_after: DefaultDict[str, int] = defaultdict(int)
+    for layer in layers.historic_layers:
+        kinds_after[layer.kind] += 1
+
+    assert dict(kinds_after) == {"Delta": 4, "Image": 1}
+
+
+def stringify(conf: Dict[str, Any]) -> Dict[str, str]:
+    return dict(map(lambda x: (x[0], str(x[1])), conf.items()))
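
Editor's note: the core control flow of this patch is the retry loop in `Timeline::compact`: the inner pass reports which layers it still needs via `CompactionError::DownloadRequired`, the caller downloads them, and retries a bounded number of times (`ROUNDS`). Below is a minimal, self-contained sketch of that pattern, not the pageserver code; all names (`Layer`, `compact_once`, `download`) are illustrative stand-ins, and the real implementation hands back `Arc<RemoteLayer>` handles and downloads them concurrently on a `FuturesUnordered` stream.

// Sketch only: a toy model of the two-round compaction retry in this patch.
#[derive(Debug, Clone)]
struct Layer(String);

#[allow(dead_code)]
#[derive(Debug)]
enum CompactionError {
    // the inner pass cannot proceed until these layers are resident locally
    DownloadRequired(Vec<Layer>),
    // any other failure; the real code wraps anyhow::Error here
    Other(String),
}

// Stand-in for compact_inner/compact_level0_phase1: succeed if everything needed is local,
// otherwise report exactly what is missing instead of downloading in place.
fn compact_once(local: &[Layer], needed: &[Layer]) -> Result<(), CompactionError> {
    let missing: Vec<Layer> = needed
        .iter()
        .filter(|l| !local.iter().any(|have| have.0 == l.0))
        .cloned()
        .collect();
    if missing.is_empty() {
        Ok(())
    } else {
        Err(CompactionError::DownloadRequired(missing))
    }
}

// Stand-in for Timeline::download_remote_layer: make the layer resident locally.
fn download(local: &mut Vec<Layer>, layer: Layer) {
    local.push(layer);
}

fn compact(needed: &[Layer]) -> Result<(), String> {
    const ROUNDS: usize = 2;
    let mut local: Vec<Layer> = Vec::new();

    for round in 0..ROUNDS {
        let last_round = round == ROUNDS - 1;
        match compact_once(&local, needed) {
            Ok(()) => return Ok(()),
            Err(CompactionError::DownloadRequired(missing)) if !last_round => {
                // first round: fetch what the inner pass reported, then retry
                for layer in missing {
                    download(&mut local, layer);
                }
            }
            Err(CompactionError::DownloadRequired(missing)) => {
                return Err(format!("still missing {} layers after retrying", missing.len()));
            }
            Err(CompactionError::Other(e)) => return Err(e),
        }
    }
    unreachable!("every last-round outcome returns above")
}

fn main() {
    let needed = vec![Layer("delta-000".into()), Layer("delta-001".into())];
    assert!(compact(&needed).is_ok());
    println!("compaction succeeded after one download round");
}

The reason for surfacing the missing layers as an error instead of downloading inside the inner pass, per the patch's own comments, is that the inner pass runs while the `layer_removal_cs` guard is held; returning `DownloadRequired` lets the outer loop perform the downloads without holding that lock.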